tsdbRead.c 143.0 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17

dengyihao's avatar
dengyihao 已提交
18 19
// Extra byte count. NOTE(review): no use is visible in this part of the file.
#define EXTRA_BYTES                2
// Evaluates to true when the scan order `o` is ascending. The argument is
// parenthesized so that compound expressions (e.g. `a & b`) expand correctly.
#define ASCENDING_TRAVERSE(o)      ((o) == TSDB_ORDER_ASC)
// Number of entries in the read handle's pColumns array, as a size_t.
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
21

H
Haojun Liao 已提交
22 23 24 25
// Builds an SDataBlockInfo compound literal from a file block (`_block`) and
// the check info of the table that owns it (`_checkInfo`).
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
  ((SDataBlockInfo){                                 \
      .window =                                      \
          {                                          \
              .skey = (_block)->keyFirst,            \
              .ekey = (_block)->keyLast,             \
          },                                         \
      .numOfCols = (_block)->numOfCols,              \
      .rows = (_block)->numOfRows,                   \
      .uid = (_checkInfo)->tableId,                  \
  })
H
Haojun Liao 已提交
27

H
hjxilinx 已提交
28
// Query types stored in STsdbReadHandle.type.
enum {
  TSDB_QUERY_TYPE_ALL = 1,   // retrieve all data blocks
  TSDB_QUERY_TYPE_LAST = 2,  // retrieve only the last row
};

33
// Kinds of cached result.
// NOTE(review): presumably the values held by STsdbReadHandle.cachelastrow - confirm.
enum {
  TSDB_CACHED_TYPE_NONE = 0,
  TSDB_CACHED_TYPE_LASTROW = 1,
  TSDB_CACHED_TYPE_LAST = 2,
};

39
typedef struct SQueryFilePos {
dengyihao's avatar
dengyihao 已提交
40 41 42 43 44 45 46
  int32_t     fid;
  int32_t     slot;
  int32_t     pos;
  int64_t     lastKey;
  int32_t     rows;
  bool        mixBlock;
  bool        blockCompleted;
47
  STimeWindow win;
48
} SQueryFilePos;
H
hjxilinx 已提交
49

50
typedef struct SDataBlockLoadInfo {
dengyihao's avatar
dengyihao 已提交
51 52 53 54
  SDFileSet* fileGroup;
  int32_t    slot;
  uint64_t   uid;
  SArray*    pLoadedCols;
55
} SDataBlockLoadInfo;
H
hjxilinx 已提交
56

57
// Records which comp-block information is currently loaded
// (see STsdbReadHandle.compBlockLoadInfo). Both fields are -1 after reset.
typedef struct SLoadCompBlockInfo {
  int32_t tid;     // table tid
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
61

62
// Values for STableCheckInfo.chosen: which in-memory iterator should move forward.
enum {
  CHECKINFO_CHOSEN_MEM = 0,
  CHECKINFO_CHOSEN_IMEM = 1,
  CHECKINFO_CHOSEN_BOTH = 2,  // for update=2 (merge case)
};

68
typedef struct STableCheckInfo {
dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75 76 77
  uint64_t           tableId;
  TSKEY              lastKey;
  SBlockInfo*        pCompInfo;
  int32_t            compSize;
  int32_t            numOfBlocks : 29;  // number of qualified data blocks not the original blocks
  uint8_t            chosen : 2;        // indicate which iterator should move forward
  bool               initBuf : 1;       // whether to initialize the in-memory skip list iterator or not
  SSkipListIterator* iter;              // mem buffer skip list iterator
  SSkipListIterator* iiter;             // imem buffer skip list iterator
78
} STableCheckInfo;
79

80
typedef struct STableBlockInfo {
dengyihao's avatar
dengyihao 已提交
81 82
  SBlock*          compBlock;
  STableCheckInfo* pTableCheckInfo;
83
} STableBlockInfo;
84

85
typedef struct SBlockOrderSupporter {
dengyihao's avatar
dengyihao 已提交
86 87 88 89
  int32_t           numOfTables;
  STableBlockInfo** pDataBlockInfo;
  int32_t*          blockIndexArray;
  int32_t*          numOfBlocksPerTable;
90 91
} SBlockOrderSupporter;

H
Haojun Liao 已提交
92 93 94
// I/O cost counters collected by a read handle (STsdbReadHandle.cost).
// NOTE(review): units of the *Time fields are not visible here.
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

100
typedef struct SBlockLoadSuppInfo {
C
Cary Xu 已提交
101 102 103 104
  SColumnDataAgg*  pstatis;
  SColumnDataAgg** plist;
  SArray*          defaultLoadColumn;  // default load column
  int32_t*         slotIds;            // colId to slotId
105 106
} SBlockLoadSuppInfo;

107
typedef struct STsdbReadHandle {
C
Cary Xu 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
  STsdb*        pTsdb;
  SQueryFilePos cur;  // current position
  int16_t       order;
  STimeWindow   window;  // the primary query time window that applies to all queries
  //  SColumnDataAgg* statis;  // query level statistics, only one table block statistics info exists at any time
  //  SColumnDataAgg** pstatis;// the ptr array list to return to caller
  int32_t numOfBlocks;
  SArray* pColumns;  // column list, SColumnInfoData array list
  bool    locateStart;
  int32_t outputCapacity;
  int32_t realNumOfRows;
  SArray* pTableCheckInfo;  // SArray<STableCheckInfo>
  int32_t activeIndex;
  bool    checkFiles;               // check file stage
  int8_t  cachelastrow;             // check if last row cached
  bool    loadExternalRow;          // load time window external data rows
  bool    currentLoadExternalRows;  // current load external rows
  int32_t loadType;                 // block load type
  char*   idStr;                    // query info handle, for debug purpose
dengyihao's avatar
dengyihao 已提交
127 128 129 130 131
  int32_t type;  // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
  SDFileSet*         pFileGroup;
  SFSIter            fileIter;
  SReadH             rhelper;
  STableBlockInfo*   pDataBlockInfo;
C
Cary Xu 已提交
132 133 134 135
  SDataCols*         pDataCols;         // in order to hold current file data block
  int32_t            allocSize;         // allocated data block size
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
136
  SBlockLoadSuppInfo suppInfo;
C
Cary Xu 已提交
137 138 139 140
  SArray*            prev;  // previous row which is before than time window
  SArray*            next;  // next row which is after the query time window
  SIOCostSummary     cost;
  STSchema*          pSchema;
141
} STsdbReadHandle;
142

H
Haojun Liao 已提交
143 144 145
typedef struct STableGroupSupporter {
  int32_t    numOfCols;
  SColIndex* pCols;
146
  SSchema*   pTagSchema;
H
Haojun Liao 已提交
147 148
} STableGroupSupporter;

dengyihao's avatar
dengyihao 已提交
149 150 151 152 153
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo);

static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList);
static int32_t     checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList);
static int32_t     checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
154
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
155

H
Haojun Liao 已提交
156
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
157
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
dengyihao's avatar
dengyihao 已提交
158 159
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
                                     STsdbReadHandle* pTsdbReadHandle);
160
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
dengyihao's avatar
dengyihao 已提交
161 162 163 164
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
// static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
165

C
Cary Xu 已提交
166 167
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions);

168
// Marks the block-load record as "nothing loaded": no file group, slot -1, uid 0.
// pLoadedCols is left untouched.
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->uid = 0;
  pBlockLoadInfo->slot = -1;
}

174
// Marks the comp-block record as "nothing loaded" by setting both ids to -1.
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid = -1;
}
H
hjxilinx 已提交
178

179 180
// Returns a newly created array holding the column id (int16_t) of every entry
// in pTsdbReadHandle->pColumns, in the same order.
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  const size_t total = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
  assert(total <= TSDB_MAX_COLUMNS);

  SArray* pColIds = taosArrayInit(total, sizeof(int16_t));

  size_t idx = 0;
  while (idx < total) {
    SColumnInfoData* pColData = taosArrayGet(pTsdbReadHandle->pColumns, idx);
    taosArrayPush(pColIds, &pColData->info.colId);
    ++idx;
  }

  return pColIds;
}

192 193
// Builds the list of column ids to load by default: the requested columns and,
// when loadTS is true, the primary timestamp column at the front if it is not
// already the first entry.
//
// Returns NULL when the id list could not be created. An empty requested list
// is handled explicitly instead of reading a non-existent first element.
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
  if (pLocalIdList == NULL || !loadTS) {
    return pLocalIdList;
  }

  int16_t tsColId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // nothing requested: the timestamp column is trivially missing, add it
  if (taosArrayGetSize(pLocalIdList) == 0) {
    taosArrayPush(pLocalIdList, &tsColId);
    return pLocalIdList;
  }

  // the primary timestamp column is not included in the specified load column list, add it
  int16_t firstColId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
  if (firstColId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    taosArrayInsert(pLocalIdList, 0, &tsColId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
207
// Reports the number of rows buffered in memory for the tables of this handle.
//
// The mem-table snapshot is not wired into the read handle at the moment: the
// snapshot pointer below is always NULL, so the function currently returns 0.
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;

  int64_t        total = 0;
  STsdbMemTable* pSnapshot = NULL;  // pTsdbReadHandle->pMemTable;

  if (pSnapshot != NULL) {
    const size_t numOfTables = taosArrayGetSize(pReader->pTableCheckInfo);
    for (size_t t = 0; t < numOfTables; ++t) {
      STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, t);
      // TODO: add the numOfRows of this table's mem and imem table data to
      // `total` once the snapshot is available again.
      (void)pCheckInfo;
    }
  }

  return total;
}
237

238 239 240
// Flattens all table groups into one array of STableCheckInfo, sorted with
// tsdbCheckInfoCompar. Each entry starts at the table's lastKey clamped to the
// query window (ascending scan) or at window.skey (descending scan).
// Returns NULL when the result array cannot be allocated.
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
  const size_t groupCount = taosArrayGetSize(pGroupList->pGroupList);
  assert(groupCount >= 1);

  // allocate buffer in order to load data blocks from file
  SArray* pInfoList = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
  if (pInfoList == NULL) {
    return NULL;
  }

  const bool         ascending = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  const STimeWindow* pWin = &pTsdbReadHandle->window;

  // todo apply the lastkey of table check to avoid to load header file
  for (size_t g = 0; g < groupCount; ++g) {
    SArray* pTables = *(SArray**)taosArrayGet(pGroupList->pGroupList, g);

    const size_t tableCount = taosArrayGetSize(pTables);
    assert(tableCount > 0);

    for (size_t t = 0; t < tableCount; ++t) {
      STableKeyInfo*  pKey = (STableKeyInfo*)taosArrayGet(pTables, t);
      STableCheckInfo checkInfo = {.tableId = pKey->uid, .lastKey = pKey->lastKey};

      if (!ascending) {
        checkInfo.lastKey = pWin->skey;
      } else {
        // unset or too-early last key: start from the window start
        if (checkInfo.lastKey == INT64_MIN || checkInfo.lastKey < pWin->skey) {
          checkInfo.lastKey = pWin->skey;
        }

        assert(checkInfo.lastKey >= pWin->skey && checkInfo.lastKey <= pWin->ekey);
      }

      taosArrayPush(pInfoList, &checkInfo);
      tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, checkInfo.tableId,
                checkInfo.lastKey, pTsdbReadHandle->idStr);
    }
  }

  // TODO  group table according to the tag value.
  taosArraySort(pInfoList, tsdbCheckInfoCompar);
  return pInfoList;
}

280 281
// Rewinds every table check info of the handle: lastKey goes back to the start
// of the query window and both in-memory iterators are destroyed so that they
// get re-created on the next scan.
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  SArray*      pInfoList = pTsdbReadHandle->pTableCheckInfo;
  const size_t tableCount = taosArrayGetSize(pInfoList);
  assert(tableCount >= 1);

  // todo apply the lastkey of table check to avoid to load header file
  for (size_t t = 0; t < tableCount; ++t) {
    STableCheckInfo* pInfo = (STableCheckInfo*)taosArrayGet(pInfoList, t);

    pInfo->lastKey = pTsdbReadHandle->window.skey;
    pInfo->iter = tSkipListDestroyIter(pInfo->iter);
    pInfo->iiter = tSkipListDestroyIter(pInfo->iiter);
    pInfo->initBuf = false;

    if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(pInfo->lastKey <= pTsdbReadHandle->window.skey);
    } else {
      assert(pInfo->lastKey >= pTsdbReadHandle->window.skey);
    }
  }
}

H
Haojun Liao 已提交
300 301 302
// Creates a single-entry check info array for the table of pCheckInfo, with
// the scan starting at skey. Only one table, so no sorting is needed.
// psTable is currently unused.
// Returns NULL when the array cannot be allocated.
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  (void)psTable;

  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
  if (pNew == NULL) {
    return NULL;
  }

  STableCheckInfo info = {.lastKey = skey, .tableId = pCheckInfo->tableId};
  taosArrayPush(pNew, &info);
  return pNew;
}

311 312
// Returns true when the query window of the handle is inverted for its scan
// order: skey > ekey for an ascending scan, ekey > skey otherwise.
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  const STimeWindow* pWin = &pTsdbReadHandle->window;
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return pWin->skey > pWin->ekey;
  }

  return pWin->ekey > pWin->skey;
}

320 321
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
322
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
H
Hongze Cheng 已提交
323
  STsdbCfg* pCfg = REPO_CFG(pTsdb);
324 325

  int64_t now = taosGetTimestamp(pCfg->precision);
H
Hongze Cheng 已提交
326
  return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
327 328
}

329
// Copies the requested time window from pCond into the read handle and clips
// its lower bound (skey for an ascending scan, ekey otherwise) to the earliest
// non-expired timestamp. When clipping happens, pCond->twindow is updated too.
//
// The requested window is saved up front so that the debug log reports the
// real "old" values rather than the already-overwritten ones.
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) {
  const STimeWindow requested = pCond->twindow;
  pTsdbReadHandle->window = requested;

  bool          updateTs = false;
  const int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
      pCond->twindow.skey = startTs;
      updateTs = true;
    }
  } else {
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
      pCond->twindow.ekey = startTs;
      updateTs = true;
    }
  }

  if (updateTs) {
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
              pTsdbReadHandle, requested.skey, requested.ekey, pTsdbReadHandle->window.skey,
              pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
  }
}
C
Cary Xu 已提交
354 355 356
#if 0
int           nQUERY = 0;
#endif
C
Cary Xu 已提交
357 358
// Selects the tsdb instance to read from. For a non-rollup vnode this is
// pVnode->pTsdb; for a rollup vnode one of the RSMA instances is returned
// according to `level`.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
  if (!vnodeIsRollup(pVnode)) {
    return pVnode->pTsdb;
  }

  int           level = 0;
  const int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  // NOTE(review): this loop inspects the retention entries but never changes
  // `level`, so level stays 0 and VND_RSMA0 is always chosen. Confirm the
  // intended selection rule before relying on levels 1 and 2.
  for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
    const SRetention* pRetention = &retentions[i];
    const bool        stop = (pRetention->keep <= 0) || ((now - pRetention->keep) >= winSKey);
    if (stop) {
      break;
    }
  }

  if (level == TSDB_RETENTION_L0) {
    return VND_RSMA0(pVnode);
  }
  if (level == TSDB_RETENTION_L1) {
    return VND_RSMA1(pVnode);
  }
  return VND_RSMA2(pVnode);
}

390
// Allocates and initializes a read handle for the given query condition:
// picks the tsdb instance, records order / load type / external-row flags,
// builds the debug id string, initializes the read helper, clips the time
// window and allocates the column and block buffers.
//
// Returns NULL on failure, with terrno set to TSDB_CODE_TDB_OUT_OF_MEMORY and
// the partially built handle released through tsdbCleanupReadHandle().
//
// pCond must not be NULL; this is asserted before its first use.
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
  assert(pCond != NULL);

  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
  if (pReadHandle == NULL) {
    // nothing to clean up yet
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions);

  pReadHandle->order = pCond->order;
  pReadHandle->pTsdb = pTsdb;
  pReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid = INT32_MIN;
  pReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles = true;
  pReadHandle->activeIndex = 0;  // current active table index
  pReadHandle->allocSize = 0;
  pReadHandle->locateStart = false;
  pReadHandle->loadType = pCond->type;

  pReadHandle->outputCapacity = 4096;  //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

  char buf[128] = {0};
  snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
  pReadHandle->idStr = strdup(buf);
  if (pReadHandle->idStr == NULL) {
    goto _end;
  }

  if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
    goto _end;
  }

  setQueryTimewindow(pReadHandle, pCond);

  if (pCond->numOfCols > 0) {
    // allocate buffer in order to load data blocks from file
    pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
    if (pReadHandle->suppInfo.pstatis == NULL) {
      goto _end;
    }

    // todo: use list instead of array?
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
      goto _end;
    }

    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];

      // NOTE(review): on failure colInfo has not been pushed yet; whatever
      // colInfoDataEnsureCapacity allocated before failing is not released here.
      int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      taosArrayPush(pReadHandle->pColumns, &colInfo);
    }

    pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
    if (pReadHandle->suppInfo.defaultLoadColumn == NULL) {
      goto _end;
    }

    size_t numOfLoadCols = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);

    pReadHandle->suppInfo.slotIds = taosMemoryMalloc(sizeof(int32_t) * numOfLoadCols);
    if (pReadHandle->suppInfo.slotIds == NULL) {
      goto _end;
    }

    pReadHandle->suppInfo.plist = taosMemoryCalloc(numOfLoadCols, POINTER_BYTES);
    if (pReadHandle->suppInfo.plist == NULL) {
      goto _end;
    }
  }

  pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
  if (pReadHandle->pDataCols == NULL) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _end;
  }

  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);

  return pReadHandle;

_end:
  tsdbCleanupReadHandle(pReadHandle);
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return NULL;
}

474
// Creates a read handle for the tables in groupList.
//
// After the generic handle setup, a check info entry is created for every
// table, the schema of the first table is fetched and each default load
// column id is mapped to its slot index in that schema (suppInfo.slotIds).
//
// Returns NULL with terrno set on failure; the handle is released on every
// error path instead of being leaked. When the query window is empty, the
// handle is returned right after the generic setup, without check infos.
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
                             uint64_t taskId) {
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    return (tsdbReaderT*)pTsdbReadHandle;
  }

  // todo apply the lastkey of table check to avoid to load header file
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);

  pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
  STSchema* pSchema = pTsdbReadHandle->pSchema;
  if (pSchema == NULL) {
    // no schema for the first table: the column mapping cannot be built
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  // defaultLoadColumn is only created when the query requests columns
  SArray* pLoadCols = pTsdbReadHandle->suppInfo.defaultLoadColumn;
  if (pLoadCols != NULL) {
    int32_t  numOfCols = taosArrayGetSize(pLoadCols);
    int16_t* ids = pLoadCols->pData;

    // merge-walk over both id lists
    // NOTE(review): assumes both lists are sorted by ascending column id - confirm.
    // NOTE(review): if the schema is exhausted first, the remaining slotIds
    // entries keep their unspecified malloc contents.
    int32_t i = 0, j = 0;
    while (i < numOfCols && j < pSchema->numOfCols) {
      if (ids[i] == pSchema->columns[j].colId) {
        pTsdbReadHandle->suppInfo.slotIds[i] = j;
        i++;
        j++;
      } else if (ids[i] > pSchema->columns[j].colId) {
        j++;
      } else {
        // requested column id is not present in the schema
        tsdbCleanupReadHandle(pTsdbReadHandle);
        terrno = TSDB_CODE_INVALID_PARA;
        return NULL;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle,
            taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList),
            pTsdbReadHandle->idStr);

  return (tsdbReaderT*)pTsdbReadHandle;
}

523
// Rewinds an existing read handle so that it can be scanned again with the
// order and time window of pCond.
//
// If the handle's current window is empty, only the order is updated (swapping
// skey/ekey when the order changes) and nothing else is reset.
//
// The statistics buffers are cleared over their full allocated length
// (pstatis: one entry per column in pColumns, plist: one pointer per default
// load column) and are skipped when they were never allocated.
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
    }

    return;
  }

  pTsdbReadHandle->order = pCond->order;
  pTsdbReadHandle->window = pCond->twindow;
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // clear the buffers used to load data blocks from file
  SBlockLoadSuppInfo* pSupp = &pTsdbReadHandle->suppInfo;
  if (pSupp->pstatis != NULL) {
    memset(pSupp->pstatis, 0, QH_GET_NUM_OF_COLS(pTsdbReadHandle) * sizeof(SColumnDataAgg));
  }
  if (pSupp->plist != NULL) {
    memset(pSupp->plist, 0, taosArrayGetSize(pSupp->defaultLoadColumn) * POINTER_BYTES);
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  resetCheckInfo(pTsdbReadHandle);
}

561
// Resets a read handle for a new set of tables, using the order and time
// window of pCond.
//
// The statistics buffers are cleared over their full allocated length and are
// skipped when they were never allocated.
//
// NOTE(review): rebuilding the table check info from groupList is currently
// disabled. pTableCheckInfo is set to NULL without releasing the previous
// array, and terrno is therefore always set to TSDB_CODE_TDB_OUT_OF_MEMORY.
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
  (void)groupList;  // unused until the check info rebuild is restored

  pTsdbReadHandle->order = pCond->order;
  pTsdbReadHandle->window = pCond->twindow;
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // clear the buffers used to load data blocks from file
  SBlockLoadSuppInfo* pSupp = &pTsdbReadHandle->suppInfo;
  if (pSupp->pstatis != NULL) {
    memset(pSupp->pstatis, 0, QH_GET_NUM_OF_COLS(pTsdbReadHandle) * sizeof(SColumnDataAgg));
  }
  if (pSupp->plist != NULL) {
    memset(pSupp->plist, 0, taosArrayGetSize(pSupp->defaultLoadColumn) * POINTER_BYTES);
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
  //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

  pTsdbReadHandle->pTableCheckInfo = NULL;  // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
                                            // &pTable);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}

603
// Creates a read handle for a last-row query.
//
// The query window in pCond is replaced by the window computed by
// updateLastrowForEachGroup(). Returns NULL when no table qualifies, when the
// handle cannot be created, or when the cached-last-row check fails; in the
// last case the handle is released and terrno carries the failure code.
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
                             uint64_t taskId) {
  pCond->twindow = updateLastrowForEachGroup(groupList);

  // no qualified table
  if (groupList->numOfTables == 0) {
    return NULL;
  }

  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, groupList, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList);
  if (code != TSDB_CODE_SUCCESS) {  // set the numOfTables to be 0
    // release the handle: the caller only receives NULL and cannot free it
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = code;
    return NULL;
  }

  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

631
#if 0
632 633
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
634
  if (pTsdbReadHandle == NULL) {
635 636 637
    return NULL;
  }

638
  int32_t code = checkForCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
639 640 641 642 643
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

644 645
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
fix bug  
dapan1121 已提交
646
  }
D
init  
dapan1121 已提交
647
  
648
  return pTsdbReadHandle;
H
hjxilinx 已提交
649 650
}

651
#endif
dengyihao's avatar
dengyihao 已提交
652
// Returns a new pointer-sized array whose capacity matches the number of
// tables tracked by the handle. The array is returned empty; nothing is
// pushed into it here.
SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
  assert(pHandle != NULL);

  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;
  const size_t     tableCount = taosArrayGetSize(pReader->pTableCheckInfo);

  return taosArrayInit(tableCount, POINTER_BYTES);
}

H
Haojun Liao 已提交
662 663 664 665 666
// Builds a new group list that is meant to keep only one table for each group.
//
// NOTE(review): the selection of the table to keep is currently disabled, so
// every per-group array stays empty and is discarded; the returned list has no
// groups and numOfTables == 0. `window` is unused until that logic is restored.
//
// Returns NULL with terrno set when an allocation fails.
static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
  assert(pGroupList);
  (void)window;

  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);

  STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
  if (pNew == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
  if (pNew->pGroupList == NULL) {
    // never reached with a real allocator failure handled elsewhere; report it
    // and hand back the (empty) result so the caller sees numOfTables == 0
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return pNew;
  }

  for (size_t i = 0; i < numOfGroup; ++i) {
    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
    size_t  numOfTables = taosArrayGetSize(oneGroup);

    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
    if (px == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      continue;
    }

    for (size_t j = 0; j < numOfTables; ++j) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
      (void)pInfo;
      //      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
      //        taosArrayPush(px, pInfo);
      //        pNew->numOfTables += 1;
      //        break;
      //      }
    }

    // there are no data in this group
    if (taosArrayGetSize(px) == 0) {
      taosArrayDestroy(px);
    } else {
      taosArrayPush(pNew->pGroupList, &px);
    }
  }

  return pNew;
}

695
// Creates a read handle that loads the rows outside the query time window.
//
// The group list is first trimmed with trimTableGroup(); when no table is
// left, the window in pCond is made empty on purpose. Returns NULL when the
// trimmed list or the read handle cannot be created.
//
// NOTE(review): the trimmed group list is not released here; whether
// tsdbQueryTables keeps a reference to it is not visible in this function.
tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
                                          uint64_t qId, uint64_t taskId) {
  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
  if (pNew == NULL) {
    return NULL;
  }

  if (pNew->numOfTables == 0) {
    tsdbDebug("update query time range to invalidate time window");

    assert(taosArrayGetSize(pNew->pGroupList) == 0);
    bool asc = ASCENDING_TRAVERSE(pCond->order);
    if (asc) {
      pCond->twindow.ekey = pCond->twindow.skey - 1;
    } else {
      pCond->twindow.skey = pCond->twindow.ekey - 1;
    }
  }

  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  pTsdbReadHandle->loadExternalRow = true;
  pTsdbReadHandle->currentLoadExternalRows = true;

  return pTsdbReadHandle;
}

718
/*
 * Create the skip-list iterators over the mem and imem buffers of one table, the first time the
 * table is visited.
 *
 * Both iterators are created from pCheckInfo->lastKey in the scan order of the handle and are then
 * advanced once so that they rest on their first row.
 *
 * Returns true if the iterators were already initialised by an earlier call, or if at least one of
 * the two buffers holds a row; returns false when neither buffer has data for this table.
 */
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->initBuf) {
    return true;  // set up by a previous call, nothing to do
  }
  pCheckInfo->initBuf = true;

  int32_t   order = pHandle->order;
  STbData** pMem = NULL;
  STbData** pIMem = NULL;
  TSKEY     tLastKey = keyToTkey(pCheckInfo->lastKey);

  // look the table up in the mutable buffer
  if (pHandle->pTsdb->mem != NULL) {
    pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (pMem != NULL) {
      pCheckInfo->iter =
          tSkipListCreateIterFromVal((*pMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  // look the table up in the immutable buffer
  if (pHandle->pTsdb->imem != NULL) {
    pIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (pIMem != NULL) {
      pCheckInfo->iiter =
          tSkipListCreateIterFromVal((*pIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  // neither iterator exists: the table has nothing in the buffers right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // advance each existing iterator onto its first row; both are always advanced before the check below
  bool memEmpty = (pCheckInfo->iter == NULL) || !tSkipListIterNext(pCheckInfo->iter);
  bool imemEmpty = (pCheckInfo->iiter == NULL) || !tSkipListIterNext(pCheckInfo->iiter);
  if (memEmpty && imemEmpty) {
    return false;
  }

  if (memEmpty) {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iter);
    assert(pNode != NULL);

    STSRow* pRow = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp the iterator rests on
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey,
              (*pMem)->nrows, pHandle->idStr);

    // the iterator must not start behind the requested key in scan direction
    assert(ASCENDING_TRAVERSE(order) ? (pCheckInfo->lastKey <= firstKey) : (pCheckInfo->lastKey >= firstKey));
  }

  if (imemEmpty) {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(pNode != NULL);

    STSRow* pRow = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp the iterator rests on
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey,
              (*pIMem)->nrows, pHandle->idStr);

    assert(ASCENDING_TRAVERSE(order) ? (pCheckInfo->lastKey <= firstKey) : (pCheckInfo->lastKey >= firstKey));
  }

  return true;
}

H
Haojun Liao 已提交
801 802 803 804 805
/*
 * Release the mem/imem skip-list iterators of a table.
 *
 * The pointers are cleared afterwards so that later NULL checks on pCheckInfo->iter/iiter do not
 * see a stale pointer and a second call does not destroy the same iterator twice.
 */
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
  tSkipListDestroyIter(pCheckInfo->iter);
  pCheckInfo->iter = NULL;

  tSkipListDestroyIter(pCheckInfo->iiter);
  pCheckInfo->iiter = NULL;
}

806
/*
 * Return the timestamp of the row that the scan will visit first in the in-memory buffers of a
 * table, and record in pCheckInfo->chosen which buffer that row comes from.
 *
 * @param pCheckInfo  per-table state holding the mem (iter) and imem (iiter) iterators
 * @param order       scan order; ascending scans start at the smaller key, descending at the larger
 * @param update      update mode, only consulted when both buffers sit on the same timestamp
 * @return the first key to traverse, or TSKEY_INITIAL_VAL when neither iterator has a current row
 *
 * Side effect: when both buffers hold the same timestamp and the update mode selects one of them,
 * the iterator of the buffer that loses is advanced past the duplicate.
 */
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
  STSRow *rmem = NULL, *rimem = NULL;
  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return TD_ROW_KEY(rmem);
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return TD_ROW_KEY(rimem);
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      // keep the imem row, skip the duplicate in mem
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
      // keep the mem row, skip the duplicate in imem
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
    return r1;
  }

  // Distinct keys: the mem row goes first when its key comes first in scan direction. The
  // direction must be honoured for descending scans as well, otherwise the smaller imem key is
  // reported although the mem row is the one the scan visits first.
  bool memFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return r2;
}

H
Haojun Liao 已提交
859 860
/*
 * Return the row the scan must consume next from the in-memory buffers of a table and record in
 * pCheckInfo->chosen which buffer (mem, imem or both) the row belongs to.
 *
 * @param pCheckInfo  per-table state holding the mem (iter) and imem (iiter) iterators
 * @param order       scan order; ascending scans take the smaller key, descending the larger
 * @param update      update mode, only consulted when both buffers sit on the same timestamp
 * @param extraRow    optional out-parameter; receives the imem row when both rows are kept
 *                    (CHECKINFO_CHOSEN_BOTH). May be NULL if the caller does not need it.
 * @return the chosen row, or NULL when neither iterator has a current row
 *
 * Side effect: when both buffers hold the same timestamp and the update mode selects one of them,
 * the iterator of the buffer that loses is advanced past the duplicate.
 */
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) {
  STSRow *rmem = NULL, *rimem = NULL;
  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return NULL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return rimem;
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      // keep the imem row, skip the duplicate in mem
      tSkipListIterNext(pCheckInfo->iter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      return rimem;
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
      // keep the mem row, skip the duplicate in imem
      tSkipListIterNext(pCheckInfo->iiter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
      // some callers only need the primary row and pass NULL here
      if (extraRow != NULL) {
        *extraRow = rimem;
      }
      return rmem;
    }
  }

  // Distinct keys: take the row whose key comes first in scan direction. The recorded buffer must
  // match the returned row, because moveToNextRowInMem() advances the iterator named by `chosen`.
  bool memFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return rimem;
}

927
/*
 * Advance past the row that was chosen last (see pCheckInfo->chosen) and report whether another
 * in-memory row is available.
 *
 * - chosen == mem : advance the mem iterator; if it is exhausted (or absent), the answer is
 *                   whether the imem iterator currently rests on a row.
 * - chosen == imem: the mirror image of the case above.
 * - otherwise     : advance both iterators; true if either of them moved onto a row.
 */
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
    if (pCheckInfo->iter != NULL && tSkipListIterNext(pCheckInfo->iter)) {
      return true;
    }

    // mem is exhausted: fall back to whatever the untouched imem iterator points at
    return (pCheckInfo->iiter != NULL) && (tSkipListIterGet(pCheckInfo->iiter) != NULL);
  }

  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
    if (pCheckInfo->iiter != NULL && tSkipListIterNext(pCheckInfo->iiter)) {
      return true;
    }

    // imem is exhausted: fall back to whatever the untouched mem iterator points at
    return (pCheckInfo->iter != NULL) && (tSkipListIterGet(pCheckInfo->iter) != NULL);
  }

  // both rows were consumed together, so both iterators have to move
  bool advanced = false;
  if (pCheckInfo->iter != NULL) {
    advanced = tSkipListIterNext(pCheckInfo->iter);
  }
  if (pCheckInfo->iiter != NULL) {
    advanced = tSkipListIterNext(pCheckInfo->iiter) || advanced;
  }

  return advanced;
}

965
/*
 * Read the next batch of rows for the active table from the in-memory buffers.
 *
 * On success the rows are placed in the handle by tsdbReadRowsFromCache(), pHandle->cur describes
 * the produced block (rows, window, lastKey, mixBlock) and the table's lastKey is moved one step
 * past the last row read.
 *
 * @return true when rows were read from the buffers; false when the buffers hold no row for the
 *         active table or the next buffered row lies beyond the end of the query window
 */
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
  STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
  size_t    size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex >= 0 && (size_t)pHandle->activeIndex < size && size >= 1);
  pHandle->cur.fid = INT32_MIN;

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

  // getSRowInTableMem() stores the second row through this pointer when both buffers hold the
  // same timestamp and both rows are kept; hand it real storage instead of NULL.
  STSRow* extraRow = NULL;
  STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, &extraRow);
  if (row == NULL) {
    return false;
  }

  pCheckInfo->lastKey = TD_ROW_KEY(row);  // first timestamp in buffer
  tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
            pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);

  // all data in mem are checked already.
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
    return false;
  }

  int32_t      step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
  STimeWindow* win = &pHandle->cur.win;
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);

  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;

  // the window is filled in scan direction; for descending scans swap it so that skey <= ekey
  if (!ASCENDING_TRAVERSE(pHandle->order)) {
    TSWAP(win->skey, win->ekey);
  }

  return true;
}
H
hjxilinx 已提交
1006

1007 1008
/*
 * Map a timestamp to the id of the data file used as the starting point of a scan.
 *
 * @param key          timestamp; TSKEY_INITIAL_VAL maps to INT32_MIN
 * @param daysPerFile  number of days covered by one data file
 * @param precision    time precision, used as index into tsTickPerDay[]
 * @return key / (daysPerFile * tsTickPerDay[precision]), with negative keys shifted down by one
 *         file first, clamped to the int32_t range
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // precision indexes tsTickPerDay[], so it has to lie inside [0, TSDB_TIME_PRECISION_NANO].
  // (The previous `>= MICRO || <= NANO` condition was true for every value.)
  assert(precision >= 0 && precision <= TSDB_TIME_PRECISION_NANO);
  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  // Integer division truncates toward zero; shift negative keys down by one file so that they do
  // not land in file 0.
  // NOTE(review): for a negative key that is an exact multiple of the file span this yields one
  // file lower than floor division would - confirm against the fid computation used on write.
  if (key < 0) {
    key -= (daysPerFile * tsTickPerDay[precision]);
  }

  int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision]));  // set the starting fileId

  // clamp to the int32_t range
  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t)fid;
}

H
refact  
Hongze Cheng 已提交
1029
/*
 * Binary search over an array of blocks for the slot whose [keyFirst, keyLast] range relates to skey.
 *
 * The search stops when
 *  - the remaining range has a single slot, or
 *  - skey lies inside the middle block, or
 *  - skey lies behind the middle block and only two slots remain, or
 *  - skey falls into the gap next to the middle block (gap after it for TSDB_ORDER_DESC,
 *    gap before it for TSDB_ORDER_ASC).
 *
 * Returns the index of the middle slot at the time the search stopped.
 */
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo = 0;
  int32_t hi = numOfBlocks - 1;
  int32_t mid = lo;

  for (;;) {
    int32_t span = hi - lo + 1;
    mid = lo + (span >> 1);

    if (span == 1) {
      break;
    }

    if (skey > pBlock[mid].keyLast) {
      // skey is behind the middle block
      if (span == 2) {
        break;
      }
      if (order == TSDB_ORDER_DESC && skey < pBlock[mid + 1].keyFirst) {
        break;  // skey sits in the gap between mid and mid + 1
      }
      lo = mid + 1;
    } else if (skey < pBlock[mid].keyFirst) {
      // skey is in front of the middle block
      if (order == TSDB_ORDER_ASC && skey > pBlock[mid - 1].keyLast) {
        break;  // skey sits in the gap between mid - 1 and mid
      }
      hi = mid - 1;
    } else {
      break;  // skey is covered by the middle block
    }
  }

  return mid;
}
1055

dengyihao's avatar
dengyihao 已提交
1056
/*
 * Load the block index of one table from the current file and keep only the blocks that can
 * overlap the remaining query range.
 *
 * @param pTsdbReadHandle  read handle; its rhelper is pointed at the table
 * @param index            position of the table in pTsdbReadHandle->pTableCheckInfo
 * @param numOfBlocks      in/out accumulator; the number of qualifying blocks is added to it
 * @return 0 on success (including "no block for this table in the file"), otherwise an error
 *         code (terrno, or TSDB_CODE_TDB_OUT_OF_MEMORY when the index buffer cannot be grown)
 *
 * After a successful call pCheckInfo->pCompInfo->blocks[0 .. pCheckInfo->numOfBlocks) holds the
 * qualifying blocks, moved to the front of the array.
 */
static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
  pCheckInfo->numOfBlocks = 0;

  // a stack-local table descriptor is enough to position the read helper
  STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
  table.pSchema = pTsdbReadHandle->pSchema;
  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
    return terrno;
  }

  SBlockIdx* compIndex = pTsdbReadHandle->rhelper.pBlkIdx;
  if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId) {
    return 0;  // this file has no block index entry for the table, nothing to load
  }

  // grow the reusable block-info buffer when the index of this file does not fit
  if (pCheckInfo->compSize < (int32_t)compIndex->len) {
    assert(compIndex->len > 0);

    char* buf = taosMemoryRealloc(pCheckInfo->pCompInfo, compIndex->len);
    if (buf == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pCheckInfo->pCompInfo = (SBlockInfo*)buf;
    pCheckInfo->compSize = compIndex->len;
  }

  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
    return terrno;
  }

  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;

  // the remaining range runs from lastKey to window.ekey in scan direction
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  TSKEY minKey = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  TSKEY maxKey = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);

  // discard the unqualified data block based on the query time window
  int32_t firstIdx = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, minKey, TSDB_ORDER_ASC);
  if (minKey > pCompInfo->blocks[firstIdx].keyLast) {
    return 0;  // the range starts behind the located block: no block qualifies
  }

  // todo speedup the procedure of located end block
  int32_t endIdx = firstIdx;
  for (; endIdx < (int32_t)compIndex->numOfBlocks; ++endIdx) {
    if (pCompInfo->blocks[endIdx].keyFirst > maxKey) {
      break;
    }
  }

  pCheckInfo->numOfBlocks = (endIdx - firstIdx);

  // compact the qualifying blocks to the front of the array
  if (firstIdx > 0) {
    memmove(pCompInfo->blocks, &pCompInfo->blocks[firstIdx], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }

  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1131

1132
/*
 * Load the block index of the current file for the table(s) selected by the handle's load type
 * and count the qualifying blocks.
 *
 * - BLOCK_LOAD_TABLE_SEQ_ORDER : only the active table is loaded.
 * - BLOCK_LOAD_OFFSET_SEQ_ORDER: every table in pTableCheckInfo is loaded; the first failure
 *                                stops the loop.
 *
 * @param numOfBlocks  out: reset to 0, then incremented by loadBlockInfo() for each table
 * @return TSDB_CODE_SUCCESS or the error code reported by loadBlockInfo()
 *
 * The elapsed time is added to cost.headFileLoadTime on every return path.
 */
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

  pTsdbReadHandle->cost.headFileLoad += 1;
  int64_t startUs = taosGetTimestampUs();

  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

    // stop at the first table that fails to load
    for (int32_t i = 0; i < numOfTables && code == TSDB_CODE_SUCCESS; ++i) {
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
    }
  } else {
    assert(0);
  }

  pTsdbReadHandle->cost.headFileLoadTime += (taosGetTimestampUs() - startUs);
  return code;
}

dengyihao's avatar
dengyihao 已提交
1164 1165
/*
 * Load the column data of one file block into the read helper's buffers.
 *
 * @param pTsdbReadHandle  read handle owning the column buffers and the read helper
 * @param pBlock           block to load; pBlock->numOfRows is overwritten with the number of rows
 *                         actually loaded, or set to 0 on failure
 * @param pCheckInfo       per-table state providing the block info and the table id
 * @param slotIndex        only used in the log messages
 * @return TSDB_CODE_SUCCESS, or terrno on failure
 *
 * On success dataBlockLoadInfo records which block (file group, slot, uid) is now resident and
 * the elapsed time is added to cost.blockLoadTime.
 */
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                   int32_t slotIndex) {
  int64_t startUs = taosGetTimestampUs();

  // bind the output buffer and the two helper buffers to the schema
  if (tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;

  if (tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                            (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true) != TSDB_CODE_SUCCESS) {
    // the loader is expected to have reported the reason through terrno
    assert(terrno != TSDB_CODE_SUCCESS);
    goto _error;
  }

  // remember which block is resident in the buffers
  pTsdbReadHandle->dataBlockLoadInfo.fileGroup = pTsdbReadHandle->pFileGroup;
  pTsdbReadHandle->dataBlockLoadInfo.slot = pTsdbReadHandle->cur.slot;
  pTsdbReadHandle->dataBlockLoadInfo.uid = pCheckInfo->tableId;

  SDataCols* pLoaded = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pLoaded->numOfRows != 0 && pLoaded->numOfRows <= pBlock->numOfRows);

  pBlock->numOfRows = pLoaded->numOfRows;

  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
  if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int64_t* ts = pLoaded->cols[0].pData;
    for (int32_t r = 0; r < pBlock->numOfRows; ++r) {
      ts[r] = tdGetKey(ts[r]);
    }
  }

  int64_t elapsedTime = (taosGetTimestampUs() - startUs);
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;

  tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
            " us, %s",
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime,
            pTsdbReadHandle->idStr);
  return TSDB_CODE_SUCCESS;

_error:
  // a failed load leaves the block without usable rows
  pBlock->numOfRows = 0;

  tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
  return terrno;
}

1235
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
dengyihao's avatar
dengyihao 已提交
1236 1237 1238 1239 1240 1241 1242 1243
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end);
static void    moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void    doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void    copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                              SDataBlockInfo* pBlockInfo, int32_t endPos);

/*
 * Produce the next result block for a file block that lies completely inside the remaining query
 * range, taking the in-memory buffers into account.
 *
 * Three cases, decided by the first key found in the buffers (scan direction applies):
 *  1. no buffered key reaches the file block: rows come from the file block only, either the
 *     whole block or the remaining part of it;
 *  2. buffered rows lie entirely in front of the file block: only those buffered rows are
 *     returned and the file block is left for a later call (cur->blockCompleted = false);
 *  3. buffered rows overlap the file block: the block is loaded and merged with the buffers.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of doLoadFileDataBlock()
 */
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);

  // make sure the buffer iterators exist; the return value is not needed here
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

  TSKEY key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
  bool  hasMemKey = (key != TSKEY_INITIAL_VAL);

  if (hasMemKey) {
    tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
  }

  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // does the first buffered key come before the far end of the file block?
  bool memReachesBlock = hasMemKey && (ascScan ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (!memReachesBlock) {
    /*
     * no data in cache, only load data from file
     * during the query processing, data in cache will not be checked anymore.
     *
     * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
     */
    assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
    int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);

    bool wholeBlock = ascScan ? (cur->pos == 0 && endPos == binfo.rows - 1)
                              : (cur->pos == (binfo.rows - 1) && endPos == 0);
    if (wholeBlock) {
      pTsdbReadHandle->realNumOfRows = binfo.rows;

      cur->rows = binfo.rows;
      cur->win = binfo.window;
      cur->mixBlock = false;
      cur->blockCompleted = true;

      // park the cursor just outside the block, in scan direction
      if (ascScan) {
        cur->lastKey = binfo.window.ekey + 1;
        cur->pos = binfo.rows;
      } else {
        cur->lastKey = binfo.window.skey - 1;
        cur->pos = -1;
      }
    } else {  // partially copy to dest buffer
      copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
      cur->mixBlock = true;
    }

    assert(cur->blockCompleted);
    if (cur->rows == binfo.rows) {
      tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
    } else {
      tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
                ", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
                pTsdbReadHandle->idStr);
    }

    return TSDB_CODE_SUCCESS;
  }

  // does the first buffered key lie strictly in front of the file block?
  bool memBeforeBlock = ascScan ? (key < binfo.window.skey) : (key > binfo.window.ekey);

  if (memBeforeBlock) {
    // do not load file block into buffer
    int32_t step = ascScan ? 1 : -1;

    // read buffered rows up to the key right in front of the file block
    TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
    cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
    pTsdbReadHandle->realNumOfRows = cur->rows;

    // update the last key value
    pCheckInfo->lastKey = cur->win.ekey + step;
    if (!ascScan) {
      TSWAP(cur->win.skey, cur->win.ekey);
    }

    cur->mixBlock = true;
    cur->blockCompleted = false;
    return TSDB_CODE_SUCCESS;
  }

  // buffered rows overlap the file block: load it and merge both sources
  // return error, add test cases
  int32_t code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
  return code;
}

1339 1340
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);

dengyihao's avatar
dengyihao 已提交
1341 1342
/*
 * Produce the next result block from one file block of the active table.
 *
 * When the remaining query range starts or ends inside the block, the block is loaded, the start
 * position is located with binarySearchForKey() and the rows are merged with the in-memory
 * buffers. Otherwise the block is handed to handleDataMergeIfNeeded() with the cursor placed on
 * the block's first row in scan direction.
 *
 * @param exists  out: false when loading the block failed, otherwise whether the handle holds
 *                rows (realNumOfRows > 0)
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 */
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                 bool* exists) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    // query ended in/started from current block
    bool partial = (pTsdbReadHandle->window.ekey < pBlock->keyLast) || (pCheckInfo->lastKey > pBlock->keyFirst);

    if (!partial) {
      // the whole block is loaded in to buffer
      cur->pos = 0;
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
    } else {
      code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
      if (code != TSDB_CODE_SUCCESS) {
        *exists = false;
        return code;
      }

      SDataCols* pTsCols = pTsdbReadHandle->rhelper.pDCols[0];
      assert(pTsCols->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTsCols->numOfRows == pBlock->numOfRows);

      // start at the first row, unless the scan resumes inside the block
      cur->pos = 0;
      if (pCheckInfo->lastKey > pBlock->keyFirst) {
        cur->pos =
            binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
      }

      assert(pCheckInfo->lastKey <= pBlock->keyLast);
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
    }
  } else {
    // desc order, query ended in current block
    bool partial = (pTsdbReadHandle->window.ekey > pBlock->keyFirst) || (pCheckInfo->lastKey < pBlock->keyLast);

    if (!partial) {
      cur->pos = pBlock->numOfRows - 1;
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
    } else {
      code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
      if (code != TSDB_CODE_SUCCESS) {
        *exists = false;
        return code;
      }

      SDataCols* pTsCols = pTsdbReadHandle->rhelper.pDCols[0];

      // start at the last row, unless the scan resumes inside the block
      if (pCheckInfo->lastKey < pBlock->keyLast) {
        cur->pos =
            binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
      } else {
        cur->pos = pBlock->numOfRows - 1;
      }

      assert(pCheckInfo->lastKey >= pBlock->keyFirst);
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
    }
  }

  *exists = pTsdbReadHandle->realNumOfRows > 0;
  return code;
}

1398
/**
 * Binary search for `key` in an array of timestamps (the array is assumed to be sorted in
 * ascending order; this function does not verify it).
 *
 * @param pValue  buffer that is reinterpreted as an array of TSKEY
 * @param num     number of keys stored in the buffer
 * @param key     timestamp to locate
 * @param order   TSDB_ORDER_DESC: return the index of the last element that is <= key,
 *                or -1 when every element is greater than key.
 *                TSDB_ORDER_ASC:  return the index of the first element that is >= key,
 *                or -1 when every element is smaller than key.
 * @return the index described above; -1 when num <= 0
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) {
    return -1;
  }

  TSKEY* keys = (TSKEY*)pValue;
  int    lo = 0;
  int    hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // looking for the last position whose key is not greater than the target
    for (;;) {
      if (key >= keys[hi]) {
        return hi;
      }
      if (key == keys[lo]) {
        return lo;
      }
      if (key < keys[lo]) {
        return lo - 1;
      }

      // here keys[lo] < key < keys[hi], hence the range holds at least two elements
      int mid = lo + ((hi - lo + 1) >> 1);
      if (key == keys[mid]) {
        return mid;
      }

      if (key < keys[mid]) {
        hi = mid - 1;
      } else {
        lo = mid + 1;
      }
    }
  }

  // looking for the first position whose key is not smaller than the target
  for (;;) {
    if (key <= keys[lo]) {
      return lo;
    }
    if (key == keys[hi]) {
      return hi;
    }
    if (key > keys[hi]) {
      // the answer is the slot right after the current range, if such a slot exists
      return (hi + 1 >= num) ? -1 : (hi + 1);
    }

    int mid = lo + ((hi - lo + 1) >> 1);
    if (key == keys[mid]) {
      return mid;
    }

    if (key < keys[mid]) {
      hi = mid - 1;
    } else {
      lo = mid + 1;
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1460 1461
/**
 * Copy the rows [start, end] of the currently loaded file block into the output columns,
 * beginning at output row `numOfRows`.
 *
 * For an ascending scan the rows are read from `start` up to `end`; for a descending scan they
 * are read from `end` down to `start`. In both cases they are appended at increasing output
 * row indexes. Requested columns that are absent from the block, or whose block column is
 * entirely NULL, are filled with NULLs.
 *
 * Side effects: pTsdbReadHandle->cur.win.ekey and cur.lastKey are updated from the timestamp
 * of the last row that was read.
 *
 * @param pTsdbReadHandle read handle owning the loaded block (rhelper.pDCols[0]) and the output columns
 * @param capacity        not used by this function
 * @param numOfRows       number of rows already present in the output columns
 * @param start           lower row index in the file block (inclusive)
 * @param end             upper row index in the file block (inclusive); end - start + 1 must be >= 0
 * @return numOfRows plus the number of rows copied
 */
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end) {
  (void)capacity;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  int32_t num = end - start + 1;
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t trueStart = ascScan ? start : end;
  int32_t trueEnd = ascScan ? end : start;
  int32_t step = ascScan ? 1 : -1;

  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);

  // walk the requested columns (i) and the block columns (j) together, both ordered by column id
  int32_t i = 0, j = 0;
  while (i < requiredNumOfCols && j < pCols->numOfCols) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    SDataCol*        src = &pCols->cols[j];

    if (src->colId < pColInfo->info.colId) {  // block column was not requested, skip it
      j++;
      continue;
    }

    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
      // fixed-width and variable-length columns are copied the same way: cell by cell
      // todo opt performance / refactor, only copy one-by-one
      for (int32_t n = 0; n < num; ++n) {
        int32_t srcRow = trueStart + n * step;
        int32_t dstRow = numOfRows + n;

        SCellVal sVal = {0};
        if (tdGetColDataOfRow(&sVal, src, srcRow, pCols->bitmapMode) < 0) {
          TASSERT(0);
        }

        if (sVal.valType == TD_VTYPE_NULL) {
          colDataAppendNULL(pColInfo, dstRow);
        } else {
          colDataAppend(pColInfo, dstRow, sVal.val, false);
        }
      }

      j++;
      i++;
    } else {  // pColInfo->info.colId < src->colId (or the block column is all NULL), it is a NULL data
      colDataAppendNNULL(pColInfo, numOfRows, num);
      i++;
    }
  }

  while (i < requiredNumOfCols) {  // the remain columns are all null data
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colDataAppendNNULL(pColInfo, numOfRows, num);
    i++;
  }

  pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
  pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;

  return numOfRows + num;
}

H
Haojun Liao 已提交
1544
// TODO fix bug for reverse copy data problem
1545
// Note: row1 always has high priority
H
Haojun Liao 已提交
1546 1547 1548 1549
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
                               STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                               bool forceSetNull) {
#if 1
dengyihao's avatar
dengyihao 已提交
1550 1551 1552 1553 1554 1555 1556 1557 1558
  STSchema* pSchema;
  STSRow*   row;
  int16_t   colId;
  int16_t   offset;

  bool     isRow1DataRow = TD_IS_TP_ROW(row1);
  bool     isRow2DataRow;
  bool     isChosenRowDataRow;
  int32_t  chosen_itr;
H
Haojun Liao 已提交
1559
  SCellVal sVal = {0};
1560

H
Haojun Liao 已提交
1561
  // the schema version info is embeded in STSRow
1562 1563 1564
  int32_t numOfColsOfRow1 = 0;

  if (pSchema1 == NULL) {
H
Hongze Cheng 已提交
1565
    pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
1566
  }
1567

C
Cary Xu 已提交
1568 1569 1570 1571
#ifdef TD_DEBUG_PRINT_ROW
  tdSRowPrint(row1, pSchema1, __func__);
#endif

dengyihao's avatar
dengyihao 已提交
1572
  if (isRow1DataRow) {
1573
    numOfColsOfRow1 = schemaNCols(pSchema1);
H
Haojun Liao 已提交
1574
  } else {
H
Haojun Liao 已提交
1575
    numOfColsOfRow1 = tdRowGetNCols(row1);
D
fix bug  
dapan1121 已提交
1576
  }
1577

1578
  int32_t numOfColsOfRow2 = 0;
dengyihao's avatar
dengyihao 已提交
1579
  if (row2) {
H
Haojun Liao 已提交
1580
    isRow2DataRow = TD_IS_TP_ROW(row2);
1581
    if (pSchema2 == NULL) {
H
Hongze Cheng 已提交
1582
      pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
1583
    }
dengyihao's avatar
dengyihao 已提交
1584
    if (isRow2DataRow) {
1585 1586
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
H
Haojun Liao 已提交
1587
      numOfColsOfRow2 = tdRowGetNCols(row2);
1588 1589
    }
  }
C
Cary Xu 已提交
1590

1591
  int32_t i = 0, j = 0, k = 0;
dengyihao's avatar
dengyihao 已提交
1592
  while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
1593
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1594 1595

    int32_t colIdOfRow1;
dengyihao's avatar
dengyihao 已提交
1596
    if (j >= numOfColsOfRow1) {
1597
      colIdOfRow1 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1598
    } else if (isRow1DataRow) {
1599 1600
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
C
Cary Xu 已提交
1601
      colIdOfRow1 = tdKvRowColIdAt(row1, j);
1602 1603 1604
    }

    int32_t colIdOfRow2;
dengyihao's avatar
dengyihao 已提交
1605
    if (k >= numOfColsOfRow2) {
1606
      colIdOfRow2 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1607
    } else if (isRow2DataRow) {
1608 1609
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
C
Cary Xu 已提交
1610
      colIdOfRow2 = tdKvRowColIdAt(row2, k);
1611 1612
    }

dengyihao's avatar
dengyihao 已提交
1613 1614
    if (colIdOfRow1 == colIdOfRow2) {
      if (colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1615
        j++;
1616
        k++;
C
Cary Xu 已提交
1617 1618
        continue;
      }
1619 1620 1621 1622
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
dengyihao's avatar
dengyihao 已提交
1623 1624
    } else if (colIdOfRow1 < colIdOfRow2) {
      if (colIdOfRow1 < pColInfo->info.colId) {
1625 1626
        j++;
        continue;
C
Cary Xu 已提交
1627
      }
1628 1629 1630 1631 1632
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
dengyihao's avatar
dengyihao 已提交
1633
      if (colIdOfRow2 < pColInfo->info.colId) {
1634 1635 1636 1637 1638 1639 1640 1641
        k++;
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }
dengyihao's avatar
dengyihao 已提交
1642
    if (isChosenRowDataRow) {
1643 1644
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
C
Cary Xu 已提交
1645 1646
      // TODO: use STSRowIter
      tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
1647
    } else {
C
Cary Xu 已提交
1648 1649 1650 1651 1652 1653 1654 1655 1656 1657
      // TODO: use STSRowIter
      if (chosen_itr == 0) {
        colId = PRIMARYKEY_TIMESTAMP_COL_ID;
        tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
      } else {
        SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
        colId = pColIdx->colId;
        offset = pColIdx->offset;
        tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal);
      }
1658 1659 1660
    }

    if (colId == pColInfo->info.colId) {
H
Haojun Liao 已提交
1661
      if (tdValTypeIsNorm(sVal.valType)) {
H
Haojun Liao 已提交
1662
        colDataAppend(pColInfo, numOfRows, sVal.val, false);
H
Haojun Liao 已提交
1663
      } else if (forceSetNull) {
H
Haojun Liao 已提交
1664
        colDataAppend(pColInfo, numOfRows, NULL, true);
1665
      }
H
Haojun Liao 已提交
1666

1667
      i++;
C
Cary Xu 已提交
1668

dengyihao's avatar
dengyihao 已提交
1669
      if (row == row1) {
C
Cary Xu 已提交
1670
        j++;
1671 1672 1673 1674
      } else {
        k++;
      }
    } else {
dengyihao's avatar
dengyihao 已提交
1675
      if (forceSetNull) {
H
Haojun Liao 已提交
1676
        colDataAppend(pColInfo, numOfRows, NULL, true);
C
Cary Xu 已提交
1677
      }
1678
      i++;
1679
    }
1680
  }
1681

dengyihao's avatar
dengyihao 已提交
1682 1683
  if (forceSetNull) {
    while (i < numOfCols) {  // the remain columns are all null data
1684
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
1685
      colDataAppend(pColInfo, numOfRows, NULL, true);
1686
      i++;
1687 1688
    }
  }
H
Haojun Liao 已提交
1689
#endif
1690
}
1691

1692 1693
/**
 * For a descending scan whose output buffer is only partially filled, shift the bytes found at
 * the tail of every output column to the start of that column.
 *
 * Nothing is done for an ascending scan, for an empty result, or when the buffer is full.
 *
 * NOTE(review): this treats every column as a flat array of `info.bytes`-sized cells and assumes
 * the rows were written at the tail of the buffer - confirm that this still matches how the
 * copy/merge routines place rows for descending scans.
 *
 * @param pTsdbReadHandle read handle owning the output columns
 * @param numOfRows       number of valid rows in the output buffer
 * @param numOfCols       number of output columns to process
 */
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
  if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return;
  }

  // a completely filled buffer has nothing to shift
  if (numOfRows >= pTsdbReadHandle->outputCapacity) {
    return;
  }

  int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;

  int32_t colIndex = 0;
  while (colIndex < numOfCols) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, colIndex);

    char* dst = (char*)pColInfo->pData;
    char* src = (char*)pColInfo->pData + emptySize * pColInfo->info.bytes;
    memmove(dst, src, numOfRows * pColInfo->info.bytes);

    ++colIndex;
  }
}

dengyihao's avatar
dengyihao 已提交
1708 1709
/**
 * Clamp the file-block row range that is about to be copied so that it fits into the space
 * left in the output buffer, and report it as an ascending index pair (*start <= *end when
 * the range is not empty).
 *
 * @param pTsdbReadHandle read handle providing the scan order and outputCapacity
 * @param startPos        first row to copy in scan order
 * @param endPos          last row to copy in scan order
 * @param numOfExisted    number of rows already held in the output buffer
 * @param start           [out] lower row index of the range to copy
 * @param end             [out] upper row index of the range to copy
 */
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
                                int32_t numOfExisted, int32_t* start, int32_t* end) {
  int32_t capacity = pTsdbReadHandle->outputCapacity;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    // rows run from startPos upwards; cut the upper bound when they do not all fit
    int32_t remain = endPos - startPos + 1;
    int32_t upper = (remain + numOfExisted > capacity) ? ((capacity - numOfExisted) + startPos - 1) : endPos;

    *start = startPos;
    *end = upper;
  } else {
    // rows run from startPos downwards; cut the lower bound when they do not all fit
    int32_t remain = (startPos - endPos) + 1;
    int32_t lower = (remain + numOfExisted > capacity) ? (startPos + 1 - (capacity - numOfExisted)) : endPos;

    *start = lower;
    *end = startPos;
  }
}

dengyihao's avatar
dengyihao 已提交
1734 1735
/**
 * Publish the outcome of a copy/merge step: record the row count and the next block position
 * in the file cursor, the row count in the read handle, and carry the cursor's lastKey over to
 * the table check info.
 *
 * @param pTsdbReadHandle read handle whose cursor (cur) and realNumOfRows are updated
 * @param pCheckInfo      table check info whose lastKey is set from cur->lastKey
 * @param numOfRows       number of rows produced
 * @param endPos          position inside the file block to store in cur->pos
 */
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
                                 int32_t endPos) {
  SQueryFilePos* pCursor = &pTsdbReadHandle->cur;

  pCursor->rows = numOfRows;
  pCursor->pos = endPos;

  pTsdbReadHandle->realNumOfRows = numOfRows;
  pCheckInfo->lastKey = pCursor->lastKey;
}

1744 1745
/**
 * Sanity-check the time window of the block that was just generated.
 *
 * With at least one row, assert that the window lies inside the query window and that it
 * matches the first/last timestamp stored in output column 0. With no rows, reset the cursor
 * window to the query window and move lastKey one step past the end of the query window.
 *
 * @param pTsdbReadHandle read handle whose cursor (cur) is checked or reset
 */
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  if (cur->rows <= 0) {
    // nothing was produced: the cursor covers the whole query window and points behind its end
    cur->win = pTsdbReadHandle->window;

    int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
    cur->lastKey = pTsdbReadHandle->window.ekey + step;
    return;
  }

  // cur->win is always ascending, the query window is reversed for a descending scan
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
  }

  SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
  assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
         cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
}

dengyihao's avatar
dengyihao 已提交
1765 1766
/**
 * Copy every remaining row of the loaded file block, from the current cursor position up to
 * `endPos` (in scan order), into the output buffer and mark the block as completed.
 *
 * @param pTsdbReadHandle read handle owning the loaded block, the cursor and the output columns
 * @param pCheckInfo      table check info, updated through updateInfoAfterMerge()
 * @param pBlockInfo      description of the file block; its row count decides cur->mixBlock
 * @param endPos          last row of the block to copy, in scan order
 */
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                           SDataBlockInfo* pBlockInfo, int32_t endPos) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*         tsArray = pCols->cols[0].pData;

  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));

  // express the range as an ascending pair, no matter in which direction the block is scanned
  int32_t lower = cur->pos;
  int32_t upper = endPos;
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    lower = endPos;
    upper = cur->pos;
  }

  assert(pTsdbReadHandle->outputCapacity >= (upper - lower + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, lower, upper);

  // the time window should always be ascending order: skey <= ekey
  cur->win = (STimeWindow){.skey = tsArray[lower], .ekey = tsArray[upper]};
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
  cur->lastKey = tsArray[endPos] + step;
  cur->blockCompleted = true;

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);

  // The next position may be -1 or pBlockInfo->rows, and it is invalid in both cases.
  int32_t nextPos = endPos + step;
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, nextPos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);

  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
}

1806
/**
 * Determine the last row (in scan order) of the loaded file block that still belongs to the
 * query time window, and set cur->mixBlock accordingly.
 *
 * @param pTsdbReadHandle read handle providing the query window, the scan order and the loaded block
 * @param pBlockInfo      description (time window, row count) of the file block
 * @return the end position inside the block; the value returned by doBinarySearchKey() when the
 *         query window ends inside the block
 */
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
      // the query window reaches the end of the block: the block is consumed up to its last row
      cur->mixBlock = (cur->pos != 0);
      return pBlockInfo->rows - 1;
    }
  } else {
    if (pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
      // the query window reaches the start of the block: the block is consumed down to its first row
      cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
      return 0;
    }
  }

  // the query window ends inside the block
  assert(pCols->numOfRows > 0);

  // NOTE: reverse the order to find the end position in data block
  int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
  int32_t endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
  cur->mixBlock = true;

  return endPos;
}

H
[td-32]  
hjxilinx 已提交
1829 1830
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1831 1832
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1833
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
H
Hongze Cheng 已提交
1834
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1835

1836
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1837

1838 1839
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
dengyihao's avatar
dengyihao 已提交
1840
         cur->pos >= 0 && cur->pos < pBlock->numOfRows);
H
Haojun Liao 已提交
1841

1842
  TSKEY* tsArray = pCols->cols[0].pData;
dengyihao's avatar
dengyihao 已提交
1843 1844
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
         tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
1845 1846

  // for search the endPos, so the order needs to reverse
dengyihao's avatar
dengyihao 已提交
1847
  int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
1848

dengyihao's avatar
dengyihao 已提交
1849
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
1850
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
1851

H
Haojun Liao 已提交
1852
  STable* pTable = NULL;
1853
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
1854

H
Hongze Cheng 已提交
1855 1856
  tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
            " rows:%d, start:%d, end:%d, %s",
dengyihao's avatar
dengyihao 已提交
1857 1858
            pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
            cur->pos, endPos, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1859

1860 1861
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
H
Haojun Liao 已提交
1862

dengyihao's avatar
dengyihao 已提交
1863 1864
  int16_t   rv1 = -1;
  int16_t   rv2 = -1;
1865 1866
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
1867

H
Haojun Liao 已提交
1868 1869
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
1870

1871 1872
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
1873
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
1874
    return;
1875
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1876 1877
    SSkipListNode* node = NULL;
    do {
H
Haojun Liao 已提交
1878 1879
      STSRow* row2 = NULL;
      STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2);
1880
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
1881
        break;
1882
      }
1883

H
Haojun Liao 已提交
1884
      TSKEY key = TD_ROW_KEY(row1);
1885 1886
      if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1887 1888 1889
        break;
      }

dengyihao's avatar
dengyihao 已提交
1890 1891 1892 1893
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) &&
           ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
           !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1894 1895 1896
        break;
      }

1897 1898
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
1899
        if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
1900
          //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
1901
          rv1 = TD_ROW_SVER(row1);
C
Cary Xu 已提交
1902
        }
dengyihao's avatar
dengyihao 已提交
1903 1904
        if (row2 && rv2 != TD_ROW_SVER(row2)) {
          //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
1905
          rv2 = TD_ROW_SVER(row2);
1906
        }
dengyihao's avatar
dengyihao 已提交
1907 1908 1909

        mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
                           pCheckInfo->tableId, pSchema1, pSchema2, true);
1910 1911 1912 1913
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1914

1915
        cur->win.ekey = key;
dengyihao's avatar
dengyihao 已提交
1916
        cur->lastKey = key + step;
1917 1918
        cur->mixBlock = true;

1919
        moveToNextRowInMem(pCheckInfo);
1920
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
H
TD-1439  
Hongze Cheng 已提交
1921
        if (pCfg->update) {
dengyihao's avatar
dengyihao 已提交
1922
          if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
1923
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
1924
          }
H
Haojun Liao 已提交
1925
          if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
1926
            //            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
1927
            rv1 = TD_ROW_SVER(row1);
1928
          }
dengyihao's avatar
dengyihao 已提交
1929 1930
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            //            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
1931
            rv2 = TD_ROW_SVER(row2);
1932
          }
dengyihao's avatar
dengyihao 已提交
1933

1934
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
dengyihao's avatar
dengyihao 已提交
1935 1936
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
                             pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
H
TD-1439  
Hongze Cheng 已提交
1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
1951
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
dengyihao's avatar
dengyihao 已提交
1952
                 (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1953 1954 1955
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1956

1957
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
1958 1959
        assert(end != -1);

dengyihao's avatar
dengyihao 已提交
1960
        if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
1961
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
1962 1963 1964 1965
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
1966
        }
1967

1968
        int32_t qstart = 0, qend = 0;
1969
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
1970

1971
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
1972 1973
        pos += (qend - qstart + 1) * step;

dengyihao's avatar
dengyihao 已提交
1974 1975
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
        cur->lastKey = cur->win.ekey + step;
1976
      }
1977
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
1978

1979
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
1980 1981 1982 1983
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1984
      if (node == NULL ||
H
Haojun Liao 已提交
1985
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
1986
           ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
H
Haojun Liao 已提交
1987
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
1988
           !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1989 1990 1991 1992 1993
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

1994
        int32_t start = -1, end = -1;
1995
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
1996

1997
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
1998
        pos += (end - start + 1) * step;
1999

dengyihao's avatar
dengyihao 已提交
2000 2001
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start];
        cur->lastKey = cur->win.ekey + step;
H
Haojun Liao 已提交
2002
        cur->mixBlock = true;
2003
      }
2004 2005
    }
  }
H
Haojun Liao 已提交
2006 2007

  cur->blockCompleted =
2008 2009
      (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
       ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
2010

2011
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
wafwerar's avatar
wafwerar 已提交
2012
    TSWAP(cur->win.skey, cur->win.ekey);
2013
  }
2014

2015 2016 2017
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
2018

dengyihao's avatar
dengyihao 已提交
2019 2020 2021
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
2022 2023
}

2024
/**
 * Binary search for `key` in an array of timestamps (the array is assumed to be sorted in
 * ascending order; this function does not verify it).
 *
 * @param pValue  buffer that is reinterpreted as an array of TSKEY
 * @param num     number of keys stored in the buffer
 * @param key     timestamp to locate
 * @param order   TSDB_ORDER_DESC: return the index of the last element that is <= key,
 *                or -1 when every element is greater than key.
 *                Any other value:  return the index of the first element that is >= key,
 *                or -1 when every element is smaller than key.
 * @return the index described above; -1 when num <= 0
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY* keys = (TSKEY*)pValue;
  int    head = 0;
  int    tail = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // looking for the last position whose key is not greater than the target
    for (;;) {
      if (key >= keys[tail]) {
        return tail;
      }
      if (key == keys[head]) {
        return head;
      }
      if (key < keys[head]) {
        return head - 1;
      }

      // here keys[head] < key < keys[tail], hence the range holds at least two elements
      int middle = head + ((tail - head + 1) >> 1);
      if (key == keys[middle]) {
        return middle;
      }

      if (key < keys[middle]) {
        tail = middle - 1;
      } else {
        head = middle + 1;
      }
    }
  }

  // looking for the first position whose key is not smaller than the target
  for (;;) {
    if (key <= keys[head]) {
      return head;
    }
    if (key == keys[tail]) {
      return tail;
    }
    if (key > keys[tail]) {
      // the answer is the slot right after the current range, if such a slot exists
      return (tail + 1 >= num) ? -1 : (tail + 1);
    }

    int middle = head + ((tail - head + 1) >> 1);
    if (key == keys[middle]) {
      return middle;
    }

    if (key < keys[middle]) {
      tail = middle - 1;
    } else {
      head = middle + 1;
    }
  }
}

2084
/**
 * Release the buffers referenced by a block order supporter: the per-table block counters, the
 * per-table block cursors, the block info array of each of the first `numOfTables` tables, and
 * finally the array holding those per-table pointers.
 *
 * @param pSupporter  supporter whose buffers are released
 * @param numOfTables number of entries of pSupporter->pDataBlockInfo to release
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);

  int32_t tableIndex = 0;
  while (tableIndex < numOfTables) {
    STableBlockInfo* pTableBlocks = pSupporter->pDataBlockInfo[tableIndex];
    taosMemoryFreeClear(pTableBlocks);
    ++tableIndex;
  }

  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

2105
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
2106 2107
    /* left block is empty */
    return 1;
2108
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2109 2110 2111 2112 2113 2114 2115
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2116
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
dengyihao's avatar
dengyihao 已提交
2117
#if 0  // TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2118 2119
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2120
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2121
  }
H
Haojun Liao 已提交
2122
#endif
2123

H
Haojun Liao 已提交
2124
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2125 2126
}

2127
/**
 * Build pTsdbReadHandle->pDataBlockInfo for the current file: one entry per data block of
 * every table that has blocks, ordered by block offset when more than one table qualifies.
 *
 * @param pTsdbReadHandle  read handle; its pDataBlockInfo buffer is grown on demand and filled.
 * @param numOfBlocks      total number of blocks over all tables (must match the per-table counts).
 * @param numOfAllocBlocks out: set to numOfBlocks once the output buffer is available.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when an allocation or the
 *         merge tree creation fails.
 */
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

  if (pTsdbReadHandle->allocSize < size) {
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, size);
    if (tmp == NULL) {
      // allocSize is left untouched so that it keeps describing the buffer that is really owned
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
    // record the new capacity only after the buffer has actually grown
    pTsdbReadHandle->allocSize = (int32_t)size;
  }

  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
  *numOfAllocBlocks = numOfBlocks;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    // no per-table buffer has been allocated yet, hence the table count of 0
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  // collect the blocks of every table that owns at least one block in this file
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }

    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];

      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
              pTsdbReadHandle->idStr);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
            numOfQualTables, pTsdbReadHandle->idStr);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full-width return code: narrowing it to 8 bits could turn an error into 0
  int32_t ret = (int32_t)tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    // slots past numOfQualTables are still NULL from calloc
    cleanBlockOrderSupporter(&sup, numOfTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  // repeatedly take the block chosen by the merge tree and advance that table's cursor
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
   * }
   */

  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
  cleanBlockOrderSupporter(&sup, numOfTables);
  taosMemoryFree(pTree);

  return TSDB_CODE_SUCCESS;
}

2244
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2245

dengyihao's avatar
dengyihao 已提交
2246 2247
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
2248
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2249

dengyihao's avatar
dengyihao 已提交
2250
  while (1) {
2251
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
H
Haojun Liao 已提交
2252 2253 2254 2255
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

2256 2257
    if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
2258
      // all data blocks in current file has been checked already, try next file if exists
2259
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2260 2261 2262 2263
    } else {  // next block of the same file
      cur->slot += step;
      cur->mixBlock = false;
      cur->blockCompleted = false;
2264
      pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
2265 2266 2267 2268
    }
  }
}

2269 2270 2271
/**
 * Advance the file iterator until a file set with at least one qualifying block is found,
 * build the block list for it and load its first block (first in traversal order).
 *
 * @param pTsdbReadHandle  read handle; numOfBlocks, pFileGroup, cur.fid and cur.slot are updated.
 * @param exists           out: false when no further file data is available or on error,
 *                         otherwise whatever getDataBlockRv reports.
 * @return TSDB_CODE_SUCCESS or the error reported by one of the file helpers.
 */
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  SQueryFilePos* pos = &pTsdbReadHandle->cur;
  pTsdbReadHandle->numOfBlocks = 0;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     blocksInFile = 0;
  int32_t     tableCount = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  STsdbCfg*   pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
  STimeWindow fileWin = TSWINDOW_INITIALIZER;

  for (;;) {
    // the read lock covers advancing the iterator and opening the file set
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &fileWin.skey, &fileWin.ekey);

    // current file are not overlapped with query time window, ignore remain files
    const bool outOfRange = ASCENDING_TRAVERSE(pTsdbReadHandle->order)
                                ? (fileWin.skey > pTsdbReadHandle->window.ekey)
                                : (fileWin.ekey < pTsdbReadHandle->window.ekey);
    if (outOfRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
      break;
    }

    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &blocksInFile);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, blocksInFile, tableCount,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    assert(blocksInFile >= 0);
    if (blocksInFile == 0) {
      continue;  // nothing for the queried tables in this file, try the next one
    }

    // todo return error code to query engine
    code = createDataBlocksInfo(pTsdbReadHandle, blocksInFile, &pTsdbReadHandle->numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    assert(blocksInFile >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
      break;
    }
  }

  // no data in file anymore
  const bool failed = (code != TSDB_CODE_SUCCESS);
  if (failed || pTsdbReadHandle->numOfBlocks <= 0) {
    if (!failed) {
      assert(pTsdbReadHandle->pFileGroup == NULL);
    }

    pos->fid = INT32_MIN;  // denote that there are no data in file anymore
    *exists = false;
    return code;
  }

  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);

  // start from the first block in traversal order
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    pos->slot = 0;
  } else {
    pos->slot = pTsdbReadHandle->numOfBlocks - 1;
  }
  pos->fid = pTsdbReadHandle->pFileGroup->fid;

  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[pos->slot], exists);
}

/**
 * Tell whether cur->slot is the last block to visit in the current file:
 * slot numOfBlocks - 1 for an ascending traversal, slot 0 for a descending one.
 */
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);

  const int32_t finalSlot = ascTrav ? (numOfBlocks - 1) : 0;
  return cur->slot == finalSlot;
}

2363
/**
 * Step the file cursor to the neighbouring block (slot + 1 ascending, slot - 1 descending)
 * and reset the per-block flags mixBlock and blockCompleted.
 */
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* pos = &pTsdbReadHandle->cur;
  const int32_t  delta = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;

  // the cursor must currently address a valid block
  assert(pos->slot < pTsdbReadHandle->numOfBlocks && pos->slot >= 0);

  pos->slot += delta;
  pos->blockCompleted = false;
  pos->mixBlock = false;
}
H
Haojun Liao 已提交
2373 2374

/**
 * Walk the file sets overlapping the query window and accumulate block statistics
 * (total size, total rows, min/max rows per block, number of small blocks, number of files).
 *
 * totalSize and totalRows are reset here; the remaining counters are only accumulated, so
 * their initial values come from the caller.
 *
 * @param queryHandle      the STsdbReadHandle to scan with.
 * @param pTableBlockInfo  in/out: statistics record.
 * @return TSDB_CODE_SUCCESS or the error reported by one of the file helpers.
 */
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;

  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->totalSize = 0;

  STsdbFS*  pFS = REPO_FS(pTsdbReadHandle->pTsdb);
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);

  // find the start data block in file
  pTsdbReadHandle->locateStart = true;
  int32_t startFid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);

  tsdbRLockFS(pFS);
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFS, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, startFid);
  tsdbUnLockFS(pFS);

  // NOTE(review): numOfFiles is bumped once here and once more per visited file below;
  // confirm the extra count is intended.
  pTableBlockInfo->numOfFiles += 1;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     blocksInFile = 0;
  int32_t     tableCount = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  int         smallBlockLimit = 4096;  // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
  STimeWindow fileWin = TSWINDOW_INITIALIZER;

  const bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  for (;;) {
    blocksInFile = 0;
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &fileWin.skey, &fileWin.ekey);

    // current file are not overlapped with query time window, ignore remain files
    const bool outOfRange =
        asc ? (fileWin.skey > pTsdbReadHandle->window.ekey) : (fileWin.ekey < pTsdbReadHandle->window.ekey);
    if (outOfRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      break;
    }

    pTableBlockInfo->numOfFiles += 1;
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &blocksInFile);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, blocksInFile, tableCount,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    if (blocksInFile == 0) {
      continue;
    }

    // fold every block of every table into the statistics
    for (int32_t t = 0; t < tableCount; ++t) {
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, t);
      SBlock*          pBlocks = pCheckInfo->pCompInfo->blocks;

      for (int32_t b = 0; b < pCheckInfo->numOfBlocks; ++b) {
        int32_t rows = pBlocks[b].numOfRows;

        pTableBlockInfo->totalSize += pBlocks[b].len;
        pTableBlockInfo->totalRows += rows;

        if (rows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = rows;
        }

        if (rows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = rows;
        }

        if (rows < smallBlockLimit) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }
        //        int32_t  stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
        //        SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
        //        blockInfo->numBlocksOfStep++;
      }
    }
  }

  return code;
}

2478 2479 2480
/**
 * Fetch the next file data block for the read handle.
 *
 * On the first call the file iterator is positioned at the file containing window.skey and
 * the first block is loaded. On later calls the current block is continued if it is a
 * partially consumed mixed block; otherwise the cursor moves to the next block, or to the
 * next file when the current one is exhausted.
 *
 * @param exists  out: whether a block with data is available.
 * @return TSDB_CODE_SUCCESS or the error code of the failing helper.
 */
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFS = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* pos = &pTsdbReadHandle->cur;

  // find the start data block in file
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;

    STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
    int32_t   startFid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);

    tsdbRLockFS(pFS);
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFS, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, startFid);
    tsdbUnLockFS(pFS);

    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  // check if current file block is all consumed
  STableBlockInfo* pCurBlock = &pTsdbReadHandle->pDataBlockInfo[pos->slot];
  STableCheckInfo* pCheckInfo = pCurBlock->pTableCheckInfo;

  // a mixed block that is not completed yet still has rows to hand out
  if (pos->mixBlock && !pos->blockCompleted) {
    tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, pos->slot, pos->pos,
              pTsdbReadHandle->idStr);

    int32_t rc = handleDataMergeIfNeeded(pTsdbReadHandle, pCurBlock->compBlock, pCheckInfo);
    *exists = (pTsdbReadHandle->realNumOfRows > 0);

    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }
    if (*exists) {
      return rc;
    }
  }

  // current block is empty, try next block in file
  // all data blocks in current file has been checked already, try next file if exists
  if (isEndFileDataBlock(pos, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[pos->slot], exists);
}

2525 2526
/**
 * Advance pTsdbReadHandle->activeIndex until hasMoreDataInCache reports data for the
 * active table.
 *
 * @return true as soon as hasMoreDataInCache succeeds (activeIndex is left on that table),
 *         false once activeIndex has run past the last table.
 */
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  const size_t tableCount = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  for (; pTsdbReadHandle->activeIndex < tableCount; pTsdbReadHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
      return true;
    }
  }

  return false;
}

dengyihao's avatar
dengyihao 已提交
2539
// todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2540
/**
 * Reduce the table list of the handle to the first table qualified for an interpolation
 * query and restart that table at window.skey.
 *
 * NOTE(review): the qualification test inside the scan loop is commented out, so the scan
 * always reaches the end of the list and the function returns before touching the handle.
 */
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
  // filter the queried time stamp in the first place
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;

  // starts from the buffer in case of descending timestamp order check data blocks
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  int32_t idx = 0;
  for (; idx < numOfTables; ++idx) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, idx);

    // the first qualified table for interpolation query
    //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
    //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
    //      break;
    //    }
  }

  // there are no data in all the tables
  if (idx == numOfTables) {
    return;
  }

  // keep a copy of the chosen entry, since the array is emptied before it is pushed back
  STableCheckInfo chosen = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, idx);
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

  chosen.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &chosen);
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2573
                                 STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2574 2575
  int       numOfRows = 0;
  int32_t   numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Hongze Cheng 已提交
2576
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2577 2578
  win->skey = TSKEY_INITIAL_VAL;

dengyihao's avatar
dengyihao 已提交
2579 2580
  int64_t   st = taosGetTimestampUs();
  int16_t   rv = -1;
D
fix bug  
dapan1121 已提交
2581
  STSchema* pSchema = NULL;
H
Haojun Liao 已提交
2582 2583

  do {
H
Haojun Liao 已提交
2584
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL);
H
Haojun Liao 已提交
2585 2586 2587 2588
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2589
    TSKEY key = TD_ROW_KEY(row);
dengyihao's avatar
dengyihao 已提交
2590 2591 2592 2593
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
                key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2594 2595 2596 2597 2598 2599 2600 2601 2602

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2603
    if (rv != TD_ROW_SVER(row)) {
H
Hongze Cheng 已提交
2604
      pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
H
Haojun Liao 已提交
2605
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2606
    }
dengyihao's avatar
dengyihao 已提交
2607 2608
    mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
                       NULL, true);
H
Haojun Liao 已提交
2609 2610 2611 2612 2613 2614

    if (++numOfRows >= maxRowsToRead) {
      moveToNextRowInMem(pCheckInfo);
      break;
    }

dengyihao's avatar
dengyihao 已提交
2615
  } while (moveToNextRowInMem(pCheckInfo));
H
Haojun Liao 已提交
2616 2617 2618 2619

  assert(numOfRows <= maxRowsToRead);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
2620
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
H
Haojun Liao 已提交
2621 2622
    int32_t emptySize = maxRowsToRead - numOfRows;

dengyihao's avatar
dengyihao 已提交
2623
    for (int32_t i = 0; i < numOfCols; ++i) {
2624
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
dengyihao's avatar
dengyihao 已提交
2625 2626
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
              numOfRows * pColInfo->info.bytes);
H
Haojun Liao 已提交
2627 2628 2629 2630
    }
  }

  int64_t elapsedTime = taosGetTimestampUs() - st;
dengyihao's avatar
dengyihao 已提交
2631 2632
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
            pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2633 2634 2635 2636

  return numOfRows;
}

2637 2638
/**
 * Append one STableKeyInfo per id delivered by the meta cursor opened for `uid`.
 *
 * @param pMeta  meta store handed to metaOpenCtbCursor.
 * @param uid    id the cursor is opened for (presumably the super table whose child tables
 *               are listed — confirm against metaOpenCtbCursor).
 * @param list   array receiving the entries; each has lastKey = TSKEY_INITIAL_VAL and
 *               uid = the id returned by the cursor.
 * @return TSDB_CODE_SUCCESS.
 */
static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      // an id of 0 ends the iteration
      break;
    }

    // `.uid` must be a designated initializer: without the dot the expression assigns to the
    // `uid` parameter and its value lands in whichever member follows lastKey.
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = id};
    taosArrayPush(list, &info);
  }

  metaCloseCtbCurosr(pCur);
  return TSDB_CODE_SUCCESS;
}

/**
 * Release `param` with taosMemoryFree; a NULL argument is ignored.
 */
static void destroyHelper(void* param) {
  if (param != NULL) {
    // disabled: type-specific release of the query info payload
    //  tQueryInfo* pInfo = (tQueryInfo*)param;
    //  if (pInfo->optr != TSDB_RELATION_IN) {
    //    taosMemoryFreeClear(pInfo->q);
    //  } else {
    //    taosHashCleanup((SHashObj *)(pInfo->q));
    //  }

    taosMemoryFree(param);
  }
}

dengyihao's avatar
dengyihao 已提交
2669 2670
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
2671

dengyihao's avatar
dengyihao 已提交
2672
/**
 * Produce the next data block for the active table: first from files (while checkFiles is
 * set), then from the in-memory cache, and finally — for a single-timestamp query that asked
 * for external rows and has an empty result — through tsdbGetExternalRow.
 *
 * @return true when a block is available, false otherwise (including when reading the files
 *         failed; the error code itself is not propagated).
 */
static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
  if (pTsdbReadHandle->checkFiles) {
    // check if the query range overlaps with the file data block
    bool found = true;

    if (getDataBlocksInFiles(pTsdbReadHandle, &found) != TSDB_CODE_SUCCESS) {
      pTsdbReadHandle->checkFiles = false;
      return false;
    }

    if (!found) {
      // files are exhausted, fall through to the cache
      pTsdbReadHandle->checkFiles = false;
    } else {
      tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL);

      const bool pointQuery = (pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey);
      if (pTsdbReadHandle->currentLoadExternalRows && pointQuery) {
        // the first value of column 0 must be exactly the queried timestamp
        SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
      }

      pTsdbReadHandle->currentLoadExternalRows = false;  // clear the flag, since the exact matched row is found.
      return found;
    }
  }

  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
    return true;
  }

  // current result is empty
  const bool wantExternal = pTsdbReadHandle->currentLoadExternalRows &&
                            pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
                            pTsdbReadHandle->cur.rows == 0;
  if (!wantExternal) {
    return false;
  }

  //    STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;

  //    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
  //    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);

  bool gotRow = tsdbGetExternalRow(pTsdbReadHandle);

  //    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  //    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
  pTsdbReadHandle->currentLoadExternalRows = false;

  return gotRow;
}
2721

2722
/**
 * Emit the cached last row of the next table as a one-row block.
 *
 * Each call advances activeIndex by one table; false is returned once all tables were visited.
 *
 * NOTE(review): the lookup of the cached row (tsdbGetCachedLastRow) is commented out, so pRow
 * is always NULL and key keeps TSKEY_INITIAL_VAL when they reach mergeTwoRowFromMem and the
 * cursor update — confirm before relying on this path.
 */
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
  // the last row is cached in buffer, return it directly.
  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0 && numOfCols > 0);

  SQueryFilePos* pos = &pTsdbReadHandle->cur;
  const int32_t  delta = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;

  // move on to the next table; stop when none is left
  if (++pTsdbReadHandle->activeIndex >= numOfTables) {
    return false;
  }

  STSRow* pRow = NULL;
  TSKEY   key = TSKEY_INITIAL_VAL;

  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
  //    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
  //    if (ret != TSDB_CODE_SUCCESS) {
  //      return false;
  //    }
  mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId,
                     NULL, NULL, true);
  taosMemoryFreeClear(pRow);

  // update the last key value
  pCheckInfo->lastKey = key + delta;

  pos->rows = 1;  // only one row
  pos->lastKey = key + delta;
  pos->mixBlock = true;
  pos->win.skey = key;
  pos->win.ekey = key;

  return true;
}

dengyihao's avatar
dengyihao 已提交
2760
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
dengyihao's avatar
dengyihao 已提交
2780 2781
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
//      pTable->tableId); continue;
2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

/*
 * Load the next data block when tables are visited one after another.
 *
 * Starting at pTsdbReadHandle->activeIndex, asks loadBlockOfActiveTable() for a block
 * of the active table. If it yields one, returns true immediately. Otherwise the
 * table's block count is cleared, the handle's per-table scan state is reset
 * (locateStart/checkFiles/cur.rows/currentLoadExternalRows), terrno is cleared and
 * the next table is tried. Returns false once every table has been tried.
 *
 * The time spent skipping over tables that yield no block is added to
 * cost.checkForNextTime. Each iteration contributes only its own duration: the
 * start mark is advanced after every accumulation, so time already accounted for
 * is not added again on later iterations.
 */
static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();

  while (pTsdbReadHandle->activeIndex < numOfTables) {
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
      return true;
    }

    // the active table has nothing more to offer: reset its state and move on
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
    pCheckInfo->numOfBlocks = 0;

    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
    pTsdbReadHandle->checkFiles = true;
    pTsdbReadHandle->cur.rows = 0;
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;

    terrno = TSDB_CODE_SUCCESS;

    // account only for the time spent since the previous accumulation
    int64_t now = taosGetTimestampUs();
    pTsdbReadHandle->cost.checkForNextTime += (now - stime);
    stime = now;
  }

  return false;
}

H
Haojun Liao 已提交
2939
// handle data in cache situation
H
Haojun Liao 已提交
2940
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
2941
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
Y
yihaoDeng 已提交
2942

dengyihao's avatar
dengyihao 已提交
2943
  for (int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) {
2944 2945 2946 2947
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
  }

2948
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
2949 2950
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
              pTsdbReadHandle->idStr);
2951 2952 2953
    return false;
  }

Y
yihaoDeng 已提交
2954 2955 2956
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

2957
  // TODO refactor: remove "type"
2958 2959
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
dengyihao's avatar
dengyihao 已提交
2960
      //      return loadCachedLastRow(pTsdbReadHandle);
2961
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
dengyihao's avatar
dengyihao 已提交
2962
      //      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
2963
    }
H
Haojun Liao 已提交
2964
  }
Y
yihaoDeng 已提交
2965

2966 2967
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
dengyihao's avatar
dengyihao 已提交
2968
  } else {  // loadType == RR and Offset Order
2969
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2970 2971 2972
      // check if the query range overlaps with the file data block
      bool exists = true;

2973
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2974
      if (code != TSDB_CODE_SUCCESS) {
2975 2976
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2977 2978 2979 2980 2981

        return false;
      }

      if (exists) {
2982
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
2983 2984
        return exists;
      }
Y
yihaoDeng 已提交
2985

2986 2987
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
2988 2989
    }

H
Haojun Liao 已提交
2990
    // TODO: opt by consider the scan order
2991
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
2992
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
2993

H
Haojun Liao 已提交
2994
    elapsedTime = taosGetTimestampUs() - stime;
2995
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
2996
    return ret;
Y
yihaoDeng 已提交
2997 2998
  }
}
2999

dengyihao's avatar
dengyihao 已提交
3000
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
3035
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
3036 3037 3038 3039 3040 3041 3042 3043 3044
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
3045
//  SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
3046 3047 3048 3049 3050 3051 3052 3053
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
3054
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
3055 3056 3057 3058 3059 3060 3061 3062 3063 3064
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
3065
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
3066
//  taosMemoryFreeClear(cond.colList);
3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
dengyihao's avatar
dengyihao 已提交
3105
// out_of_memory:
3106
//  tsdbCleanupReadHandle(pSecQueryHandle);
3107 3108 3109
//  return terrno;
//}

H
Haojun Liao 已提交
3110
/*
 * Copy the saved "prev" and "next" rows into the output columns as a two-row block.
 *
 * The cursor is always marked as a mixed, non-file block (fid = INT32_MIN). If either
 * pTsdbReadHandle->prev or pTsdbReadHandle->next is missing, the cursor row count is
 * set to 0 and false is returned. Otherwise, for each column the value from prev is
 * written at row 0 and the value from next at row 1, the cursor window is taken from
 * column 0 when it is a timestamp column, and true is returned with cur->rows == 2.
 */
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   cur = &pTsdbReadHandle->cur;

  cur->fid = INT32_MIN;
  cur->mixBlock = true;

  bool hasBothRows = (pTsdbReadHandle->prev != NULL) && (pTsdbReadHandle->next != NULL);
  if (!hasBothRows) {
    cur->rows = 0;
    return false;
  }

  int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
  int32_t col = 0;
  while (col < numOfCols) {
    SColumnInfoData* pDst = taosArrayGet(pTsdbReadHandle->pColumns, col);
    SColumnInfoData* pPrev = taosArrayGet(pTsdbReadHandle->prev, col);
    SColumnInfoData* pNext = taosArrayGet(pTsdbReadHandle->next, col);

    // row 0 <- prev, row 1 <- next
    memcpy(pDst->pData, pPrev->pData, pDst->info.bytes);
    memcpy(((char*)pDst->pData) + pDst->info.bytes, pNext->pData, pDst->info.bytes);

    // the first column, when it is the timestamp, defines the block's time window
    if (col == 0 && pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
      cur->win.skey = *(TSKEY*)pDst->pData;
      cur->win.ekey = *(TSKEY*)(((char*)pDst->pData) + TSDB_KEYSIZE);
    }

    ++col;
  }

  cur->rows = 2;
  return true;
}

3141
/*
 * If lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
 * otherwise set pRes, save lastKey, and return TSDB_CODE_SUCCESS.
 */
H
Haojun Liao 已提交
3145
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3162
// out:
3163 3164 3165 3166
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3167
/*
 * Report whether the reader has any cached-last mode enabled, i.e. whether its
 * cachelastrow field is greater than TSDB_CACHED_TYPE_NONE.
 */
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pReader;
  return pHandle->cachelastrow > TSDB_CACHED_TYPE_NONE;
}

dengyihao's avatar
dengyihao 已提交
3171
/*
 * Decide whether the query can be served from the cached last row.
 *
 * The detection logic is disabled for now (kept below for reference), so the
 * function only validates its arguments and always returns TSDB_CODE_SUCCESS
 * without touching the handle.
 */
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList) {
  assert(pTsdbReadHandle != NULL);
  assert(groupList != NULL);

  //  TSKEY    key = TSKEY_INITIAL_VAL;
  //
  //  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
  //  assert(group != NULL);
  //
  //  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
  //
  //  int32_t code = 0;
  //
  //  if (((STable*)pInfo->pTable)->lastRow) {
  //    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
  //    if (code != TSDB_CODE_SUCCESS) {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
  //    } else {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
  //    }
  //  }
  //
  //  // update the tsdb query time range
  //  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
  //    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
  //    pTsdbReadHandle->checkFiles  = false;
  //    pTsdbReadHandle->activeIndex = -1;  // start from -1
  //  }

  return TSDB_CODE_SUCCESS;
}

3202 3203
/*
 * Prepare the handle for a cached "last" query.
 *
 * The step that would switch cachelastrow to TSDB_CACHED_TYPE_LAST is disabled (kept
 * below for reference). When cachelastrow is already non-zero, file checking is
 * turned off and activeIndex is set to -1. Always returns 0.
 */
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  //  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
  //    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
  //  }

  if (!pTsdbReadHandle->cachelastrow) {
    return 0;
  }

  // update the tsdb query time range
  pTsdbReadHandle->checkFiles = false;
  pTsdbReadHandle->activeIndex = -1;  // start from -1

  return 0;
}

dengyihao's avatar
dengyihao 已提交
3219
/*
 * Walk every table of every group, record the largest last key seen in each group
 * into the table entries, and return the time window spanned by those keys.
 *
 * If no key updated the window, TSWINDOW_INITIALIZER is returned instead. Before
 * returning, the groups listed in the local emptyGroup array are removed from
 * groupList->pGroupList and groupList->numOfTables is overwritten with the number of
 * tables kept.
 *
 * NOTE(review): the per-table lastKey is currently hard-coded to 0 and the code that
 * prunes each group down to a single table (and fills emptyGroup / totalNumOfTable)
 * is disabled, so totalNumOfTable stays 0 and groupList->numOfTables is reset to 0
 * here -- confirm this is intended while the table-object lookup is unavailable.
 */
STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) {
  STimeWindow window = {INT64_MAX, INT64_MIN};
  int32_t     totalNumOfTable = 0;
  SArray*     emptyGroup = taosArrayInit(16, sizeof(int32_t));

  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
  for (int32_t j = 0; (size_t)j < numOfGroups; ++j) {
    SArray*       pGroup = taosArrayGetP(groupList->pGroupList, j);
    size_t        numOfTables = taosArrayGetSize(pGroup);
    TSKEY         key = TSKEY_INITIAL_VAL;
    STableKeyInfo keyInfo = {0};

    for (int32_t i = 0; (size_t)i < numOfTables; ++i) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);

      // if the lastKey equals to INT64_MIN, there is no data in this table
      TSKEY lastKey = 0;  //((STable*)(pInfo->pTable))->lastKey;
      if (!(key < lastKey)) {
        continue;
      }

      // a newer key for this group: remember it and widen the result window
      key = lastKey;

      //        keyInfo.pTable  = pInfo->pTable;
      keyInfo.lastKey = key;
      pInfo->lastKey = key;

      window.skey = (key < window.skey) ? key : window.skey;
      window.ekey = (key > window.ekey) ? key : window.ekey;
    }

    // more than one table in each group, only one table left for each group
    //    if (keyInfo.pTable != NULL) {
    //      totalNumOfTable++;
    //      if (taosArrayGetSize(pGroup) == 1) {
    //        // do nothing
    //      } else {
    //        taosArrayClear(pGroup);
    //        taosArrayPush(pGroup, &keyInfo);
    //      }
    //    } else {  // mark all the empty groups, and remove it later
    //      taosArrayDestroy(pGroup);
    //      taosArrayPush(emptyGroup, &j);
    //    }
  }

  // the window was never updated, so fall back to the initial window
  bool untouched = (window.skey == INT64_MAX) && (window.ekey == INT64_MIN);
  if (untouched) {
    window = TSWINDOW_INITIALIZER;
    assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
  }

  taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
  taosArrayDestroy(emptyGroup);

  groupList->numOfTables = totalNumOfTable;
  return window;
}

H
Haojun Liao 已提交
3284
/*
 * Fill pDataBlockInfo with the uid, row count and time window of the current block.
 *
 * The uid comes from the active table's check info when the cursor is not positioned
 * on a file block (cur.fid == INT32_MIN), and from the table that owns the block in
 * slot cur->slot otherwise. rows and window are copied from the cursor. Other fields
 * of pDataBlockInfo are left untouched.
 */
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
  SQueryFilePos*   cur = &pHandle->cur;

  uint64_t uid = 0;

  if (pHandle->cur.fid == INT32_MIN) {
    // no file block involved: the data belongs to the active table
    STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
    uid = pCheckInfo->tableId;
  } else {
    // there are data in file
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
    uid = pBlockInfo->pTableCheckInfo->tableId;
  }

  tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
            cur->win.skey, cur->win.ekey, pHandle->idStr);

  pDataBlockInfo->uid = uid;

#if 0
  // for multi-group data query processing test purpose
  pDataBlockInfo->groupId = uid;
#endif

  pDataBlockInfo->window = cur->win;
  pDataBlockInfo->rows = cur->rows;
//  ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
}
H
hjxilinx 已提交
3313

H
Haojun Liao 已提交
3314 3315 3316
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
3317
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) {
dengyihao's avatar
dengyihao 已提交
3318
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
3319
  *allHave = false;
H
Haojun Liao 已提交
3320

H
Haojun Liao 已提交
3321 3322
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3323 3324 3325
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3326

H
Haojun Liao 已提交
3327 3328 3329 3330
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3331 3332 3333 3334
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3335 3336

  int64_t stime = taosGetTimestampUs();
3337 3338
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3339
    return terrno;
3340 3341 3342
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3343
  }
H
Haojun Liao 已提交
3344

C
Cary Xu 已提交
3345 3346 3347
  tsdbDebug("vgId:%d succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
            TSDB_READ_TABLE_UID(&pHandle->rhelper));

3348
  int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
3349

H
Haojun Liao 已提交
3350
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
3351 3352 3353
  memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
  memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));

dengyihao's avatar
dengyihao 已提交
3354
  for (int32_t i = 0; i < numOfCols; ++i) {
3355
    pHandle->suppInfo.pstatis[i].colId = colIds[i];
3356
  }
H
Haojun Liao 已提交
3357

3358 3359
  *allHave = true;
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3360 3361

  // always load the first primary timestamp column data
3362
  SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0];
3363
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3364 3365 3366 3367

  pPrimaryColStatis->numOfNull = 0;
  pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
  pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
3368
  pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
H
Haojun Liao 已提交
3369

dengyihao's avatar
dengyihao 已提交
3370
  // update the number of NULL data rows
3371
  int32_t* slotIds = pHandle->suppInfo.slotIds;
dengyihao's avatar
dengyihao 已提交
3372
  for (int32_t i = 1; i < numOfCols; ++i) {
3373
    ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId);
C
Cary Xu 已提交
3374
    if (IS_BSMA_ON(&(pHandle->pSchema->columns[slotIds[i]]))) {
3375 3376 3377 3378 3379 3380 3381
      if (pHandle->suppInfo.pstatis[i].numOfNull == -1) {  // set the column data are all NULL
        pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
      } else {
        pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i];
      }
    } else {
      *allHave = false;
H
Haojun Liao 已提交
3382 3383
    }
  }
H
Haojun Liao 已提交
3384 3385 3386 3387

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

3388
  *pBlockStatis = pHandle->suppInfo.plist;
3389
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3390 3391
}

H
Haojun Liao 已提交
3392
/*
 * Return the column data of the current block, loading it from file if needed.
 *
 * The output columns (pHandle->pColumns) are returned as they are when:
 *   1. the cursor is not on a file block (cur.fid == INT32_MIN), i.e. data is from cache;
 *   2. the block is a mixed block (not completely qualified by the query time range);
 *   3. dataBlockLoadInfo already describes the same slot, file id and table.
 * Otherwise the file block is loaded with doLoadFileDataBlock() and all of its rows
 * are copied into the output columns. Returns NULL if that load fails.
 *
 * pIdList is not used by this function.
 */
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;

  // case 1: data is from cache, already in the output columns
  if (pHandle->cur.fid == INT32_MIN) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;

  // case 2: a mixed block has been materialized into the output columns already
  if (pHandle->cur.mixBlock) {
    return pHandle->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);

  // data block has been loaded, todo extract method
  SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;

  bool sameBlock = pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
                   pBlockLoadInfo->uid == pCheckInfo->tableId;
  if (sameBlock) {
    return pHandle->pColumns;
  }

  // only load the file block
  SBlock* pBlock = pBlockInfo->compBlock;
  if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  // copy every row of the block; the returned row count is not needed here
  (void)doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
  return pHandle->pColumns;
}
3428
#if 0
3429
void filterPrepare(void* expr, void* param) {
3430
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
3431
  if (pExpr->_node.info != NULL) {
3432 3433
    return;
  }
3434

wafwerar's avatar
wafwerar 已提交
3435
  pExpr->_node.info = taosMemoryCalloc(1, sizeof(tQueryInfo));
H
Haojun Liao 已提交
3436

3437
  STSchema*   pTSSchema = (STSchema*) param;
H
hjxilinx 已提交
3438 3439 3440
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
3441

3442 3443
  pInfo->sch      = *pSchema;
  pInfo->optr     = pExpr->_node.optr;
Y
yihaoDeng 已提交
3444
  pInfo->compare  = getComparFunc(pInfo->sch.type, pInfo->optr);
H
Haojun Liao 已提交
3445
  pInfo->indexed  = pTSSchema->columns->colId == pInfo->sch.colId;
H
Haojun Liao 已提交
3446

H
hjxilinx 已提交
3447
  if (pInfo->optr == TSDB_RELATION_IN) {
Y
yihaoDeng 已提交
3448
     int dummy = -1;
3449
     SHashObj *pObj = NULL;
Y
yihaoDeng 已提交
3450 3451 3452 3453
     if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
        pObj = taosHashInit(256, taosGetDefaultHashFunction(pInfo->sch.type), true, false);
        SArray *arr = (SArray *)(pCond->arr);
        for (size_t i = 0; i < taosArrayGetSize(arr); i++) {
Y
yihaoDeng 已提交
3454
          char* p = taosArrayGetP(arr, i);
3455 3456
          strntolower_s(varDataVal(p), varDataVal(p), varDataLen(p));
          taosHashPut(pObj, varDataVal(p), varDataLen(p), &dummy, sizeof(dummy));
Y
yihaoDeng 已提交
3457 3458 3459 3460
        }
     } else {
       buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen);
     }
3461
     pInfo->q = (char *)pObj;
H
Haojun Liao 已提交
3462
  } else if (pCond != NULL) {
3463 3464 3465 3466
    uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
    if (size < (uint32_t)pSchema->bytes) {
      size = pSchema->bytes;
    }
wafwerar's avatar
wafwerar 已提交
3467
    // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space.
wafwerar's avatar
wafwerar 已提交
3468
    pInfo->q = taosMemoryCalloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
3469
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
3470
  }
3471 3472
}

3473
#endif
3474

dengyihao's avatar
dengyihao 已提交
3475
/*
 * Comparison callback handed to taosqsort() and createTableGroupImpl().
 *
 * The tag/table-name based ordering is compiled out (#if 0), so at the moment
 * every pair of entries compares as equal and the function always returns 0.
 */
static int32_t tableGroupComparFn(const void* p1, const void* p2, const void* param) {
#if 0
  STableGroupSupporter* pSupp = (STableGroupSupporter*)param;
  STable*               pLeft = ((STableKeyInfo*)p1)->uid;
  STable*               pRight = ((STableKeyInfo*)p2)->uid;

  for (int32_t i = 0; i < pSupp->numOfCols; ++i) {
    int32_t colIndex = pSupp->pCols[i].colIndex;
    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);

    char*   f1 = NULL;
    char*   f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;

    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
      // group by table name
      f1 = (char*)TABLE_NAME(pLeft);
      f2 = (char*)TABLE_NAME(pRight);
      type = TSDB_DATA_TYPE_BINARY;
      bytes = tGetTbnameColumnSchema()->bytes;
    } else if (pSupp->pTagSchema && colIndex < pSupp->pTagSchema->numOfCols) {
      // group by a tag column
      STColumn* pCol = schemaColAt(pSupp->pTagSchema, colIndex);
      bytes = pCol->bytes;
      type = pCol->type;
      f1 = tdGetKVRowValOfCol(pLeft->tagVal, pCol->colId);
      f2 = tdGetKVRowValOfCol(pRight->tagVal, pCol->colId);
    }

    // a tag value may be NULL: two NULLs tie on this column, a single NULL sorts first
    if (f1 == NULL || f2 == NULL) {
      if (f1 == NULL && f2 == NULL) {
        continue;
      }
      return (f1 == NULL) ? -1 : 1;
    }

    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret != 0) {
      return ret;
    }
  }
#endif
  return 0;
}

H
Haojun Liao 已提交
3531
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3532
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3533
    return -1;
3534
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3535 3536 3537 3538 3539 3540 3541 3542 3543
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

/*
 * Splits an already sorted table list into groups of consecutive entries that
 * compare as equal, and appends each group (an SArray of STableKeyInfo) to pGroups.
 *
 * pGroups     - output; receives one SArray* per group
 * pTableList  - sorted list whose elements are STableKeyInfo
 * numOfTables - number of entries of pTableList to process
 * skey        - value stored in lastKey of every emitted entry
 * pSupp       - opaque context forwarded to compareFn
 * compareFn   - must return 0 (same group) or -1 (new group) for neighbours
 *
 * Each emitted entry carries the uid of the table it was built from, so the
 * grouped result still identifies the tables.
 */
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
                          STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
  if (numOfTables == 0) {
    return;  // nothing to group; avoids reading element 0 of an empty list
  }

  SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));

  STableKeyInfo* pFirst = taosArrayGet(pTableList, 0);
  STableKeyInfo  info = {.lastKey = skey, .uid = pFirst->uid};
  taosArrayPush(g, &info);

  for (size_t i = 1; i < numOfTables; ++i) {
    STableKeyInfo* prev = taosArrayGet(pTableList, i - 1);
    STableKeyInfo* p = taosArrayGet(pTableList, i);

    int32_t ret = compareFn(prev, p, pSupp);
    assert(ret == 0 || ret == -1);

    if (ret != 0) {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
      g = taosArrayInit(16, sizeof(STableKeyInfo));
    }

    STableKeyInfo info1 = {.lastKey = skey, .uid = p->uid};
    taosArrayPush(g, &info1);
  }

  taosArrayPush(pGroups, &g);
}

dengyihao's avatar
dengyihao 已提交
3572 3573
/*
 * Builds the list of table groups for a query.
 *
 * pTableList     - tables to group (elements are STableKeyInfo); must not be NULL
 * pTagSchema     - tag schema of the super table; may be NULL
 * pCols          - group-by columns
 * numOfOrderCols - number of entries in pCols; 0 means "no group by"
 * skey           - start key forwarded to createTableGroupImpl()
 *
 * Returns an SArray of SArray* (one per group), an empty array when there are
 * no tables, or NULL when an allocation fails.
 */
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
                         TSKEY skey) {
  assert(pTableList != NULL);

  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
  if (pTableGroup == NULL) {
    return NULL;
  }

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) {  // no group by tags clause or only one table
    SArray* sa = taosArrayDup(pTableList);
    if (sa == NULL) {
      taosArrayDestroy(pTableGroup);
      return NULL;
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %" PRIzu " tables belong to one group", size);
    return pTableGroup;
  }

  STableGroupSupporter sup = {0};
  sup.numOfCols = numOfOrderCols;
  // the schema lookup done by the caller may have failed; do not dereference NULL
  sup.pTagSchema = (pTagSchema != NULL) ? pTagSchema->pSchema : NULL;
  sup.pCols = pCols;

  // sort first so that tables of the same group become neighbours
  taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
  createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);

  return pTableGroup;
}

dengyihao's avatar
dengyihao 已提交
3605
// static bool tableFilterFp(const void* pNode, void* param) {
3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688
//  tQueryInfo* pInfo = (tQueryInfo*) param;
//
//  STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
//  char* val = NULL;
//  if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
//    val = (char*) TABLE_NAME(pTable);
//  } else {
//    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
//  }
//
//  if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
//    if (pInfo->optr == TSDB_RELATION_ISNULL) {
//      return (val == NULL) || isNull(val, pInfo->sch.type);
//    } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
//      return (val != NULL) && (!isNull(val, pInfo->sch.type));
//    }
//  } else if (pInfo->optr == TSDB_RELATION_IN) {
//     int type = pInfo->sch.type;
//     if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
//       int64_t v;
//       GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
//       uint64_t v;
//       GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     }
//     else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
//       double v;
//       GET_TYPED_DATA(v, double, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
//       return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
//     }
//
//  }
//
//  int32_t ret = 0;
//  if (val == NULL) { //the val is possible to be null, so check it out carefully
//    ret = -1; // val is missing in table tags value pairs
//  } else {
//    ret = pInfo->compare(val, pInfo->q);
//  }
//
//  switch (pInfo->optr) {
//    case TSDB_RELATION_EQUAL: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NOT_EQUAL: {
//      return ret != 0;
//    }
//    case TSDB_RELATION_GREATER_EQUAL: {
//      return ret >= 0;
//    }
//    case TSDB_RELATION_GREATER: {
//      return ret > 0;
//    }
//    case TSDB_RELATION_LESS_EQUAL: {
//      return ret <= 0;
//    }
//    case TSDB_RELATION_LESS: {
//      return ret < 0;
//    }
//    case TSDB_RELATION_LIKE: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_MATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NMATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_IN: {
//      return ret == 1;
//    }
//
//    default:
//      assert(false);
//  }
//
//  return true;
//}
H
Haojun Liao 已提交
3689

dengyihao's avatar
dengyihao 已提交
3690 3691
// static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp
// *param);
3692

dengyihao's avatar
dengyihao 已提交
3693 3694
// static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
//  //  // query according to the expression tree
3695
//  SExprTraverseSupp supp = {
dengyihao's avatar
dengyihao 已提交
3696
//      .nodeFilterFn = (__result_filter_fn_t)tableFilterFp,
3697 3698
//      .setupInfoFn = filterPrepare,
//      .pExtInfo = pSTable->tagSchema,
dengyihao's avatar
dengyihao 已提交
3699
//  };
3700 3701 3702 3703 3704
//
//  getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
//  tExprTreeDestroy(pExpr, destroyHelper);
//  return TSDB_CODE_SUCCESS;
//}
3705

H
Haojun Liao 已提交
3706
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
3707
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
3708
                                 SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
H
Hongze Cheng 已提交
3709 3710
  SMetaReader mr = {0};

H
Hongze Cheng 已提交
3711
  metaReaderInit(&mr, (SMeta*)pMeta, 0);
H
Hongze Cheng 已提交
3712 3713

  if (metaGetTableEntryByUid(&mr, uid) < 0) {
dengyihao's avatar
dengyihao 已提交
3714
    tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
3715 3716
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _error;
C
Cary Xu 已提交
3717 3718
  } else {
    tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
3719
  }
H
Haojun Liao 已提交
3720

H
Hongze Cheng 已提交
3721
  if (mr.me.type != TSDB_SUPER_TABLE) {
dengyihao's avatar
dengyihao 已提交
3722 3723 3724
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
              reqId);
    terrno = TSDB_CODE_OPS_NOT_SUPPORT;  // basically, this error is caused by invalid sql issued by client
3725
    goto _error;
H
hjxilinx 已提交
3726
  }
3727

H
Hongze Cheng 已提交
3728 3729
  metaReaderClear(&mr);

dengyihao's avatar
dengyihao 已提交
3730 3731
  // NOTE: not add ref count for super table
  SArray*         res = taosArrayInit(8, sizeof(STableKeyInfo));
H
Haojun Liao 已提交
3732
  SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
H
Haojun Liao 已提交
3733

weixin_48148422's avatar
weixin_48148422 已提交
3734 3735
  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
H
Haojun Liao 已提交
3736
    int32_t ret = getAllTableList(pMeta, uid, res);
3737 3738
    if (ret != TSDB_CODE_SUCCESS) {
      goto _error;
3739
    }
3740

dengyihao's avatar
dengyihao 已提交
3741 3742
    pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
    pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
H
Haojun Liao 已提交
3743

dengyihao's avatar
dengyihao 已提交
3744 3745 3746
    tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64
              " QID:0x%" PRIx64,
              pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
3747

3748
    taosArrayDestroy(res);
3749 3750
    return ret;
  }
3751

H
hjxilinx 已提交
3752
  int32_t ret = TSDB_CODE_SUCCESS;
3753

dengyihao's avatar
dengyihao 已提交
3754 3755
  SFilterInfo* filterInfo = NULL;
  ret = filterInitFromNode((SNode*)pTagCond, &filterInfo, 0);
dengyihao's avatar
dengyihao 已提交
3756 3757 3758 3759 3760 3761 3762 3763
  if (ret != TSDB_CODE_SUCCESS) {
    terrno = ret;
    return ret;
  }
  ret = tsdbQueryTableList(pMeta, res, filterInfo);
  pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
  pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);

dengyihao's avatar
dengyihao 已提交
3764 3765
  // tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb,
  //          pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
dengyihao's avatar
dengyihao 已提交
3766 3767 3768 3769 3770

  taosArrayDestroy(res);
  return ret;

_error:
3771
  return terrno;
3772
}
3773

dengyihao's avatar
dengyihao 已提交
3774 3775 3776 3777 3778
/*
 * Placeholder for filtering the child tables by `filterInfo`.
 * Not implemented yet: no argument is used, pRes is left untouched and
 * success is reported.
 */
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
  // impl later
  (void)pMeta;
  (void)pRes;
  (void)filterInfo;

  int32_t code = TSDB_CODE_SUCCESS;
  return code;
}
H
Haojun Liao 已提交
3779
/*
 * Fills pGroupInfo with exactly one group that contains exactly one table.
 *
 * The table identified by `uid` must exist in the meta store; otherwise
 * terrno is set to TSDB_CODE_TDB_INVALID_TABLE_ID and returned. On success the
 * single entry has lastKey == startKey and uid == uid.
 */
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
  SMetaReader reader = {0};
  metaReaderInit(&reader, (SMeta*)pMeta, 0);

  // the reader is only needed for the existence check and is released on both outcomes
  if (metaGetTableEntryByUid(&reader, uid) < 0) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&reader);
    return terrno;
  }

  metaReaderClear(&reader);

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);

  SArray*       pOneGroup = taosArrayInit(1, sizeof(STableKeyInfo));
  STableKeyInfo keyInfo = {.lastKey = startKey, .uid = uid};
  taosArrayPush(pOneGroup, &keyInfo);

  taosArrayPush(pGroupInfo->pGroupList, &pOneGroup);
  return TSDB_CODE_SUCCESS;
}
3806

3807
#if 0
3808
/*
 * Builds a single table group from an explicit list of table ids while the
 * repo meta is read-locked.
 *
 * Ids whose table no longer exists are skipped with a warning. An id that
 * refers to a super table aborts the call with TSDB_CODE_QRY_INVALID_MSG.
 * The group is only attached to pGroupInfo->pGroupList when it is not empty.
 */
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  assert(pTableIdList != NULL);
  size_t numOfIds = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* pTables = taosArrayInit(1, sizeof(STableKeyInfo));

  for (int32_t idx = 0; idx < numOfIds; ++idx) {
    STableIdInfo* pId = taosArrayGet(pTableIdList, idx);
    STable*       pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), pId->uid);

    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", pId->uid, pId->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", pId->uid, pId->tid);
      terrno = TSDB_CODE_QRY_INVALID_MSG;
      tsdbUnlockRepoMeta(tsdb);
      taosArrayDestroy(pTables);
      return terrno;
    }

    STableKeyInfo keyInfo = {.pTable = pTable, .lastKey = pId->key};
    taosArrayPush(pTables, &keyInfo);
  }

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    taosArrayDestroy(pTables);
    return terrno;
  }

  pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(pTables);
  if (pGroupInfo->numOfTables == 0) {
    taosArrayDestroy(pTables);  // nothing qualified: leave the group list empty
  } else {
    taosArrayPush(pGroupInfo->pGroupList, &pTables);
  }

  return TSDB_CODE_SUCCESS;
}
3853
#endif
3854 3855 3856 3857 3858 3859 3860 3861
/*
 * Releases the pData buffer of every SColumnInfoData element and then the
 * array itself. A NULL array is accepted.
 * Always returns NULL so the caller can reset its pointer in one statement.
 */
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData != NULL) {
    size_t numOfCols = taosArrayGetSize(pColumnInfoData);

    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      SColumnInfoData* pCol = taosArrayGet(pColumnInfoData, idx);
      taosMemoryFreeClear(pCol->pData);
    }

    taosArrayDestroy(pColumnInfoData);
  }

  return NULL;
}

H
Haojun Liao 已提交
3869 3870 3871 3872 3873 3874
/*
 * Destroys every STableCheckInfo in the array (its memory iterators and its
 * pCompInfo buffer) and then the array itself. A NULL array is accepted,
 * matching doFreeColumnInfoData().
 * Always returns NULL so the caller can reset its pointer in one statement.
 */
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  size_t size = taosArrayGetSize(pTableCheckInfo);
  for (size_t i = 0; i < size; ++i) {
    STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
    destroyTableMemIterator(p);

    taosMemoryFreeClear(p->pCompInfo);
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

H
Haojun Liao 已提交
3882
/*
 * Releases every resource owned by a read handle and finally the handle
 * itself. A NULL handle is ignored. The I/O cost counters are written to the
 * debug log just before the handle is freed.
 */
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)queryHandle;
  if (pHandle == NULL) {
    return;
  }

  // column buffers and per-block statistics
  pHandle->pColumns = doFreeColumnInfoData(pHandle->pColumns);

  taosArrayDestroy(pHandle->suppInfo.defaultLoadColumn);
  taosMemoryFreeClear(pHandle->pDataBlockInfo);
  taosMemoryFreeClear(pHandle->suppInfo.pstatis);
  taosMemoryFreeClear(pHandle->suppInfo.plist);

  if (emptyQueryTimewindow(pHandle)) {
    // an empty query window is expected to have no table check info
    assert(pHandle->pTableCheckInfo == NULL);
  }
  // else: tsdbMayUnTakeMemSnapshot(pHandle) used to be called here and is currently disabled

  if (pHandle->pTableCheckInfo != NULL) {
    pHandle->pTableCheckInfo = destroyTableCheckInfo(pHandle->pTableCheckInfo);
  }

  tsdbDestroyReadH(&pHandle->rhelper);

  tdFreeDataCols(pHandle->pDataCols);
  pHandle->pDataCols = NULL;

  pHandle->prev = doFreeColumnInfoData(pHandle->prev);
  pHandle->next = doFreeColumnInfoData(pHandle->next);

  SIOCostSummary* pSummary = &pHandle->cost;

  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pHandle, pSummary->headFileLoad, pSummary->headFileLoadTime, pSummary->statisInfoLoadTime,
            pSummary->blockLoadTime, pSummary->checkForNextTime, pHandle->idStr);

  taosMemoryFreeClear(pHandle);
}
3922

3923
#if 0
H
Haojun Liao 已提交
3924
/*
 * Drops the table reference held by every entry of every group, destroys the
 * per-group arrays, the lookup map and the group list, and resets the table
 * counter. pGroupList must not be NULL.
 */
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  size_t groupCount = taosArrayGetSize(pGroupList->pGroupList);

  for (int32_t g = 0; g < groupCount; ++g) {
    SArray* pGroup = taosArrayGetP(pGroupList->pGroupList, g);
    size_t  tableCount = taosArrayGetSize(pGroup);

    for (int32_t t = 0; t < tableCount; ++t) {
      STable* pTable = taosArrayGetP(pGroup, t);
      if (pTable == NULL) {  // in case of handling retrieve data from tsdb
        continue;
      }

      tsdbUnRefTable(pTable);
    }

    taosArrayDestroy(pGroup);
  }

  taosHashCleanup(pGroupList->map);
  taosArrayDestroy(pGroupList->pGroupList);
  pGroupList->numOfTables = 0;
}
H
Haojun Liao 已提交
3948 3949 3950 3951 3952 3953 3954

/*
 * Walks the whole skip list and appends the data of every node accepted by
 * exprTreeApplyFilter(pExpr, node, param) to pResult.
 */
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  // visit every node; nodes rejected by the filter are skipped
  while (tSkipListIterNext(pIter)) {
    SSkipListNode *pCur = tSkipListIterGet(pIter);
    if (!exprTreeApplyFilter(pExpr, pCur, param)) {
      continue;
    }

    taosArrayPush(pResult, &(SL_GET_NODE_DATA(pCur)));
  }

  tSkipListDestroyIter(pIter);
}

// Operator/value pair describing one bound of a query condition.
typedef struct {
  char*    v;     // comparison value; setQueryCond() points it at tQueryInfo::q (not a copy)
  int32_t  optr;  // relational operator, copied from tQueryInfo::optr by setQueryCond()
} SEndPoint;

// Condition handed to the indexed-column scan; setQueryCond() allocates at most one of the two end points.
typedef struct {
  SEndPoint* start;  // allocated for >, >=, =, <> and IN; left untouched otherwise
  SEndPoint* end;    // allocated for < and <=; left untouched otherwise
} SQueryCond;

// todo check for malloc failure
/*
 * Translates the operator of queryColInfo into a start or an end point of pCond.
 *
 *   >, >=, =, <>, IN   -> pCond->start is allocated
 *   <, <=              -> pCond->end is allocated
 *   LIKE, MATCH, NMATCH are not supported here and trip assert(0)
 *   any other operator leaves pCond untouched
 *
 * The end point only borrows queryColInfo->q. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
  switch (queryColInfo->optr) {
    case TSDB_RELATION_GREATER:
    case TSDB_RELATION_GREATER_EQUAL:
    case TSDB_RELATION_EQUAL:
    case TSDB_RELATION_NOT_EQUAL:
    case TSDB_RELATION_IN: {
      SEndPoint* pStart = taosMemoryCalloc(1, sizeof(SEndPoint));
      pCond->start = pStart;
      pStart->optr = queryColInfo->optr;
      pStart->v    = queryColInfo->q;
      break;
    }

    case TSDB_RELATION_LESS:
    case TSDB_RELATION_LESS_EQUAL: {
      SEndPoint* pEnd = taosMemoryCalloc(1, sizeof(SEndPoint));
      pCond->end = pEnd;
      pEnd->optr = queryColInfo->optr;
      pEnd->v    = queryColInfo->q;
      break;
    }

    case TSDB_RELATION_LIKE:
    case TSDB_RELATION_MATCH:
    case TSDB_RELATION_NMATCH:
      assert(0);
      break;

    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
  SSkipListIterator* iter = NULL;

  SQueryCond cond = {0};
  if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
    //todo handle error
  }

  if (cond.start != NULL) {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
  } else {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
  }

  if (cond.start != NULL) {
    int32_t optr = cond.start->optr;

    if (optr == TSDB_RELATION_EQUAL) {   // equals
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
    } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
      bool comp = true;
      int32_t ret = 0;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
          assert(ret >= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_GREATER) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;
        }
      }
    } else if (optr == TSDB_RELATION_NOT_EQUAL) {   // not equal
      bool comp = true;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

      tSkipListDestroyIter(iter);

      comp = true;
      iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

Y
yihaoDeng 已提交
4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091
    } else if (optr == TSDB_RELATION_IN) {
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
      
H
Haojun Liao 已提交
4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132
    } else {
      assert(0);
    }
  } else {
    int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
    if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
      bool    comp = true;
      int32_t ret = 0;

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
          assert(ret <= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_LESS) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;  // no need to compare anymore
        }
      }
    } else {
      assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
        if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
            (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
        }
      }
    }
  }

wafwerar's avatar
wafwerar 已提交
4133 4134
  taosMemoryFree(cond.start);
  taosMemoryFree(cond.end);
H
Haojun Liao 已提交
4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151 4152
  tSkipListDestroyIter(iter);
}

/*
 * Full scan of the skip list for conditions that cannot use the index.
 *
 * For a table-name column the node is accepted when compare() is non-zero
 * (IN) or zero (LIKE / MATCH / NMATCH); any other operator rejects it.
 * For every other column the decision is delegated to filterFp.
 */
static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  while (tSkipListIterNext(pIter)) {
    SSkipListNode *pNode = tSkipListIterGet(pIter);

    char *pData = SL_GET_NODE_DATA(pNode);
    tstr *name = (tstr*) tsdbGetTableName((void*) pData);

    bool addToResult = false;

    // todo speed up by using hash
    if (pQueryInfo->sch.colId != TSDB_TBNAME_COLUMN_INDEX) {
      addToResult = filterFp(pNode, pQueryInfo);
    } else if (pQueryInfo->optr == TSDB_RELATION_IN) {
      addToResult = pQueryInfo->compare(name, pQueryInfo->q);
    } else if (pQueryInfo->optr == TSDB_RELATION_LIKE ||
               pQueryInfo->optr == TSDB_RELATION_MATCH ||
               pQueryInfo->optr == TSDB_RELATION_NMATCH) {
      addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
    }

    if (!addToResult) {
      continue;
    }

    STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
    taosArrayPush(res, &info);
  }

  tSkipListDestroyIter(pIter);
}

// Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list
dengyihao's avatar
dengyihao 已提交
4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204
//void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
//  if (pExpr == NULL) {
//    return;
//  }
//
//  tExprNode *pLeft  = pExpr->_node.pLeft;
//  tExprNode *pRight = pExpr->_node.pRight;
//
//  // column project
//  if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
//    assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
//
//    param->setupInfoFn(pExpr, param->pExtInfo);
//
//    tQueryInfo *pQueryInfo = pExpr->_node.info;
//    if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE
//                                && pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH
//                                && pQueryInfo->optr != TSDB_RELATION_IN)) {
//      queryIndexedColumn(pSkipList, pQueryInfo, result);
//    } else {
//      queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
//    }
//
//    return;
//  }
//
//  // The value of hasPK is always 0.
//  uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
//  assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//
//  //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
//  applyFilterToSkipListNode(pSkipList, pExpr, result, param);
//}
L
Liu Jicong 已提交
4205
#endif