tsdbRead.c 139.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnodeInt.h"
H
Haojun Liao 已提交
17
#include "tdatablock.h"
H
Haojun Liao 已提交
18 19 20 21 22
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "texception.h"
23

24
#include "taosdef.h"
25
#include "tlosertree.h"
H
Hongze Cheng 已提交
26
#include "vnodeInt.h"
27
#include "tmsg.h"
28

29
#define EXTRA_BYTES 2

// True when the scan order `o` is ascending. The argument is parenthesized so that an expression
// argument (e.g. `a | b`) is compared as a whole instead of being split by operator precedence.
#define ASCENDING_TRAVERSE(o)   ((o) == TSDB_ORDER_ASC)

// Number of entries in the read handle's pColumns array, as a size_t.
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
32

H
Haojun Liao 已提交
33 34 35 36
// Builds an SDataBlockInfo compound literal for a file block: the key window, column count and row
// count are taken from `_block`, the uid from `_checkInfo->tableId`.
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)        \
  ((SDataBlockInfo){                                        \
      .window    = {.skey = (_block)->keyFirst,             \
                    .ekey = (_block)->keyLast},             \
      .numOfCols = (_block)->numOfCols,                     \
      .rows      = (_block)->numOfRows,                     \
      .uid       = (_checkInfo)->tableId})
H
Haojun Liao 已提交
38

H
hjxilinx 已提交
39
// Query types; STsdbReadHandle.type is assigned one of these values.
enum {
  TSDB_QUERY_TYPE_ALL  = 1,
  TSDB_QUERY_TYPE_LAST = 2,
};

44 45 46 47 48 49
// Cached-row kinds (none / last row / last).
// NOTE(review): presumably the values stored in STsdbReadHandle.cachelastrow -- confirm.
enum {
  TSDB_CACHED_TYPE_NONE    = 0,
  TSDB_CACHED_TYPE_LASTROW = 1,
  TSDB_CACHED_TYPE_LAST    = 2,
};

50 51
typedef struct SQueryFilePos {
  int32_t fid;
52 53
  int32_t slot;
  int32_t pos;
54
  int64_t lastKey;
55 56
  int32_t rows;
  bool    mixBlock;
57
  bool    blockCompleted;
58
  STimeWindow win;
59
} SQueryFilePos;
H
hjxilinx 已提交
60

61
typedef struct SDataBlockLoadInfo {
H
Hongze Cheng 已提交
62
  SDFileSet*  fileGroup;
63
  int32_t     slot;
64
  uint64_t    uid;
65
  SArray*     pLoadedCols;
66
} SDataBlockLoadInfo;
H
hjxilinx 已提交
67

68
// Record of the comp-block information currently loaded (see STsdbReadHandle.compBlockLoadInfo).
// tsdbInitCompBlockLoadInfo() resets both fields to -1.
typedef struct SLoadCompBlockInfo {
  int32_t tid;     // table tid
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
72

73 74 75 76 77 78
// Values of STableCheckInfo.chosen: which in-memory iterator supplies the current row.
enum {
  CHECKINFO_CHOSEN_MEM  = 0,
  CHECKINFO_CHOSEN_IMEM = 1,
  CHECKINFO_CHOSEN_BOTH = 2,  // for update=2 (merge case)
};

79
typedef struct STableCheckInfo {
80
  uint64_t      tableId;
H
Haojun Liao 已提交
81
  TSKEY         lastKey;
H
Haojun Liao 已提交
82
  SBlockInfo*   pCompInfo;
H
Haojun Liao 已提交
83
  int32_t       compSize;
84
  int32_t       numOfBlocks:29; // number of qualified data blocks not the original blocks
85
  uint8_t       chosen:2;       // indicate which iterator should move forward
H
Haojun Liao 已提交
86
  bool          initBuf:1;        // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
87 88
  SSkipListIterator* iter;      // mem buffer skip list iterator
  SSkipListIterator* iiter;     // imem buffer skip list iterator
89
} STableCheckInfo;
90

91
typedef struct STableBlockInfo {
H
Haojun Liao 已提交
92 93
  SBlock          *compBlock;
  STableCheckInfo *pTableCheckInfo;
94
} STableBlockInfo;
95

96 97
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
H
Haojun Liao 已提交
98
  STableBlockInfo**   pDataBlockInfo;
99
  int32_t*            blockIndexArray;
100
  int32_t*            numOfBlocksPerTable;
101 102
} SBlockOrderSupporter;

H
Haojun Liao 已提交
103 104 105
// I/O cost counters accumulated on a read handle (STsdbReadHandle.cost).
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

111 112
typedef struct STsdbReadHandle {
  STsdb*     pTsdb;
H
Haojun Liao 已提交
113 114 115 116 117 118 119 120 121
  SQueryFilePos  cur;              // current position
  int16_t        order;
  STimeWindow    window;           // the primary query time window that applies to all queries
  SDataStatis*   statis;           // query level statistics, only one table block statistics info exists at any time
  int32_t        numOfBlocks;
  SArray*        pColumns;         // column list, SColumnInfoData array list
  bool           locateStart;
  int32_t        outputCapacity;
  int32_t        realNumOfRows;
H
Haojun Liao 已提交
122
  SArray*        pTableCheckInfo;  // SArray<STableCheckInfo>
H
Haojun Liao 已提交
123 124
  int32_t        activeIndex;
  bool           checkFiles;       // check file stage
D
init  
dapan1121 已提交
125
  int8_t         cachelastrow;     // check if last row cached
126
  bool           loadExternalRow;  // load time window external data rows
H
Haojun Liao 已提交
127 128
  bool           currentLoadExternalRows; // current load external rows
  int32_t        loadType;         // block load type
H
Haojun Liao 已提交
129
  char          *idStr;            // query info handle, for debug purpose
H
Haojun Liao 已提交
130
  int32_t        type;             // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
H
Hongze Cheng 已提交
131 132 133
  SDFileSet*     pFileGroup;
  SFSIter        fileIter;
  SReadH         rhelper;
H
Haojun Liao 已提交
134
  STableBlockInfo* pDataBlockInfo;
H
Haojun Liao 已提交
135
  SDataCols     *pDataCols;        // in order to hold current file data block
H
Haojun Liao 已提交
136
  int32_t        allocSize;        // allocated data block size
H
Haojun Liao 已提交
137
  SArray        *defaultLoadColumn;// default load column
H
Haojun Liao 已提交
138
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
H
Haojun Liao 已提交
139
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
H
Haojun Liao 已提交
140

141 142
  SArray        *prev;             // previous row which is before than time window
  SArray        *next;             // next row which is after the query time window
H
Haojun Liao 已提交
143
  SIOCostSummary cost;
144
} STsdbReadHandle;
145

H
Haojun Liao 已提交
146 147 148
typedef struct STableGroupSupporter {
  int32_t    numOfCols;
  SColIndex* pCols;
149
  SSchema*   pTagSchema;
H
Haojun Liao 已提交
150 151
} STableGroupSupporter;

152
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
153 154
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
155
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
156

H
Haojun Liao 已提交
157
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
158
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
159
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
160
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle);
161
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
162 163 164
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
//static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
//static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
H
Haojun Liao 已提交
165
static bool    tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
166

167
// Put the data-block load record into its initial state: no file group, uid 0, slot -1.
// pLoadedCols is left untouched.
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->uid       = 0;
  pBlockLoadInfo->slot      = -1;
}

173
// Put the comp-block load record into its initial state: both the file id and the table tid are -1.
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid    = -1;
}
H
hjxilinx 已提交
177

178 179
// Build a new array holding the column id (int16_t) of every entry in pTsdbReadHandle->pColumns,
// in the same order. Returns NULL when the array cannot be allocated; the caller owns the result.
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  if (pIdList == NULL) {  // do not push into an array that was never created
    return NULL;
  }

  // size_t index: avoids the signed/unsigned comparison against numOfCols
  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
    taosArrayPush(pIdList, &pCol->info.colId);
  }

  return pIdList;
}

191 192
// Return the list of column ids to load by default (see getColumnIdList). When `loadTS` is set and
// the list does not already start with the primary timestamp column, that column id is inserted at
// the front. Returns NULL if the id list could not be created.
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
  if (pLocalIdList == NULL || !loadTS) {
    return pLocalIdList;
  }

  // Only look at the first id when there is one; an empty list never contains the timestamp column.
  bool startsWithTs = false;
  if (taosArrayGetSize(pLocalIdList) > 0) {
    int16_t firstColId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
    startsWithTs = (firstColId == PRIMARYKEY_TIMESTAMP_COL_ID);
  }

  // the primary timestamp column is not included in the specified load column list, add it
  if (!startsWithTs) {
    int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
// Number of rows held in the memory tables for the tables of this reader.
// Currently a stub: the mem-table reference is hard-wired to NULL, so the function returns 0
// without touching the handle. The per-table counting logic is kept below (disabled) for reference.
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*) pHandle;
  int64_t          numOfRows = 0;

  STsdbMemTable* pMemTable = NULL;//pTsdbReadHandle->pMemTable;
  if (pMemTable != NULL) {
//  STableData* pMem  = NULL;
//  STableData* pIMem = NULL;

//  SMemTable* pMemT = pMemRef->snapshot.mem;
//  SMemTable* pIMemT = pMemRef->snapshot.imem;

    size_t numOfTables = taosArrayGetSize(pReader->pTableCheckInfo);
    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, idx);

//    if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
//      pMem = pMemT->tData[pCheckInfo->tableId];
//      rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
//    }
//    if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
//      pIMem = pIMemT->tData[pCheckInfo->tableId];
//      rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
//    }
    }
  }

  return numOfRows;
}
234

235 236 237
// Flatten every table of every group in pGroupList into one array of STableCheckInfo and sort
// it with tsdbCheckInfoCompar. For ascending scans a table's lastKey is raised to window.skey when
// it is unset (INT64_MIN) or lies before the window. Returns NULL if the array cannot be created.
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
  assert(numOfGroup >= 1);

  // allocate buffer in order to load data blocks from file
  SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  const bool         asc  = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  const STimeWindow* pWin = &pTsdbReadHandle->window;

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t g = 0; g < numOfGroup; ++g) {
    SArray* pTables = *(SArray**) taosArrayGet(pGroupList->pGroupList, g);

    size_t numOfTables = taosArrayGetSize(pTables);
    assert(numOfTables > 0);

    for (int32_t t = 0; t < numOfTables; ++t) {
      STableKeyInfo*  pKeyInfo  = (STableKeyInfo*) taosArrayGet(pTables, t);
      STableCheckInfo checkInfo = { .tableId = pKeyInfo->uid, .lastKey = pKeyInfo->lastKey };

      if (!asc) {
        assert(checkInfo.lastKey >= pWin->ekey && checkInfo.lastKey <= pWin->skey);
      } else {
        if (checkInfo.lastKey == INT64_MIN || checkInfo.lastKey < pWin->skey) {
          checkInfo.lastKey = pWin->skey;
        }

        assert(checkInfo.lastKey >= pWin->skey && checkInfo.lastKey <= pWin->ekey);
      }

      taosArrayPush(pTableCheckInfo, &checkInfo);
      tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, checkInfo.tableId,
                checkInfo.lastKey, pTsdbReadHandle->idStr);
    }
  }

  // TODO  group table according to the tag value.
  taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
  return pTableCheckInfo;
}

276 277
// Rewind every table check info of the handle: lastKey goes back to window.skey, both in-memory
// iterators are destroyed and the "iterator initialised" flag is cleared.
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  SArray* pInfoList   = pTsdbReadHandle->pTableCheckInfo;
  size_t  numOfTables = taosArrayGetSize(pInfoList);
  assert(numOfTables >= 1);

  const bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pInfoList, idx);

    pInfo->lastKey = pTsdbReadHandle->window.skey;
    pInfo->iter    = tSkipListDestroyIter(pInfo->iter);
    pInfo->iiter   = tSkipListDestroyIter(pInfo->iiter);
    pInfo->initBuf = false;

    if (asc) {
      assert(pInfo->lastKey >= pTsdbReadHandle->window.skey);
    } else {
      assert(pInfo->lastKey <= pTsdbReadHandle->window.skey);
    }
  }
}

H
Haojun Liao 已提交
296 297 298
// Create a one-element check-info array for the table of `pCheckInfo`, with lastKey set to `skey`.
// Only one table is stored, so no sorting is needed. `psTable` is currently unused.
// Returns NULL if the array cannot be allocated; the caller owns the result.
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
  if (pNew == NULL) {  // do not push into an array that was never created
    return NULL;
  }

  STableCheckInfo info = { .lastKey = skey, .tableId = pCheckInfo->tableId };
  taosArrayPush(pNew, &info);
  return pNew;
}

307 308
// A query window is empty when its bounds are inverted for the scan direction:
// skey > ekey for an ascending scan, ekey > skey for a descending one.
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  const STimeWindow* pWin = &pTsdbReadHandle->window;
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return pWin->skey > pWin->ekey;
  }

  return pWin->ekey > pWin->skey;
}

316 317
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
318
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
319 320 321
  STsdbCfg* pCfg = &pTsdb->config;

  int64_t now = taosGetTimestamp(pCfg->precision);
322
  return now - (tsTickPerDay[pCfg->precision] * pCfg->keep) + 1;  // needs to add one tick
323 324
}

325 326
// Copy the query window from `pCond` into the handle, then clamp its lower bound to the earliest
// still-valid timestamp (skey for ascending scans, ekey for descending ones). When a clamp happens
// the new bound is written back into pCond->twindow as well and the change is logged.
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) {
  // keep the requested window so the log below can report the values before the clamp
  const STimeWindow oldWindow = pCond->twindow;
  pTsdbReadHandle->window = oldWindow;

  bool    updateTs = false;
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
      pCond->twindow.skey = startTs;
      updateTs = true;
    }
  } else {
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
      pCond->twindow.ekey = startTs;
      updateTs = true;
    }
  }

  if (updateTs) {
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
              pTsdbReadHandle, oldWindow.skey, oldWindow.ekey, pTsdbReadHandle->window.skey,
              pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
  }
}

H
Haojun Liao 已提交
351
// Allocate and initialise a read handle for the query condition `pCond`:
//  - copies order/load type/external-row flags from pCond,
//  - builds the debug id string from taskId/qId,
//  - initialises the file read helper and the (TTL-clamped) query window,
//  - when pCond->numOfCols > 0, allocates the statistics array, the output column list and the
//    default load column id list,
//  - allocates the data column buffer used to hold file blocks.
// Returns the handle, or NULL with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY on any failure.
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) {
  assert(pCond != NULL);  // checked before the first dereference of pCond

  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
  if (pReadHandle == NULL) {
    goto _end;
  }

  pReadHandle->order       = pCond->order;
  pReadHandle->pTsdb       = tsdb;
  pReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid     = INT32_MIN;
  pReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles  = true;
  pReadHandle->activeIndex = 0;   // current active table index
  pReadHandle->allocSize   = 0;
  pReadHandle->locateStart = false;
  pReadHandle->loadType    = pCond->type;

  pReadHandle->outputCapacity  = 4096;//((STsdb*)tsdb)->config.maxRowsPerFileBlock;
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

  char buf[128] = {0};
  snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId);
  pReadHandle->idStr = strdup(buf);
  if (pReadHandle->idStr == NULL) {  // idStr is passed to "%s" in log statements, it must not be NULL
    goto _end;
  }

  if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
    goto _end;
  }

  setQueryTimewindow(pReadHandle, pCond);

  if (pCond->numOfCols > 0) {
    // allocate buffer in order to load data blocks from file
    pReadHandle->statis = taosMemoryCalloc(pCond->numOfCols, sizeof(SDataStatis));
    if (pReadHandle->statis == NULL) {
      goto _end;
    }

    // todo: use list instead of array?
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
      goto _end;
    }

    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];

      int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      taosArrayPush(pReadHandle->pColumns, &colInfo);
      pReadHandle->statis[i].colId = colInfo.info.colId;
    }

    pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
  }

  pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
  if (pReadHandle->pDataCols == NULL) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _end;
  }

  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);

  return pReadHandle;

_end:
  // nothing to release when the handle itself could not be allocated
  if (pReadHandle != NULL) {
    tsdbCleanupReadHandle(pReadHandle);
  }
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return NULL;
}

H
Haojun Liao 已提交
430
// Create a reader over all tables of `groupList` for the condition `pCond`.
// When the (TTL-clamped) query window is empty the handle is returned without any table check
// info. Returns NULL on failure; in that case the partially built handle is released and terrno
// is set to TSDB_CODE_TDB_OUT_OF_MEMORY.
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) {
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    return (tsdbReaderT*) pTsdbReadHandle;
  }

  // todo apply the lastkey of table check to avoid to load header file
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
    // release the handle created above, otherwise it is leaked on this error path
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo),
      taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr);

  return (tsdbReaderT*) pTsdbReadHandle;
}

H
Haojun Liao 已提交
454
// Rewind an existing reader so it can be scanned again with the order/window of `pCond`.
// If the handle's current window is empty, only the scan order is updated (swapping skey/ekey
// when the order changes) and nothing else is reset.
void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, int64_t);
    }

    return;
  }

  pTsdbReadHandle->order       = pCond->order;
  pTsdbReadHandle->window      = pCond->twindow;
  pTsdbReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid     = -1;
  pTsdbReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles  = true;
  pTsdbReadHandle->activeIndex = 0;   // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // statis is only allocated when the handle was created with at least one column, so it may be
  // NULL here.
  // NOTE(review): only the first SDataStatis entry is cleared, as before -- confirm whether the
  // whole array (numOfCols entries) was intended.
  if (pTsdbReadHandle->statis != NULL) {
    memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  resetCheckInfo(pTsdbReadHandle);
}

H
Haojun Liao 已提交
491
// Reset an existing reader for a new set of tables.
// NOTE: rebuilding the table check info from `groupList` is currently disabled; the function
// leaves pTableCheckInfo NULL and reports TSDB_CODE_TDB_OUT_OF_MEMORY through terrno.
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  pTsdbReadHandle->order       = pCond->order;
  pTsdbReadHandle->window      = pCond->twindow;
  pTsdbReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid     = -1;
  pTsdbReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles  = true;
  pTsdbReadHandle->activeIndex = 0;   // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // statis is only allocated when the handle was created with at least one column, so it may be
  // NULL here.
  // NOTE(review): only the first SDataStatis entry is cleared, as before -- confirm whether the
  // whole array (numOfCols entries) was intended.
  if (pTsdbReadHandle->statis != NULL) {
    memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

  // NOTE(review): the previous check-info array is not released before being overwritten here.
  pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

//  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}

H
Haojun Liao 已提交
531
// Create a reader for a last-row query. The query window in `pCond` is replaced by the window
// returned from updateLastrowForEachGroup(). Returns NULL when no table qualifies or on failure
// (terrno carries the error code in the failure case).
tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
  pCond->twindow = updateLastrowForEachGroup(groupList);

  // no qualified table
  if (groupList->numOfTables == 0) {
    return NULL;
  }

  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList);
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    // release the handle created above, otherwise it is leaked on this error path;
    // terrno is assigned afterwards so the cleanup cannot overwrite it
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = code;
    return NULL;
  }

  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

558
#if 0
// Disabled code: reader for a cached "last" query. It still uses an older tsdbQueryTables()
// argument list (pMemRef) and is excluded from compilation.
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLast(pTsdbReadHandle);
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

#endif
H
Haojun Liao 已提交
579
// Return a new, empty pointer array whose capacity equals the number of tables tracked by the
// reader. No entries are added here; the caller owns the returned array.
SArray* tsdbGetQueriedTableList(tsdbReaderT *pHandle) {
  assert(pHandle != NULL);

  STsdbReadHandle *pReader = (STsdbReadHandle*) pHandle;
  size_t numOfTables = taosArrayGetSize(pReader->pTableCheckInfo);

  return taosArrayInit(numOfTables, POINTER_BYTES);
}

H
Haojun Liao 已提交
589 590 591 592 593
// leave only one table for each group
// Builds a new STableGroupInfo from `pGroupList`. The per-table selection rule is currently
// disabled, so every group ends up empty and is dropped; the result has numOfTables == 0.
// Returns NULL if the result cannot be allocated. The caller owns the returned structure.
static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
  assert(pGroupList);
  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);

  // allocate the group array first so a later failure has a single object to release
  SArray* pGroups = taosArrayInit(numOfGroup, POINTER_BYTES);
  if (pGroups == NULL) {
    return NULL;
  }

  STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
  if (pNew == NULL) {
    taosArrayDestroy(pGroups);
    return NULL;
  }
  pNew->pGroupList = pGroups;

  for (size_t i = 0; i < numOfGroup; ++i) {
    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
    size_t  numOfTables = taosArrayGetSize(oneGroup);

    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
    for (size_t j = 0; j < numOfTables; ++j) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
      (void)pInfo;  // only referenced by the disabled selection rule below
//      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
//        taosArrayPush(px, pInfo);
//        pNew->numOfTables += 1;
//        break;
//      }
    }

    // there are no data in this group
    if (taosArrayGetSize(px) == 0) {
      taosArrayDestroy(px);
    } else {
      taosArrayPush(pNew->pGroupList, &px);
    }
  }

  return pNew;
}

H
Haojun Liao 已提交
622
// Create a reader that loads the rows just outside the query time window. The table groups are
// first trimmed with trimTableGroup(); when no table remains, the window in `pCond` is turned
// into an empty (invalid) one for the scan direction. Returns NULL on failure.
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
  if (pNew == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  if (pNew->numOfTables == 0) {
    tsdbDebug("update query time range to invalidate time window");

    assert(taosArrayGetSize(pNew->pGroupList) == 0);
    bool asc = ASCENDING_TRAVERSE(pCond->order);
    if (asc) {
      pCond->twindow.ekey = pCond->twindow.skey - 1;
    } else {
      pCond->twindow.skey = pCond->twindow.ekey - 1;
    }
  }

  // NOTE(review): pNew is not released after this call -- confirm who owns it.
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
  if (pTsdbReadHandle == NULL) {  // tsdbQueryTables reports the reason through terrno
    return NULL;
  }

  pTsdbReadHandle->loadExternalRow = true;
  pTsdbReadHandle->currentLoadExternalRows = true;

  return pTsdbReadHandle;
}

644
// Create the mem/imem skip-list iterators for one table and advance each to its first row.
// Returns true if the iterators were already initialised by an earlier call, or if at least one
// buffer has a row to read; returns false when neither buffer holds data for the table.
// The initBuf flag is set before any work is done, so the setup runs only once per table.
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->initBuf) {
    return true;
  }
  pCheckInfo->initBuf = true;

  const int32_t order = pHandle->order;
  const bool    asc   = ASCENDING_TRAVERSE(order);

  STbData** ppMem  = NULL;
  STbData** ppIMem = NULL;
  TSKEY     tLastKey = 0;  /// keyToTkey(pCheckInfo->lastKey);

  // look the table up in the mutable buffer
  if (pHandle->pTsdb->mem != NULL) {
    ppMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (ppMem != NULL) {
      pCheckInfo->iter =
          tSkipListCreateIterFromVal((*ppMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  // look the table up in the immutable buffer
  if (pHandle->pTsdb->imem != NULL) {
    ppIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (ppIMem != NULL) {
      pCheckInfo->iiter =
          tSkipListCreateIterFromVal((*ppIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  // both iterators are NULL, no data in buffer right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // step each existing iterator onto its first element
  bool memHasData  = (pCheckInfo->iter != NULL) && tSkipListIterNext(pCheckInfo->iter);
  bool imemHasData = (pCheckInfo->iiter != NULL) && tSkipListIterNext(pCheckInfo->iiter);
  if (!memHasData && !imemHasData) {  // buffer is empty
    return false;
  }

  if (memHasData) {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iter);
    assert(pNode != NULL);

    STSRow* pRow = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*ppMem)->keyMin, (*ppMem)->keyMax, pCheckInfo->lastKey,
              (*ppMem)->nrows, pHandle->idStr);

    if (asc) {
      assert(pCheckInfo->lastKey <= firstKey);
    } else {
      assert(pCheckInfo->lastKey >= firstKey);
    }
  } else {
    tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  }

  if (imemHasData) {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(pNode != NULL);

    STSRow* pRow = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*ppIMem)->keyMin, (*ppIMem)->keyMax, pCheckInfo->lastKey,
              (*ppIMem)->nrows, pHandle->idStr);

    if (asc) {
      assert(pCheckInfo->lastKey <= firstKey);
    } else {
      assert(pCheckInfo->lastKey >= firstKey);
    }
  } else {
    tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  }

  return true;
}

H
Haojun Liao 已提交
725 726 727 728 729
/**
 * Destroy the two skip-list iterators (mem and imem) attached to a table
 * check info.
 *
 * The pointers are reset to NULL afterwards so that the `if (pCheckInfo->iter)`
 * style guards used elsewhere in this file do not dereference a destroyed
 * iterator, and so that calling this function twice is harmless.
 *
 * @param pCheckInfo  per-table check info owning the iterators
 */
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
  tSkipListDestroyIter(pCheckInfo->iter);
  pCheckInfo->iter = NULL;

  tSkipListDestroyIter(pCheckInfo->iiter);
  pCheckInfo->iiter = NULL;
}

730
/**
 * Peek at the current rows of the mem and imem iterators and return the key
 * that comes first in the requested traverse order, recording in
 * pCheckInfo->chosen which buffer(s) supplied it.
 *
 * When both buffers expose the same key the `update` mode decides:
 *   - TD_ROW_DISCARD_UPDATE:   imem is chosen and the mem iterator is advanced;
 *   - TD_ROW_OVERWRITE_UPDATE: mem is chosen and the imem iterator is advanced;
 *   - otherwise:               both are chosen and no iterator is advanced.
 *
 * @param pCheckInfo  per-table check info holding the iterators
 * @param order       traverse order (tested with ASCENDING_TRAVERSE)
 * @param update      row update mode of the tsdb config
 * @return the first key to traverse, or TSKEY_INITIAL_VAL if neither iterator
 *         currently points at a row (chosen is left untouched in that case)
 */
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
  STSRow* rmem = NULL;
  STSRow* rimem = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rimem == NULL) {  // only mem has data
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return TD_ROW_KEY(rmem);
  }

  if (rmem == NULL) {  // only imem has data
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return TD_ROW_KEY(rimem);
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
    return r1;
  }

  // Keys differ: the smaller one comes first when ascending, the larger one
  // when descending.  The previous code always fell back to imem for
  // descending order, which returned the smaller key when mem held the larger
  // one and disagreed with getSRowInTableMem().
  bool asc = ASCENDING_TRAVERSE(order);
  if ((r1 < r2) == asc) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return r2;
}

H
Haojun Liao 已提交
785 786
/**
 * Return the row that comes next in traverse order among the current rows of
 * the mem and imem iterators, and record in pCheckInfo->chosen which buffer(s)
 * it came from so that moveToNextRowInMem() advances the right iterator.
 *
 * When both buffers expose the same key the `update` mode decides:
 *   - TD_ROW_DISCARD_UPDATE:   the mem iterator is advanced, the imem row is returned;
 *   - TD_ROW_OVERWRITE_UPDATE: the imem iterator is advanced, the mem row is returned;
 *   - otherwise:               the mem row is returned, the imem row is handed
 *                              back through *extraRow and chosen is set to
 *                              CHECKINFO_CHOSEN_BOTH.
 *
 * @param pCheckInfo  per-table check info holding the iterators
 * @param order       traverse order (tested with ASCENDING_TRAVERSE)
 * @param update      row update mode of the tsdb config
 * @param extraRow    optional out parameter receiving the imem row in the
 *                    "both" case; may be NULL when the caller does not need it
 * @return the selected row, or NULL if neither iterator points at a row
 */
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) {
  STSRow* rmem = NULL;
  STSRow* rimem = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return NULL;
  }

  // NOTE(review): the original code stored the literals 0 and 1 here; the named
  // constants are used on the assumption that CHECKINFO_CHOSEN_MEM == 0 and
  // CHECKINFO_CHOSEN_IMEM == 1, as extractFirstTraverseKey() implies.
  if (rimem == NULL) {  // only mem has data
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  if (rmem == NULL) {  // only imem has data
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return rimem;
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      tSkipListIterNext(pCheckInfo->iter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      return rimem;
    }

    if (update == TD_ROW_OVERWRITE_UPDATE) {
      tSkipListIterNext(pCheckInfo->iiter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    }

    pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    if (extraRow != NULL) {  // callers such as hasMoreDataInCache() pass NULL
      *extraRow = rimem;
    }
    return rmem;
  }

  // Keys differ: ascending takes the smaller key, descending the larger one.
  // `chosen` must name the buffer whose row is returned; the descending
  // "mem is larger" case used to be tagged as IMEM, which made the caller
  // advance the wrong iterator.
  bool asc = ASCENDING_TRAVERSE(order);
  if ((r1 < r2) == asc) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return rimem;
}

853
/**
 * Advance the in-memory iterator(s) selected by pCheckInfo->chosen.
 *
 * - chosen == CHECKINFO_CHOSEN_MEM:  step the mem iterator; if it has no next
 *   row, report whether the imem iterator still points at a row.
 * - chosen == CHECKINFO_CHOSEN_IMEM: the mirror image of the above.
 * - any other value: step both iterators.
 *
 * @return true if at least one more row is available, false otherwise
 */
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
    if (pCheckInfo->iter != NULL && tSkipListIterNext(pCheckInfo->iter)) {
      return true;
    }

    // mem is exhausted (or absent): fall back to whatever imem currently holds
    return (pCheckInfo->iiter != NULL) && (tSkipListIterGet(pCheckInfo->iiter) != NULL);
  }

  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
    if (pCheckInfo->iiter != NULL && tSkipListIterNext(pCheckInfo->iiter)) {
      return true;
    }

    // imem is exhausted (or absent): fall back to whatever mem currently holds
    return (pCheckInfo->iter != NULL) && (tSkipListIterGet(pCheckInfo->iter) != NULL);
  }

  // both buffers supplied the current row, so both iterators move forward
  bool memHasNext = false;
  bool imemHasNext = false;

  if (pCheckInfo->iter != NULL) {
    memHasNext = tSkipListIterNext(pCheckInfo->iter);
  }

  if (pCheckInfo->iiter != NULL) {
    imemHasNext = tSkipListIterNext(pCheckInfo->iiter);
  }

  return imemHasNext || memHasNext;
}

891
/**
 * Check whether the active table still has rows in the in-memory buffers that
 * fall inside the query window and, if so, read the next batch of them into
 * the handle's output buffer.
 *
 * On success pHandle->cur (rows, win, lastKey, mixBlock) and
 * pCheckInfo->lastKey are updated; cur.win is swapped for descending order so
 * that skey <= ekey.
 *
 * @param pHandle  read handle; activeIndex selects the table to inspect
 * @return true if rows were produced from the cache, false if the buffers are
 *         empty or their next row lies beyond the query window
 */
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
  STsdbCfg *pCfg = &pHandle->pTsdb->config;
  size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
  pHandle->cur.fid = INT32_MIN;

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

  // getSRowInTableMem() writes through its last argument when mem and imem
  // hold the same key and both rows are kept, so give it a real slot instead
  // of NULL.  Only the key of the returned row is needed here.
  STSRow* extraRow = NULL;
  STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, &extraRow);
  if (row == NULL) {
    return false;
  }

  pCheckInfo->lastKey = TD_ROW_KEY(row);  // first timestamp in buffer
  tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
      pCheckInfo->tableId,  pCheckInfo->lastKey, pHandle->order, pHandle->idStr);

  // all data in mem are checked already.
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
    return false;
  }

  int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
  STimeWindow* win = &pHandle->cur.win;
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);

  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;

  // the window is filled in traverse order; normalise it to skey <= ekey
  if (!ASCENDING_TRAVERSE(pHandle->order)) {
    TSWAP(win->skey, win->ekey, TSKEY);
  }

  return true;
}
H
hjxilinx 已提交
932

933 934
/**
 * Map a timestamp key to the id of the data file that would contain it.
 *
 * @param key          timestamp; TSKEY_INITIAL_VAL maps to INT32_MIN
 * @param daysPerFile  number of days covered by one file
 * @param precision    time precision, used as an index into tsTickPerDay
 * @return the file id, clamped to the [INT32_MIN, INT32_MAX] range
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // `precision` indexes tsTickPerDay, so it must lie in [0, NANO].  The former
  // check joined its two bounds with `||` and therefore could never fail.
  assert(precision >= 0 && precision <= TSDB_TIME_PRECISION_NANO);

  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  int64_t ticksPerFile = daysPerFile * tsTickPerDay[precision];

  // negative keys are shifted back by one file span before the truncating division
  if (key < 0) {
    key -= ticksPerFile;
  }

  int64_t fid = (int64_t)(key / ticksPerFile);  // set the starting fileId

  // clamp to the int32 range before narrowing
  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t)fid;
}

H
refact  
Hongze Cheng 已提交
955
/**
 * Binary search over an array of blocks ordered by key range for the slot
 * that contains `skey`, or the neighbouring slot when `skey` falls in a gap
 * between two blocks (which neighbour depends on `order`).
 *
 * @param pBlock       block array, searched through keyFirst/keyLast
 * @param numOfBlocks  number of entries in pBlock
 * @param skey         key to locate
 * @param order        TSDB_ORDER_ASC or TSDB_ORDER_DESC
 * @return index of the selected slot
 */
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo = 0;
  int32_t hi = numOfBlocks - 1;
  int32_t mid = lo;

  for (;;) {
    int32_t span = hi - lo + 1;
    mid = lo + (span >> 1);

    if (span == 1) {
      break;
    }

    SBlock* pMid = &pBlock[mid];

    if (skey >= pMid->keyFirst && skey <= pMid->keyLast) {
      break;  // got the slot
    }

    if (skey > pMid->keyLast) {
      if (span == 2) {
        break;
      }

      // descending: skey sits in the gap right after the middle block
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[mid + 1].keyFirst)) {
        break;
      }

      lo = mid + 1;
    } else {  // skey < pMid->keyFirst
      // ascending: skey sits in the gap right before the middle block
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[mid - 1].keyLast)) {
        break;
      }

      hi = mid - 1;
    }
  }

  return mid;
}
981

982
/**
 * Load the block index of one table from the current file and keep only the
 * blocks that may intersect the remaining query range of that table.
 *
 * The surviving blocks are moved to the front of pCheckInfo->pCompInfo->blocks,
 * pCheckInfo->numOfBlocks is set to their count, and the count is added to
 * *numOfBlocks.
 *
 * @param pTsdbReadHandle  read handle
 * @param index            index of the table in pTableCheckInfo
 * @param numOfBlocks      accumulator of qualified blocks (in/out)
 * @return 0 on success (including "no block for this table"), otherwise an
 *         error code (terrno or TSDB_CODE_TDB_OUT_OF_MEMORY)
 */
static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
  pCheckInfo->numOfBlocks = 0;

  STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
  table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);

  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
    return terrno;
  }

  // no data block in this file, try next file
  SBlockIdx* pBlkIdx = pTsdbReadHandle->rhelper.pBlkIdx;
  if (pBlkIdx == NULL || pBlkIdx->uid != pCheckInfo->tableId) {
    return 0;  // no data blocks in the file belongs to pCheckInfo->pTable
  }

  // grow the block-info buffer when the index of this file does not fit
  if (pCheckInfo->compSize < (int32_t)pBlkIdx->len) {
    assert(pBlkIdx->len > 0);

    char* buf = taosMemoryRealloc(pCheckInfo->pCompInfo, pBlkIdx->len);
    if (buf == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pCheckInfo->pCompInfo = (SBlockInfo*)buf;
    pCheckInfo->compSize = pBlkIdx->len;
  }

  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
    return terrno;
  }

  SBlockInfo* pInfo = pCheckInfo->pCompInfo;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // remaining key range of this table, normalised so that lower <= upper
  TSKEY lower = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  TSKEY upper = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);

  // discard the unqualified data block based on the query time window
  int32_t first = binarySearchForBlock(pInfo->blocks, pBlkIdx->numOfBlocks, lower, TSDB_ORDER_ASC);
  if (lower > pInfo->blocks[first].keyLast) {
    return 0;
  }

  // todo speedup the procedure of located end block
  int32_t last = first;
  while (last < (int32_t)pBlkIdx->numOfBlocks && (pInfo->blocks[last].keyFirst <= upper)) {
    ++last;
  }

  pCheckInfo->numOfBlocks = (last - first);

  // compact the qualified blocks to the head of the array
  if (first > 0) {
    memmove(pInfo->blocks, &pInfo->blocks[first], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }

  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1055

1056
/**
 * Load the block index information of the current file, either for the active
 * table only (BLOCK_LOAD_TABLE_SEQ_ORDER) or for every table of the query
 * (BLOCK_LOAD_OFFSET_SEQ_ORDER), stopping at the first failure.
 *
 * The elapsed time is added to cost.headFileLoadTime whether or not the load
 * succeeds.
 *
 * @param pTsdbReadHandle  read handle
 * @param numOfBlocks      out: total number of qualified blocks found
 * @return TSDB_CODE_SUCCESS or the error code reported by loadBlockInfo()
 */
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
  // load all the comp offset value for all tables in this file
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

  pTsdbReadHandle->cost.headFileLoad += 1;
  int64_t startUs = taosGetTimestampUs();

  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

    for (int32_t i = 0; i < numOfTables; ++i) {
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        break;  // give up on the first table that fails to load
      }
    }
  } else {
    assert(0);
  }

  pTsdbReadHandle->cost.headFileLoadTime += (taosGetTimestampUs() - startUs);
  return code;
}

1088
/**
 * Load the column data of one file block into the read helper buffers.
 *
 * The data-column containers are (re)initialised with the table schema, the
 * default load columns are read from the file, and the load is recorded in
 * dataBlockLoadInfo.  pBlock->numOfRows is overwritten with the number of
 * rows actually loaded, or with 0 on failure.
 *
 * @param pTsdbReadHandle  read handle
 * @param pBlock           block to load
 * @param pCheckInfo       check info of the table owning the block
 * @param slotIndex        block slot, used for logging only
 * @return TSDB_CODE_SUCCESS, or terrno on failure
 */
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
  int64_t startUs = taosGetTimestampUs();

  STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);

  if (tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData;

  if (tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                            (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true) != TSDB_CODE_SUCCESS) {
    // the loader is expected to have set terrno
    assert(terrno != TSDB_CODE_SUCCESS);
    goto _error;
  }

  // remember which block currently sits in the helper buffers
  SDataBlockLoadInfo* pLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
  pLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pLoadInfo->slot = pTsdbReadHandle->cur.slot;
  pLoadInfo->uid = pCheckInfo->tableId;

  SDataCols* pLoaded = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pLoaded->numOfRows != 0 && pLoaded->numOfRows <= pBlock->numOfRows);

  pBlock->numOfRows = pLoaded->numOfRows;

  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
  if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int64_t* tsList = pLoaded->cols[0].pData;
    for (int32_t row = 0; row < pBlock->numOfRows; ++row) {
      tsList[row] = tdGetKey(tsList[row]);
    }
  }

  int64_t elapsedTime = (taosGetTimestampUs() - startUs);
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;

  tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s",
      pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr);
  return TSDB_CODE_SUCCESS;

_error:
  // the row count is reset before logging, so the message reports rows:0
  pBlock->numOfRows = 0;

  tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s",
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
  return terrno;
}

1156 1157 1158 1159 1160
// Forward declarations of file-local helpers that are referenced by
// handleDataMergeIfNeeded()/loadFileDataBlock() below before their definitions.
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
1161

1162 1163 1164
/**
 * Produce the next result block for a file block that lies entirely inside
 * the query window, taking the in-memory buffers into account.
 *
 * Three situations are distinguished by the first key found in mem/imem:
 *   1. the buffered key precedes the whole file block in traverse order: rows
 *      are read from the cache only, up to (but excluding) the file block;
 *   2. the buffered key falls inside the file block's range: the file block is
 *      loaded and merged with the buffered rows;
 *   3. no buffered key reaches the file block: the file block is returned as
 *      is, or the remaining part of it is copied to the output buffer.
 *
 * @param pTsdbReadHandle  read handle; `cur` is updated with the result
 * @param pBlock           file block under inspection
 * @param pCheckInfo       check info of the table owning the block
 * @return TSDB_CODE_SUCCESS, or the error code of doLoadFileDataBlock()
 */
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo);
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

  TSKEY key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
  bool  hasMemKey = (key != TSKEY_INITIAL_VAL);

  if (hasMemKey) {
    tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
  }

  // does the buffered data start before the far edge of the file block?
  bool memReachesBlock = hasMemKey && (asc ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (memReachesBlock) {
    // does it even start before the near edge, i.e. ahead of the whole block?
    bool memAheadOfBlock = asc ? (key < binfo.window.skey) : (key > binfo.window.ekey);

    if (memAheadOfBlock) {
      // do not load file block into buffer
      int32_t step = asc ? 1 : -1;
      TSKEY   maxKey = (asc ? binfo.window.skey : binfo.window.ekey) - step;

      cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
      pTsdbReadHandle->realNumOfRows = cur->rows;

      // update the last key value
      pCheckInfo->lastKey = cur->win.ekey + step;
      if (!asc) {
        TSWAP(cur->win.skey, cur->win.ekey, TSKEY);
      }

      cur->mixBlock = true;
      cur->blockCompleted = false;
      return code;
    }

    // return error, add test cases
    code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
    return code;
  }

  /*
   * No buffered row overlaps this file block, so only the file data is used;
   * the cache is not consulted again while this block is processed.
   */
  assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);

  bool wholeBlock = asc ? (cur->pos == 0 && endPos == binfo.rows - 1)
                        : (cur->pos == (binfo.rows - 1) && endPos == 0);

  if (wholeBlock) {
    pTsdbReadHandle->realNumOfRows = binfo.rows;

    cur->rows = binfo.rows;
    cur->win  = binfo.window;
    cur->mixBlock = false;
    cur->blockCompleted = true;

    // move the cursor just past the block in traverse direction
    if (asc) {
      cur->lastKey = binfo.window.ekey + 1;
      cur->pos = binfo.rows;
    } else {
      cur->lastKey = binfo.window.skey - 1;
      cur->pos = -1;
    }
  } else {  // partially copy to dest buffer
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
    cur->mixBlock = true;
  }

  assert(cur->blockCompleted);

  if (cur->rows == binfo.rows) {
    tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s",
              pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s",
              pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr);
  }

  return code;
}

1256 1257
/**
 * Prepare the output for one file block of a table.
 *
 * If the query ends inside the block, or the table's lastKey already lies
 * inside it, the block is loaded, the start position is located and the data
 * is merged with the in-memory buffers.  Otherwise the block is fully covered
 * by the query and handleDataMergeIfNeeded() decides how to return it.
 *
 * @param pTsdbReadHandle  read handle; cur.pos is set to the start position
 * @param pBlock           file block to process
 * @param pCheckInfo       check info of the table owning the block
 * @param exists           out: whether any row was produced
 *                         (realNumOfRows > 0); false when loading fails
 * @return TSDB_CODE_SUCCESS or an error code from the load/merge helpers
 */
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // query ended in/started from current block
  bool partial = asc ? (pTsdbReadHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst)
                     : (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast);

  if (!partial) {
    // the whole block is loaded in to buffer
    cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
    code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
  } else {
    code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
    if (code != TSDB_CODE_SUCCESS) {
      *exists = false;
      return code;
    }

    SDataCols* pTsCols = pTsdbReadHandle->rhelper.pDCols[0];
    if (asc) {
      assert(pTsCols->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTsCols->numOfRows == pBlock->numOfRows);
    }

    // locate the first row to read: either where lastKey falls inside the
    // block, or the block's leading row in traverse direction
    bool startsInside = asc ? (pCheckInfo->lastKey > pBlock->keyFirst) : (pCheckInfo->lastKey < pBlock->keyLast);
    if (startsInside) {
      cur->pos = binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
    } else {
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
    }

    assert(asc ? (pCheckInfo->lastKey <= pBlock->keyLast) : (pCheckInfo->lastKey >= pBlock->keyFirst));
    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
  }

  *exists = pTsdbReadHandle->realNumOfRows > 0;
  return code;
}

1311
/**
 * Binary search in an ascending-sorted array of TSKEY values.
 *
 * - TSDB_ORDER_DESC: returns the position of the last element that is not
 *   greater than `key`; firstPos - 1 (i.e. -1 on the first probe) when `key`
 *   is smaller than every element.
 * - TSDB_ORDER_ASC:  returns the position of the first element that is not
 *   smaller than `key`; -1 when `key` is greater than every element.
 *
 * @param pValue  key array, reinterpreted as TSKEY*
 * @param num     number of keys in the array
 * @param key     key to look for
 * @param order   TSDB_ORDER_ASC or TSDB_ORDER_DESC
 * @return the matching position, or -1 if num <= 0 or nothing qualifies
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) {
    return -1;
  }

  TSKEY* keys = (TSKEY*)pValue;
  int    lo = 0;
  int    hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    for (;;) {
      if (key >= keys[hi]) return hi;
      if (key == keys[lo]) return lo;
      if (key < keys[lo]) return lo - 1;

      int mid = lo + ((hi - lo + 1) >> 1);

      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        return mid;  // exact hit
      }
    }
  }

  // find the first position which is bigger than the key
  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;

    if (key > keys[hi]) {
      // the answer is the slot right after hi, if that slot exists
      int next = hi + 1;
      return (next >= num) ? -1 : next;
    }

    int mid = lo + ((hi - lo + 1) >> 1);

    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;  // exact hit
    }
  }
}

1373 1374
/**
 * Copy rows [start, end] of the loaded file block (rhelper.pDCols[0]) into the
 * output columns of the read handle, appending them after the `numOfRows`
 * rows that are already there.
 *
 * Output columns and file columns are walked in parallel by column id;
 * output columns that have no counterpart in the file block, or whose file
 * column is entirely NULL, are filled with NULL values.
 *
 * cur.win.ekey and cur.lastKey are updated from the timestamp of row `end`.
 *
 * @param pTsdbReadHandle  read handle owning the output columns
 * @param capacity         capacity of the output buffer (currently unused)
 * @param numOfRows        number of rows already present in the output
 * @param start            first source row to copy (inclusive)
 * @param end              last source row to copy (inclusive), end >= start - 1
 * @return the new number of rows in the output, numOfRows + (end - start + 1)
 */
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  int32_t num = end - start + 1;
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);

  //data in buffer has greater timestamp, copy data in file block
  int32_t i = 0, j = 0;
  while (i < requiredNumOfCols && j < pCols->numOfCols) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);

    SDataCol* src = &pCols->cols[j];
    if (src->colId < pColInfo->info.colId) {  // file column not requested, skip it
      j++;
      continue;
    }

    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
      // Rows are appended behind the ones already in the output: the
      // destination index starts at numOfRows, not at the source index k.
      // NULL cells are forwarded as NULL for fixed and variable length types
      // alike. todo opt performance, only copy one-by-one
      int32_t rowIndex = numOfRows;
      for (int32_t k = start; k <= end; ++k, ++rowIndex) {
        SCellVal sVal = {0};
        if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
          TASSERT(0);
        }

        if (sVal.valType == TD_VTYPE_NULL) {
          colDataAppend(pColInfo, rowIndex, NULL, true);
        } else {
          colDataAppend(pColInfo, rowIndex, sVal.val, false);
        }
      }

      j++;
      i++;
    } else { // pColInfo->info.colId < src->colId, it is a NULL data
      int32_t rowIndex = numOfRows;
      for (int32_t k = 0; k < num; ++k, ++rowIndex) {  // TODO opt performance
        colDataAppend(pColInfo, rowIndex, NULL, true);
      }
      i++;
    }
  }

  while (i < requiredNumOfCols) { // the remain columns are all null data
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);

    int32_t rowIndex = numOfRows;
    for (int32_t k = 0; k < num; ++k, ++rowIndex) {
      colDataAppend(pColInfo, rowIndex, NULL, true); // TODO add a fast version to set a number of consecutive NULL value.
    }
    i++;
  }

  pTsdbReadHandle->cur.win.ekey = tsArray[end];
  pTsdbReadHandle->cur.lastKey = tsArray[end] + step;

  return numOfRows + num;
}

H
Haojun Liao 已提交
1450
// TODO fix bug for reverse copy data problem
1451
// Note: row1 always has high priority
H
Haojun Liao 已提交
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
/**
 * Merge up to two in-memory rows into result row `numOfRows` of the output columns held by the read handle.
 *
 * The output columns, the columns of row1 and the columns of row2 are walked together by column id. When both
 * rows carry the same column id, row1 is the one that is read (row1 always has high priority).
 *
 * @param pTsdbReadHandle read handle owning the output column array (pColumns) and the meta handle
 * @param capacity        unused; kept so that existing callers keep compiling
 * @param numOfRows       index of the output row to write
 * @param row1            first row, must not be NULL
 * @param row2            optional second row, may be NULL
 * @param numOfCols       number of output columns to fill
 * @param uid             table uid used to look up a schema when pSchema1/pSchema2 is NULL
 * @param pSchema1        schema of row1, or NULL to fetch it by the schema version stored in row1
 * @param pSchema2        schema of row2, or NULL to fetch it by the schema version stored in row2
 * @param forceSetNull    if true, an output column without a normal value is set to NULL; if false it is left
 *                        untouched
 *
 * NOTE(review): the result of metaGetTbTSchema() is neither checked for NULL nor released here -- confirm the
 * ownership rules of that API.
 */
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
                               STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                               bool forceSetNull) {
  (void)capacity;

  bool     isRow1DataRow = TD_IS_TP_ROW(row1);
  bool     isRow2DataRow = false;  // stays false when there is no row2
  SCellVal sVal = {0};

  // the schema version info is embedded in STSRow
  if (pSchema1 == NULL) {
    pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1));
  }

  int32_t numOfColsOfRow1 = 0;
  if (isRow1DataRow) {
    numOfColsOfRow1 = schemaNCols(pSchema1);
  } else {
    numOfColsOfRow1 = tdRowGetNCols(row1);
  }

  int32_t numOfColsOfRow2 = 0;
  if (row2) {
    isRow2DataRow = TD_IS_TP_ROW(row2);
    if (pSchema2 == NULL) {
      pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2));
    }

    if (isRow2DataRow) {
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
      numOfColsOfRow2 = tdRowGetNCols(row2);
    }
  }

  // i: output column cursor, j: column cursor of row1, k: column cursor of row2
  int32_t i = 0, j = 0, k = 0;
  while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);

    // an exhausted row reports INT32_MAX so that the other row is always preferred
    int32_t colIdOfRow1;
    if (j >= numOfColsOfRow1) {
      colIdOfRow1 = INT32_MAX;
    } else if (isRow1DataRow) {
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
      SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j);
      colIdOfRow1 = pColIdx->colId;
    }

    int32_t colIdOfRow2;
    if (k >= numOfColsOfRow2) {
      colIdOfRow2 = INT32_MAX;
    } else if (isRow2DataRow) {
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
      SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k);
      colIdOfRow2 = pColIdx->colId;
    }

    // pick the row that provides the smaller column id; skip row columns that precede the output column
    STSchema* pSchema = NULL;
    STSRow*   row = NULL;
    bool      isChosenRowDataRow = false;
    int32_t   chosen_itr = 0;

    if (colIdOfRow1 == colIdOfRow2) {
      if (colIdOfRow1 < pColInfo->info.colId) {
        j++;
        k++;
        continue;
      }
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else if (colIdOfRow1 < colIdOfRow2) {
      if (colIdOfRow1 < pColInfo->info.colId) {
        j++;
        continue;
      }
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
      if (colIdOfRow2 < pColInfo->info.colId) {
        k++;
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }

    // read the cell of the chosen row
    int16_t colId;
    int16_t offset;
    if (isChosenRowDataRow) {
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
      tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal);
    } else {
      SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr);
      colId = pColIdx->colId;
      offset = pColIdx->offset;
      tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal);
    }

    if (colId == pColInfo->info.colId) {
      if (tdValTypeIsNorm(sVal.valType)) {
        colDataAppend(pColInfo, numOfRows, sVal.val, false);
      } else if (forceSetNull) {
        colDataAppend(pColInfo, numOfRows, NULL, true);
      }

      i++;

      if (row == row1) {
        j++;
      } else {
        k++;
      }
    } else {
      // the chosen row has no value for this output column
      if (forceSetNull) {
        colDataAppend(pColInfo, numOfRows, NULL, true);
      }
      i++;
    }
  }

  if (forceSetNull) {
    while (i < numOfCols) {  // the remaining columns are all null data
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
      colDataAppend(pColInfo, numOfRows, NULL, true);
      i++;
    }
  }
}
1588

1589 1590
/**
 * For a non-ascending scan whose output buffer is only partly filled, shift the `numOfRows` entries that sit
 * behind the first (outputCapacity - numOfRows) slots of every column to the beginning of that column buffer.
 * Does nothing for an empty result, an ascending scan, or a full buffer.
 */
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
  if (numOfRows == 0) {
    return;
  }

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return;
  }

  // a completely filled buffer already starts at slot 0
  if (numOfRows >= pTsdbReadHandle->outputCapacity) {
    return;
  }

  int32_t gap = pTsdbReadHandle->outputCapacity - numOfRows;
  int32_t colIdx = 0;
  while (colIdx < numOfCols) {
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, colIdx);
    char*            dst = (char*)pCol->pData;
    char*            src = (char*)pCol->pData + gap * pCol->info.bytes;

    // source and destination may overlap, hence memmove
    memmove(dst, src, numOfRows * pCol->info.bytes);
    ++colIdx;
  }
}

1604
/**
 * Compute the index range [*start, *end] (with *start <= *end for a well-formed request) of the rows that can
 * still be copied, given that `numOfExisted` rows already occupy the output buffer.
 *
 * @param startPos      position where the scan currently stands
 * @param endPos        last position of interest in scan direction
 * @param numOfExisted  rows already placed in the output buffer
 * @param start, end    output: inclusive index range, clipped to the remaining output capacity
 */
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
                                int32_t* start, int32_t* end) {
  *start = -1;

  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    // descending scan: rows are consumed from startPos downwards to endPos
    int32_t wanted = (startPos - endPos) + 1;
    int32_t lowest = endPos;
    if (wanted + numOfExisted > pTsdbReadHandle->outputCapacity) {
      lowest = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
    }

    *end = lowest;
    *start = *end;
    *end = startPos;
    return;
  }

  // ascending scan: rows are consumed from startPos upwards to endPos
  int32_t wanted = endPos - startPos + 1;
  int32_t highest = endPos;
  if (wanted + numOfExisted > pTsdbReadHandle->outputCapacity) {
    highest = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
  }

  *end = highest;
  *start = startPos;
}

1630 1631
/**
 * Publish the outcome of a merge/copy step: remember the last key on the table check info and store the row
 * count and the next position in the handle's current file position.
 */
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;

  pCheckInfo->lastKey = pPos->lastKey;

  pTsdbReadHandle->realNumOfRows = numOfRows;
  pPos->rows = numOfRows;
  pPos->pos = endPos;
}

1639 1640
/**
 * Sanity-check the block that was just generated.
 *
 * With rows in the result, assert that the reported window lies inside the query window and matches the first
 * and last entry of output column 0. With an empty result, reset the window to the query window and move
 * lastKey one step past the query end key.
 */
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  if (cur->rows <= 0) {
    // nothing was produced: the whole query window counts as consumed
    cur->win = pTsdbReadHandle->window;

    int32_t step = -1;
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      step = 1;
    }
    cur->lastKey = pTsdbReadHandle->window.ekey + step;
    return;
  }

  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
  } else {
    assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
  }

  // output column 0 is read as the timestamp array
  SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
  assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
}

1659 1660
/**
 * Copy every remaining row of the loaded file block, from the current position up to `endPos` in scan
 * direction, into the output buffer and mark the block as completed.
 *
 * @param pTsdbReadHandle read handle; its `cur` position and output columns are updated
 * @param pCheckInfo      table check info whose lastKey is refreshed
 * @param pBlockInfo      info of the file block; only `rows` is read, to decide whether the block was copied
 *                        in full
 * @param endPos          last row position (in scan direction) that qualifies
 */
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));

  // [start, end] is always an ascending index range, whatever the scan direction
  int32_t start = cur->pos;
  int32_t end = endPos;

  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    TSWAP(start, end, int32_t);
  }

  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);

  // the time window should always be ascending order: skey <= ekey
  cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
  cur->lastKey = tsArray[endPos] + step;
  cur->blockCompleted = true;

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);

  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
  int32_t pos = endPos + step;
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);

  // window keys are signed timestamps, so they are printed with PRId64
  tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRId64"-%"PRId64" rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
}

1698
/**
 * Locate the last row position, in scan direction, of the loaded file block that still lies inside the query
 * window, and record in cur->mixBlock whether only part of the block qualifies.
 *
 * @return pBlockInfo->rows - 1 (ascending) or 0 (descending) when the block end is covered by the query
 *         window; otherwise the position found by a binary search for the query end key, performed in the
 *         order opposite to the scan direction
 */
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
      // the query window reaches beyond the last key of the block
      cur->mixBlock = (cur->pos != 0);
      return pBlockInfo->rows - 1;
    }
  } else {
    if (pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
      // the query window reaches below the first key of the block
      cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
      return 0;
    }
  }

  // NOTE: reverse the order to find the end position in data block
  int32_t searchOrder = TSDB_ORDER_ASC;
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    searchOrder = TSDB_ORDER_DESC;
  }

  assert(pCols->numOfRows > 0);
  int32_t found = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, searchOrder);
  cur->mixBlock = true;

  return found;
}

H
[td-32]  
hjxilinx 已提交
1721 1722
// Only return the rows that fall inside the query time window to the client; rows of the same block that lie
// outside the query time window are discarded.
1723 1724 1725 1726
/**
 * Merge the rows of the loaded file block with the rows of the in-memory buffers of one table and write the
 * result, in scan order, into the output columns of the read handle.
 *
 * If the table has no in-memory iterator at all, the remaining file rows are copied directly. Otherwise the
 * in-memory row and the file row at `pos` are compared repeatedly until the output buffer is full or one of
 * the two sources leaves the query window.
 *
 * @param pTsdbReadHandle read handle; `cur` and the output columns are updated
 * @param pCheckInfo      per-table state holding the in-memory iterators
 * @param pBlock          descriptor of the file block that is currently loaded in rhelper.pDCols[0]
 *
 * NOTE(review): `blockInfo` is zero-initialized because the GET_FILE_DATA_BLOCK_INFO initializer is commented
 * out, so getEndPosInDataBlock() and the first debug log see an all-zero block description. This looks
 * unintended -- confirm and restore the initializer if so.
 * NOTE(review): pSchema1/pSchema2 are never assigned here (the lookups are commented out), so
 * mergeTwoRowFromMem() always receives NULL schemas; only rv1/rv2 are tracked.
 */
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataBlockInfo blockInfo = {0};//GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;

  initTableMemIterator(pTsdbReadHandle, pCheckInfo);

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
      cur->pos >= 0 && cur->pos < pBlock->numOfRows);

  TSKEY* tsArray = pCols->cols[0].pData;
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);

  // for search the endPos, so the order needs to reverse
  int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;

  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));

  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);

  // window keys are signed timestamps, so they are printed with PRId64
  tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRId64"-%"PRId64" rows:%d, start:%d,"
            "end:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey,
            blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr);

  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;

  int16_t rv1 = -1;
  int16_t rv2 = -1;
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;

  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;

  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
    return;
  }

  // from here on at least one in-memory iterator exists
  // NOTE(review): `node` is never assigned, so the `node == NULL` test after the loop is always true.
  SSkipListNode* node = NULL;
  do {
    STSRow* row2 = NULL;
    STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2);
    if (row1 == NULL) {
      break;
    }

    // stop once the in-memory row is beyond the end of the query window
    TSKEY key = TD_ROW_KEY(row1);
    if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      break;
    }

    // stop once the file block is exhausted or its current row is beyond the end of the query window
    if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      break;
    }

    if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      // the in-memory row comes first in scan order: emit it
      if (rv1 != TD_ROW_SVER(row1)) {
//          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
        rv1 = TD_ROW_SVER(row1);
      }
      if(row2 && rv2 != TD_ROW_SVER(row2)) {
//          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
        rv2 = TD_ROW_SVER(row2);
      }

      mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
      numOfRows += 1;
      if (cur->win.skey == TSKEY_INITIAL_VAL) {
        cur->win.skey = key;
      }

      cur->win.ekey = key;
      cur->lastKey  = key + step;
      cur->mixBlock = true;

      moveToNextRowInMem(pCheckInfo);
    } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
      if (pCfg->update) {
        // partial update: start from the file row, then overlay the in-memory values on the same output row
        if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
          doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
        }
        if (rv1 != TD_ROW_SVER(row1)) {
//            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
          rv1 = TD_ROW_SVER(row1);
        }
        if(row2 && rv2 != TD_ROW_SVER(row2)) {
//            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
          rv2 = TD_ROW_SVER(row2);
        }

        bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
        mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }

        cur->win.ekey = key;
        cur->lastKey = key + step;
        cur->mixBlock = true;

        moveToNextRowInMem(pCheckInfo);
        pos += step;
      } else {
        // updates disabled: drop the in-memory row, the file row is handled by a later iteration
        moveToNextRowInMem(pCheckInfo);
      }
    } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
                (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      // file rows come first in scan order: copy the run of file rows that precede the in-memory key
      if (cur->win.skey == TSKEY_INITIAL_VAL) {
        cur->win.skey = tsArray[pos];
      }

      int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
      assert(end != -1);

      if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
        if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
          moveToNextRowInMem(pCheckInfo);
        } else {
          end -= step;
        }
      }

      int32_t qstart = 0, qend = 0;
      getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);

      numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
      pos += (qend - qstart + 1) * step;

      cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[qend]:tsArray[qstart];
      cur->lastKey  = cur->win.ekey + step;
    }
  } while (numOfRows < pTsdbReadHandle->outputCapacity);

  if (numOfRows < pTsdbReadHandle->outputCapacity) {
    /**
     * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
     * copy them all to result buffer, since it may be overlapped with file data block.
     */
    if (node == NULL ||
        ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
         ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
         !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      // no data in cache or data in cache is greater than the ekey of time window, load data from file block
      if (cur->win.skey == TSKEY_INITIAL_VAL) {
        cur->win.skey = tsArray[pos];
      }

      int32_t start = -1, end = -1;
      getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);

      numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
      pos += (end - start + 1) * step;

      cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[end]:tsArray[start];
      cur->lastKey  = cur->win.ekey + step;
      cur->mixBlock = true;
    }
  }

  // the block is done when the position passed endPos or the next key lies beyond the query window
  cur->blockCompleted =
      (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
       ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));

  // keep the reported window ascending (skey <= ekey) for descending scans as well
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    TSWAP(cur->win.skey, cur->win.ekey, TSKEY);
  }

  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);

  tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRId64"-%"PRId64" rows:%d, %s",
      pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
}

1910
/**
 * Binary search in an array of `num` timestamps, read from `pValue` as TSKEY values; the comparisons below
 * only make sense for an ascending array.
 *
 * @param pValue buffer holding the TSKEY array
 * @param num    number of keys in the array
 * @param key    key to look for
 * @param order  TSDB_ORDER_DESC: return the position of the last entry that is <= key (-1 if key is smaller
 *               than every entry); any other value: return the position of the first entry that is >= key
 *               (-1 if key is larger than every entry)
 * @return the position described above, or -1 when num <= 0
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY* keys = (TSKEY*)pValue;
  int    lo = 0;
  int    hi = num - 1;

  if (order != TSDB_ORDER_DESC) {
    // find the first position which is bigger than the key
    for (;;) {
      if (key <= keys[lo]) {
        return lo;
      }
      if (key == keys[hi]) {
        return hi;
      }

      if (key > keys[hi]) {
        // the answer is the slot right after hi, if it exists
        hi = hi + 1;
        return (hi >= num) ? -1 : hi;
      }

      int span = hi - lo + 1;
      int mid = (span >> 1) + lo;

      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        return mid;
      }
    }
  }

  // find the first position which is smaller than the key
  for (;;) {
    if (key >= keys[hi]) {
      return hi;
    }
    if (key == keys[lo]) {
      return lo;
    }
    if (key < keys[lo]) {
      return lo - 1;
    }

    int span = hi - lo + 1;
    int mid = (span >> 1) + lo;

    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}

1970
/**
 * Release the arrays owned by a block order supporter: the per-table block info arrays of the first
 * `numOfTables` tables, the table of pointers to them, and the two counter arrays.
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);

  int32_t tableIdx = 0;
  while (tableIdx < numOfTables) {
    STableBlockInfo* pTableBlocks = pSupporter->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pTableBlocks);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

1991
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
1992 1993
    /* left block is empty */
    return 1;
1994
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
1995 1996 1997 1998 1999 2000 2001
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2002
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
2003
#if 0	// TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2004 2005
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2006
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2007
  }
H
Haojun Liao 已提交
2008
#endif
2009

H
Haojun Liao 已提交
2010
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2011 2012
}

2013
/**
 * Build pTsdbReadHandle->pDataBlockInfo: one entry per file block of every table that has blocks. With a
 * single qualifying table the blocks are copied as they are; with several tables they are merged through a
 * multiway merge tree driven by dataBlockOrderCompar().
 *
 * @param pTsdbReadHandle  read handle owning the table check list and the (reused) block info buffer
 * @param numOfBlocks      total number of blocks over all tables
 * @param numOfAllocBlocks output: set to numOfBlocks once the buffer is large enough
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY if an allocation or the merge tree creation fails
 */
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

  if (pTsdbReadHandle->allocSize < size) {
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, size);
    if (tmp == NULL) {
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // record the new size only after the buffer really grew, so a failed realloc leaves the handle consistent
    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
    pTsdbReadHandle->allocSize = (int32_t)size;
  }

  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
  *numOfAllocBlocks = numOfBlocks;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    // no per-table array exists yet, hence 0 tables to release
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  // collect the blocks of every table that has at least one block
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }

    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];

      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
        pTsdbReadHandle->idStr);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
      numOfQualTables, pTsdbReadHandle->idStr);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  // keep the full-width status code; storing it in a uint8_t would drop its upper bits
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
   * }
   */

  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
  cleanBlockOrderSupporter(&sup, numOfTables);
  taosMemoryFree(pTree);

  return TSDB_CODE_SUCCESS;
}

2130
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2131

2132 2133 2134
/**
 * Starting with `pNext`, load file blocks of the current file in scan direction until one reports data
 * (*exists == true) or loading fails. When the last block of the file in scan direction has been tried without
 * success, continue with the first block of the next file via getFirstFileDataBlock().
 *
 * @return the code of the failing/succeeding load, or whatever getFirstFileDataBlock() returns
 */
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool *exists) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  int32_t        direction = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;

  for (;;) {
    int32_t rc = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }
    if (*exists) {
      return rc;
    }

    // slot of the final block of this file in scan direction
    int32_t finalSlot = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? (pTsdbReadHandle->numOfBlocks - 1) : 0;
    if (pPos->slot == finalSlot) {
      // all data blocks in current file has been checked already, try next file if exists
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
    }

    // next block of the same file
    pPos->slot += direction;
    pPos->mixBlock = false;
    pPos->blockCompleted = false;
    pNext = &pTsdbReadHandle->pDataBlockInfo[pPos->slot];
  }
}

2155 2156 2157
/*
 * Advance the file-set iterator of the handle until a file set is found for which
 * createDataBlocksInfo() reports at least one block, then try to load the first block of
 * that file set (slot 0 for ascending, the last slot for descending traversal).
 *
 * The file-system read lock is held only while fetching the next file set and opening
 * it; it is released on every path before the block index is loaded.
 *
 * When no further file qualifies, or an error occurs, cur->fid is set to INT32_MIN,
 * *exists is set to false and the (possibly non-success) code is returned.
 */
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  SQueryFilePos* pPos  = &pTsdbReadHandle->cur;
  STsdbCfg*      pConf = &pTsdbReadHandle->pTsdb->config;
  const bool     asc   = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  int32_t     ret = TSDB_CODE_SUCCESS;
  int32_t     blocksInFile = 0;
  int32_t     tableCount = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  STimeWindow fileWin = TSWINDOW_INITIALIZER;

  pTsdbReadHandle->numOfBlocks = 0;

  for (;;) {
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pConf->daysPerFile, pConf->precision, pTsdbReadHandle->pFileGroup->fid, &fileWin.skey,
                       &fileWin.ekey);

    // the current file does not overlap with the query time window, ignore the remaining files
    const bool outOfRange = asc ? (fileWin.skey > pTsdbReadHandle->window.ekey)
                                : (fileWin.ekey < pTsdbReadHandle->window.ekey);
    if (outOfRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
      break;
    }

    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      ret = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      ret = terrno;
      break;
    }

    ret = getFileCompInfo(pTsdbReadHandle, &blocksInFile);
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, blocksInFile, tableCount,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    assert(blocksInFile >= 0);
    if (blocksInFile == 0) {
      continue;  // nothing for the queried tables in this file, move on
    }

    // todo return error code to query engine
    ret = createDataBlocksInfo(pTsdbReadHandle, blocksInFile, &pTsdbReadHandle->numOfBlocks);
    if (ret != TSDB_CODE_SUCCESS) {
      break;
    }

    assert(blocksInFile >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
      break;
    }
  }

  // no data in file anymore
  if (ret != TSDB_CODE_SUCCESS || pTsdbReadHandle->numOfBlocks <= 0) {
    if (ret == TSDB_CODE_SUCCESS) {
      assert(pTsdbReadHandle->pFileGroup == NULL);
    }

    pPos->fid = INT32_MIN;  // denote that there are no data in file anymore
    *exists = false;
    return ret;
  }

  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);
  pPos->slot = asc ? 0 : pTsdbReadHandle->numOfBlocks - 1;
  pPos->fid  = pTsdbReadHandle->pFileGroup->fid;

  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[pPos->slot], exists);
}

/*
 * Tell whether cur->slot is the final block of the current file in traversal order:
 * the last slot (numOfBlocks - 1) when scanning ascending, slot 0 when scanning descending.
 * cur must not be NULL and numOfBlocks must be positive (asserted).
 */
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);

  if (ascTrav) {
    return cur->slot == numOfBlocks - 1;
  }

  return cur->slot == 0;
}

2248 2249
/*
 * Move the file cursor of the handle one block forward (ascending) or backward
 * (descending) and clear the per-block flags. The current slot must be a valid index
 * (asserted); the new slot is not range-checked here.
 */
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  assert(pPos->slot >= 0 && pPos->slot < pTsdbReadHandle->numOfBlocks);

  pPos->slot += ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
  pPos->mixBlock = false;
  pPos->blockCompleted = false;
}
H
Haojun Liao 已提交
2258 2259

/*
 * Walk over all file sets that overlap the query window of the reader and accumulate
 * block statistics of the queried tables into pTableBlockInfo.
 *
 * totalSize and totalRows are reset here; numOfFiles, numOfSmallBlocks, minRows and
 * maxRows are only updated, so the caller is responsible for their initial values.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while opening a file set, loading
 * its block index or reading the per-table block info.
 */
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);

  // find the start data block in file
  pTsdbReadHandle->locateStart = true;
  STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
  int32_t   fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

  tsdbRLockFS(pFileHandle);
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
  tsdbUnLockFS(pFileHandle);

  // NOTE(review): the counter is bumped once here and once more for every visited file
  // below, so the result is (files visited + 1) on top of the caller's value - confirm
  // whether this extra increment is intended.
  pTableBlockInfo->numOfFiles += 1;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     numOfBlocks = 0;
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  int         defaultRows = 4096;//TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
  STimeWindow win = TSWINDOW_INITIALIZER;

  bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  while (true) {
    numOfBlocks = 0;
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);

    // current file are not overlapped with query time window, ignore remain files
    if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      break;
    }

    pTableBlockInfo->numOfFiles += 1;
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    if (numOfBlocks == 0) {
      continue;
    }

    for (int32_t i = 0; i < numOfTables; ++i) {
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);

      // a table without blocks in this file contributes nothing; skipping it also avoids
      // touching pCompInfo when it has not been set up for this table
      if (pCheckInfo->numOfBlocks <= 0 || pCheckInfo->pCompInfo == NULL) {
        continue;
      }

      SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
      for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
        pTableBlockInfo->totalSize += pBlock[j].len;

        int32_t numOfRows = pBlock[j].numOfRows;
        pTableBlockInfo->totalRows += numOfRows;
        if (numOfRows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = numOfRows;
        }

        if (numOfRows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = numOfRows;
        }

        // blocks holding fewer rows than the default block size are counted as "small"
        if (numOfRows < defaultRows) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }
//        int32_t  stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
//        SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
//        blockInfo->numBlocksOfStep++;
      }
    }
  }

  return code;
}

2362 2363 2364
/*
 * Produce the next file data block for the reader.
 *
 * On the first call (locateStart not set) the file iterator is positioned at the file
 * that contains window.skey and the first qualifying block is searched for. On later
 * calls a partially consumed mixed block is continued first; otherwise the cursor moves
 * to the next block of the current file or, at the end of the file, to the next file.
 *
 * *exists tells whether rows are available; the return value is the code reported by
 * the helper that did the work.
 */
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  STsdbFS*       pFs  = REPO_FS(pTsdbReadHandle->pTsdb);

  // find the start data block in file
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;

    STsdbCfg* pConf = &pTsdbReadHandle->pTsdb->config;
    int32_t   startFid = getFileIdFromKey(pTsdbReadHandle->window.skey, pConf->daysPerFile, pConf->precision);

    tsdbRLockFS(pFs);
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFs, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, startFid);
    tsdbUnLockFS(pFs);

    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  // check if current file block is all consumed
  STableBlockInfo* pCurBlock = &pTsdbReadHandle->pDataBlockInfo[pPos->slot];
  STableCheckInfo* pTableInfo = pCurBlock->pTableCheckInfo;

  // only a mixed block that is not completed yet still has rows to hand out
  if (pPos->mixBlock && !pPos->blockCompleted) {
    tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, pPos->slot, pPos->pos,
              pTsdbReadHandle->idStr);

    int32_t ret = handleDataMergeIfNeeded(pTsdbReadHandle, pCurBlock->compBlock, pTableInfo);
    *exists = (pTsdbReadHandle->realNumOfRows > 0);

    if (ret != TSDB_CODE_SUCCESS || *exists) {
      return ret;
    }
  }

  // the current block is done or empty: try the next file when this was the last block
  // of the file, otherwise the next block of the same file
  if (isEndFileDataBlock(pPos, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[pPos->slot], exists);
}

2409 2410
/*
 * Starting at activeIndex, look for the first table for which hasMoreDataInCache()
 * reports data. activeIndex is advanced past every table without data and is left on
 * the table that has data when true is returned.
 */
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  const size_t tableCount = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  for (; pTsdbReadHandle->activeIndex < tableCount; pTsdbReadHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
      return true;
    }
  }

  return false;
}

2423
//todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2424
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
H
Haojun Liao 已提交
2425
  // filter the queried time stamp in the first place
2426
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
H
Haojun Liao 已提交
2427 2428

  // starts from the buffer in case of descending timestamp order check data blocks
2429
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2430 2431 2432

  int32_t i = 0;
  while(i < numOfTables) {
2433
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2434 2435

    // the first qualified table for interpolation query
2436 2437 2438 2439
//    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//      break;
//    }
H
Haojun Liao 已提交
2440 2441 2442 2443 2444 2445 2446 2447 2448

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

2449 2450
  STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2451

2452 2453
  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
H
Haojun Liao 已提交
2454 2455 2456
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2457
                                 STsdbReadHandle* pTsdbReadHandle) {
H
Haojun Liao 已提交
2458
  int     numOfRows = 0;
2459 2460
  int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
  STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
H
Haojun Liao 已提交
2461 2462 2463
  win->skey = TSKEY_INITIAL_VAL;

  int64_t st = taosGetTimestampUs();
D
fix bug  
dapan1121 已提交
2464 2465
  int16_t rv = -1;
  STSchema* pSchema = NULL;
H
Haojun Liao 已提交
2466 2467

  do {
H
Haojun Liao 已提交
2468
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL);
H
Haojun Liao 已提交
2469 2470 2471 2472
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2473
    TSKEY key = TD_ROW_KEY(row);
2474 2475 2476
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey,
                pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2477 2478 2479 2480 2481 2482 2483 2484 2485

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2486
    if (rv != TD_ROW_SVER(row)) {
2487
      pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
H
Haojun Liao 已提交
2488
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2489
    }
2490
    mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true);
H
Haojun Liao 已提交
2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501

    if (++numOfRows >= maxRowsToRead) {
      moveToNextRowInMem(pCheckInfo);
      break;
    }

  } while(moveToNextRowInMem(pCheckInfo));

  assert(numOfRows <= maxRowsToRead);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
2502
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
H
Haojun Liao 已提交
2503 2504 2505
    int32_t emptySize = maxRowsToRead - numOfRows;

    for(int32_t i = 0; i < numOfCols; ++i) {
2506
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
2507 2508 2509 2510 2511
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }

  int64_t elapsedTime = taosGetTimestampUs() - st;
H
Haojun Liao 已提交
2512 2513
  tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle,
            elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2514 2515 2516 2517

  return numOfRows;
}

2518 2519
/*
 * Append one STableKeyInfo entry to list for every table id delivered by the child
 * table cursor opened on uid. Each entry starts with lastKey = TSKEY_INITIAL_VAL.
 * Iteration stops when the cursor returns id 0.
 *
 * Always returns TSDB_CODE_SUCCESS.
 * NOTE(review): neither the result of metaOpenCtbCursor() nor that of taosArrayPush()
 * is checked - confirm whether they can fail here.
 */
static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    // ".uid" must be a designated initializer; the former "uid = id" assigned to the
    // function parameter and only filled the member by positional accident
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = id};
    taosArrayPush(list, &info);
  }

  metaCloseCtbCurosr(pCur);
  return TSDB_CODE_SUCCESS;
}

/*
 * Release param with taosMemoryFree(); a NULL pointer is ignored.
 * The type-specific cleanup of the former tQueryInfo payload is disabled (kept below
 * for reference).
 */
static void destroyHelper(void* param) {
//  tQueryInfo* pInfo = (tQueryInfo*)param;
//  if (pInfo->optr != TSDB_RELATION_IN) {
//    taosMemoryFreeClear(pInfo->q);
//  } else {
//    taosHashCleanup((SHashObj *)(pInfo->q));
//  }

  if (param != NULL) {
    taosMemoryFree(param);
  }
}

2550 2551 2552 2553 2554
#define TSDB_PREV_ROW  0x1
#define TSDB_NEXT_ROW  0x2

/*
 * Try to produce the next data block for the currently active table.
 *
 * Order of attempts:
 *   1. file data blocks, as long as checkFiles is set;
 *   2. rows that are still in the cache;
 *   3. for a single-timestamp window with external rows requested and no rows produced
 *      so far, the external row lookup.
 *
 * Returns true when a block is available. checkFiles is cleared once the files yield
 * nothing or report an error.
 */
static bool  loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
  if (pTsdbReadHandle->checkFiles) {
    // check if the query range overlaps with the file data block
    bool found = true;

    int32_t ret = getDataBlocksInFiles(pTsdbReadHandle, &found);
    if (ret != TSDB_CODE_SUCCESS) {
      pTsdbReadHandle->checkFiles = false;
      return false;
    }

    if (found) {
      tsdbRetrieveDataBlock((tsdbReaderT*) pTsdbReadHandle, NULL);

      const bool singleTs = (pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey);
      if (pTsdbReadHandle->currentLoadExternalRows && singleTs) {
        // the first value of column 0 is expected to be exactly the queried timestamp
        SColumnInfoData* pTsCol = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pTsCol->pData == pTsdbReadHandle->window.skey);
      }

      pTsdbReadHandle->currentLoadExternalRows = false; // clear the flag, since the exact matched row is found.
      return found;
    }

    pTsdbReadHandle->checkFiles = false;
  }

  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
    return true;
  }

  // current result is empty
  const bool needExternalRow = pTsdbReadHandle->currentLoadExternalRows &&
                               pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
                               pTsdbReadHandle->cur.rows == 0;
  if (!needExternalRow) {
    return false;
  }

//    STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;
//    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
//    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);

  bool hasRow = tsdbGetExternalRow(pTsdbReadHandle);

//    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
  pTsdbReadHandle->currentLoadExternalRows = false;

  return hasRow;
}
2601

2602
/*
 * Advance activeIndex to the next table and emit a single-row block for it.
 * Returns false once activeIndex has moved past the last table.
 *
 * NOTE(review): the lookup of the cached row (tsdbGetCachedLastRow) is commented out,
 * so pRow stays NULL and key stays TSKEY_INITIAL_VAL when they are used below - confirm
 * before enabling a caller of this function.
 */
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
  // the last row is cached in buffer, return it directly.
  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
  int32_t colCount   = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  size_t  tableCount = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(tableCount > 0 && colCount > 0);

  SQueryFilePos* pPos = &pTsdbReadHandle->cur;

  STSRow*  pRow  = NULL;
  TSKEY    key   = TSKEY_INITIAL_VAL;
  int32_t  delta = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;

  pTsdbReadHandle->activeIndex += 1;
  if (!(pTsdbReadHandle->activeIndex < tableCount)) {
    return false;  // all tables have been handled
  }

  STableCheckInfo* pTableInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
//    if (ret != TSDB_CODE_SUCCESS) {
//      return false;
//    }
  mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, colCount, pTableInfo->tableId, NULL, NULL, true);
  taosMemoryFreeClear(pRow);

  // update the last key value
  pTableInfo->lastKey = key + delta;

  // describe the produced block: exactly one row, located at key
  pPos->rows     = 1;
  pPos->lastKey  = key + delta;
  pPos->mixBlock = true;
  pPos->win.skey = key;
  pPos->win.ekey = key;

  return true;
}

D
init  
dapan1121 已提交
2639

D
update  
dapan1121 已提交
2640

2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792
//static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, pTable->tableId);
//      continue;
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

/*
 * Table-by-table loading: ask the active table for its next block and, when it has
 * none, reset the per-table scan state and continue with the following table.
 *
 * Returns true as soon as a block is available, false after all tables are exhausted.
 * The time spent on every table that yielded nothing is added to
 * cost.checkForNextTime exactly once (the start stamp is moved forward after each
 * table, so earlier intervals are not counted again).
 */
static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();

  while(pTsdbReadHandle->activeIndex < numOfTables) {
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
      return true;
    }

    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
    pCheckInfo->numOfBlocks = 0;

    // switch to the next table and restart the file scan for it
    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
    pTsdbReadHandle->checkFiles  = true;
    pTsdbReadHandle->cur.rows    = 0;
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;

    terrno = TSDB_CODE_SUCCESS;

    // account only for the interval spent on this table
    int64_t now = taosGetTimestampUs();
    pTsdbReadHandle->cost.checkForNextTime += (now - stime);
    stime = now;
  }

  return false;
}

H
Haojun Liao 已提交
2820
// handle data in cache situation
H
Haojun Liao 已提交
2821
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
2822
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
Y
yihaoDeng 已提交
2823

2824 2825 2826 2827 2828
  for(int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
  }

2829
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
H
Haojun Liao 已提交
2830
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
2831 2832 2833
    return false;
  }

Y
yihaoDeng 已提交
2834 2835 2836
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

2837
  // TODO refactor: remove "type"
2838 2839 2840 2841 2842
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
//      return loadCachedLastRow(pTsdbReadHandle);
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
//      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
2843
    }
H
Haojun Liao 已提交
2844
  }
Y
yihaoDeng 已提交
2845

2846 2847
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
H
Haojun Liao 已提交
2848
  } else { // loadType == RR and Offset Order
2849
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2850 2851 2852
      // check if the query range overlaps with the file data block
      bool exists = true;

2853
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2854
      if (code != TSDB_CODE_SUCCESS) {
2855 2856
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2857 2858 2859 2860 2861

        return false;
      }

      if (exists) {
2862
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
2863 2864
        return exists;
      }
Y
yihaoDeng 已提交
2865

2866 2867
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
2868 2869
    }

H
Haojun Liao 已提交
2870
    // TODO: opt by consider the scan order
2871
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
2872
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
2873

H
Haojun Liao 已提交
2874
    elapsedTime = taosGetTimestampUs() - stime;
2875
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
2876
    return ret;
Y
yihaoDeng 已提交
2877 2878
  }
}
2879

2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
2915
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
//  STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
2934
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
2935 2936 2937 2938 2939 2940 2941 2942 2943 2944
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
2945
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
2946
//  taosMemoryFreeClear(cond.colList);
2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
//out_of_memory:
2986
//  tsdbCleanupReadHandle(pSecQueryHandle);
2987 2988 2989
//  return terrno;
//}

H
Haojun Liao 已提交
2990
/**
 * Build a two-row result in the handle's column buffers from the externally
 * cached `prev` and `next` rows.
 *
 * The current position is always flagged as a non-file, mixed block
 * (fid = INT32_MIN, mixBlock = true). When either `prev` or `next` is missing
 * the row count is reset to 0 and false is returned. Otherwise, for every
 * column, the `prev` value is copied into the first cell and the `next` value
 * into the second cell (a cell being info.bytes wide), the row count becomes 2
 * and true is returned.
 *
 * @param pHandle  reader handle, interpreted as STsdbReadHandle*
 * @return true if both external rows were copied, false otherwise
 */
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  pPos->fid = INT32_MIN;
  pPos->mixBlock = true;

  bool hasBothRows = (pReader->prev != NULL) && (pReader->next != NULL);
  if (!hasBothRows) {
    pPos->rows = 0;
    return false;
  }

  int32_t nCols = (int32_t)QH_GET_NUM_OF_COLS(pReader);
  int32_t idx = 0;
  while (idx < nCols) {
    SColumnInfoData* pDst = taosArrayGet(pReader->pColumns, idx);
    SColumnInfoData* pPrevCol = taosArrayGet(pReader->prev, idx);
    SColumnInfoData* pNextCol = taosArrayGet(pReader->next, idx);

    char* firstCell = (char*)pDst->pData;
    char* secondCell = firstCell + pDst->info.bytes;

    memcpy(firstCell, pPrevCol->pData, pDst->info.bytes);
    memcpy(secondCell, pNextCol->pData, pDst->info.bytes);

    // the leading timestamp column also defines the time window of the result
    if (idx == 0 && pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
      pPos->win.skey = *(TSKEY*)firstCell;
      // NOTE(review): the second row was stored at offset info.bytes but is read back at
      // offset TSDB_KEYSIZE; this presumably relies on both being equal for a timestamp column - confirm
      pPos->win.ekey = *(TSKEY*)(firstCell + TSDB_KEYSIZE);
    }

    ++idx;
  }

  pPos->rows = 2;
  return true;
}

3021
/*
3022
 * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
3023
 * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
3024
 */
H
Haojun Liao 已提交
3025
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3042
// out:
3043 3044 3045 3046
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3047 3048
/**
 * Report whether the reader has a cached-row mode enabled.
 *
 * @param pReader  reader handle, interpreted as STsdbReadHandle*
 * @return true when cachelastrow is greater than TSDB_CACHED_TYPE_NONE
 */
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pReader;
  bool             cacheEnabled = pHandle->cachelastrow > TSDB_CACHED_TYPE_NONE;
  return cacheEnabled;
}

3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077
/**
 * Placeholder for the cached-last-row detection.
 *
 * The detection logic is currently disabled (kept below as a comment), so the
 * function only validates its arguments and reports success.
 *
 * @param pTsdbReadHandle  read handle, must not be NULL
 * @param groupList        table groups of the query, must not be NULL
 * @return TSDB_CODE_SUCCESS
 */
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) {
  assert(pTsdbReadHandle != NULL);
  assert(groupList != NULL);

//  TSKEY    key = TSKEY_INITIAL_VAL;
//
//  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
//  assert(group != NULL);
//
//  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
//
//  int32_t code = 0;
//
//  if (((STable*)pInfo->pTable)->lastRow) {
//    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
//    if (code != TSDB_CODE_SUCCESS) {
//      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
//    } else {
//      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
//    }
//  }
//
//  // update the tsdb query time range
//  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
//    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
//    pTsdbReadHandle->checkFiles  = false;
//    pTsdbReadHandle->activeIndex = -1;  // start from -1
//  }

  return TSDB_CODE_SUCCESS;
}

3082 3083
/**
 * Prepare the read handle for serving data from the "last" cache.
 *
 * The check that would switch the handle into TSDB_CACHED_TYPE_LAST mode is
 * currently disabled (kept below as a comment). When the handle already has a
 * non-zero cachelastrow, file checking is turned off and activeIndex is reset
 * to -1.
 *
 * @param pTsdbReadHandle  read handle, must not be NULL
 * @return always 0
 */
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  const int32_t result = 0;

//  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
//    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
//  }

  // nothing to adjust unless a cache mode is active
  if (!pTsdbReadHandle->cachelastrow) {
    return result;
  }

  // the cache is the data source: skip the files and restart the table cursor
  pTsdbReadHandle->checkFiles  = false;
  pTsdbReadHandle->activeIndex = -1;  // start from -1

  return result;
}


3100
/**
 * Walk every table of every group, record the newest key per group in the
 * table entries and return the time window spanned by those keys.
 *
 * Current state of the implementation, as visible in this function:
 *  - the per-table last key lookup is disabled and hard-coded to 0;
 *  - the code that reduces each group to a single table and collects empty
 *    groups is disabled, so `emptyGroups` stays empty and the table counter
 *    stays 0; consequently groupList->numOfTables is set to 0 on return.
 *
 * @param groupList  table groups to inspect; numOfTables is overwritten
 * @return the window covering the collected keys, or TSWINDOW_INITIALIZER
 *         when no key updated the window
 */
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
  STimeWindow range = {INT64_MAX, INT64_MIN};

  int32_t tableCount = 0;
  SArray* emptyGroups = taosArrayInit(16, sizeof(int32_t));

  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  size_t groupCount = taosArrayGetSize(groupList->pGroupList);

  int32_t g = 0;
  while (g < groupCount) {
    SArray* pTables = taosArrayGetP(groupList->pGroupList, g);
    TSKEY   newestKey = TSKEY_INITIAL_VAL;

    // only consumed by the disabled single-table reduction below
    STableKeyInfo keyInfo = {0};

    size_t tablesInGroup = taosArrayGetSize(pTables);
    int32_t t = 0;
    while (t < tablesInGroup) {
      STableKeyInfo* pEntry = (STableKeyInfo*)taosArrayGet(pTables, t);
      ++t;

      // if the lastKey equals to INT64_MIN, there is no data in this table
      TSKEY lastKey = 0;//((STable*)(pInfo->pTable))->lastKey;
      if (newestKey >= lastKey) {
        continue;
      }

      newestKey = lastKey;

//        keyInfo.pTable  = pInfo->pTable;
      keyInfo.lastKey = newestKey;
      pEntry->lastKey = newestKey;

      // widen the resulting window so that it contains the new key
      if (newestKey < range.skey) {
        range.skey = newestKey;
      }

      if (newestKey > range.ekey) {
        range.ekey = newestKey;
      }
    }

    // more than one table in each group, only one table left for each group
//    if (keyInfo.pTable != NULL) {
//      totalNumOfTable++;
//      if (taosArrayGetSize(pGroup) == 1) {
//        // do nothing
//      } else {
//        taosArrayClear(pGroup);
//        taosArrayPush(pGroup, &keyInfo);
//      }
//    } else {  // mark all the empty groups, and remove it later
//      taosArrayDestroy(pGroup);
//      taosArrayPush(emptyGroup, &j);
//    }

    ++g;
  }

  // the window has not been updated, so fall back to the initial window
  bool untouched = (range.skey == INT64_MAX) && (range.ekey == INT64_MIN);
  if (untouched) {
    range = TSWINDOW_INITIALIZER;
    assert(tableCount == 0 && taosArrayGetSize(groupList->pGroupList) == groupCount);
  }

  // drop the groups that were marked as empty
  taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroups), (int32_t) taosArrayGetSize(emptyGroups));
  taosArrayDestroy(emptyGroups);

  groupList->numOfTables = tableCount;
  return range;
}

H
Haojun Liao 已提交
3165
/**
 * Fill in the description (row count, time window, column count) of the data
 * block the reader is currently positioned on.
 *
 * The table uid is resolved only for the debug log; it is deliberately not
 * written into pDataBlockInfo.
 *
 * @param pTsdbReadHandle  reader handle, interpreted as STsdbReadHandle*
 * @param pDataBlockInfo   output: rows, window and numOfCols are assigned
 */
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pTsdbReadHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  uint64_t uid = 0;

  if (pPos->fid == INT32_MIN) {
    // no file block is involved: take the table that is currently active
    STableCheckInfo* pActive = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex);
    uid = pActive->tableId;
  } else {
    // there are data in file: take the owner of the current file block
    STableBlockInfo* pFileBlock = &pReader->pDataBlockInfo[pPos->slot];
    uid = pFileBlock->pTableCheckInfo->tableId;
  }

  tsdbDebug("data block generated, uid:%"PRIu64" numOfRows:%d, tsrange:%"PRId64" - %"PRId64" %s", uid, pPos->rows,
            pPos->win.skey, pPos->win.ekey, pReader->idStr);

//  pDataBlockInfo->uid    = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign the table uid
  pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pReader));
  pDataBlockInfo->window    = pPos->win;
  pDataBlockInfo->rows      = pPos->rows;
}
H
hjxilinx 已提交
3188

H
Haojun Liao 已提交
3189 3190 3191
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
H
Haojun Liao 已提交
3192
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStatis** pBlockStatis) {
3193
  STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle;
H
Haojun Liao 已提交
3194

H
Haojun Liao 已提交
3195 3196
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3197 3198 3199
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3200

H
Haojun Liao 已提交
3201 3202 3203 3204
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3205 3206 3207 3208
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3209 3210

  int64_t stime = taosGetTimestampUs();
3211 3212
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3213
    return terrno;
3214 3215 3216
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3217
  }
H
Haojun Liao 已提交
3218

H
Haojun Liao 已提交
3219 3220
  int16_t* colIds = pHandle->defaultLoadColumn->pData;

H
Haojun Liao 已提交
3221
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
H
Haojun Liao 已提交
3222
  memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
3223
  for(int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
3224
    pHandle->statis[i].colId = colIds[i];
3225
  }
H
Haojun Liao 已提交
3226

3227
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3228 3229 3230

  // always load the first primary timestamp column data
  SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
3231
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3232 3233 3234 3235 3236

  pPrimaryColStatis->numOfNull = 0;
  pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
  pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;

H
Haojun Liao 已提交
3237
  //update the number of NULL data rows
H
Haojun Liao 已提交
3238
  for(int32_t i = 1; i < numOfCols; ++i) {
3239
    if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
H
Haojun Liao 已提交
3240 3241 3242
      pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
    }
  }
H
Haojun Liao 已提交
3243 3244 3245 3246

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

H
Haojun Liao 已提交
3247
  *pBlockStatis = pHandle->statis;
3248
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3249 3250
}

H
Haojun Liao 已提交
3251
/**
 * Return the column data of the block the reader is currently positioned on,
 * loading it from the file first when necessary.
 *
 * The column buffers are returned as they are, without touching the file, when
 *  1. the data does not come from a file block (cur.fid == INT32_MIN),
 *  2. the block is a mixed block, i.e. it does not completely qualify for the
 *     query time range, or
 *  3. the load info shows that this very block (same slot, file id and table)
 *     has been loaded already.
 * Otherwise the file block is loaded and all of its rows are copied into the
 * column buffers.
 *
 * @param pTsdbReadHandle  reader handle, interpreted as STsdbReadHandle*
 * @param pIdList          not used by this implementation
 * @return pHandle->pColumns, or NULL if loading the file block failed
 */
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pTsdbReadHandle;

  if (pReader->cur.fid == INT32_MIN) {
    return pReader->pColumns;
  }

  STableBlockInfo* pFileBlock = &pReader->pDataBlockInfo[pReader->cur.slot];
  STableCheckInfo* pOwner = pFileBlock->pTableCheckInfo;

  if (pReader->cur.mixBlock) {
    return pReader->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pOwner, pFileBlock->compBlock);
  assert(pReader->realNumOfRows <= binfo.rows);

  // data block has been loaded, todo extract method
  SDataBlockLoadInfo* pLoaded = &pReader->dataBlockLoadInfo;

  bool alreadyLoaded = pLoaded->slot == pReader->cur.slot && pLoaded->fileGroup->fid == pReader->cur.fid &&
                       pLoaded->uid == pOwner->tableId;
  if (alreadyLoaded) {
    return pReader->pColumns;
  }

  // only load the file block
  SBlock* pBlock = pFileBlock->compBlock;
  if (doLoadFileDataBlock(pReader, pBlock, pOwner, pReader->cur.slot) != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  // todo refactor
  int32_t copied = doCopyRowsFromFileBlock(pReader, pReader->outputCapacity, 0, 0, pBlock->numOfRows - 1);

  // nothing to shift for an ascending scan or when the buffer is full
  if (ASCENDING_TRAVERSE(pReader->order) || copied >= pReader->outputCapacity) {
    return pReader->pColumns;
  }

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  int32_t gap = pReader->outputCapacity - copied;
  int32_t colCount = (int32_t)taosArrayGetSize(pReader->pColumns);

  int32_t col = 0;
  while (col < colCount) {
    SColumnInfoData* pColumn = taosArrayGet(pReader->pColumns, col);
    char*            base = (char*)pColumn->pData;

    memmove(base, base + gap * pColumn->info.bytes, copied * pColumn->info.bytes);
    ++col;
  }

  return pReader->pColumns;
}
3300
#if 0
/**
 * (disabled) Lazily build the tQueryInfo attached to an expression node.
 *
 * Nothing is done when the node already carries an info object. Otherwise the
 * info is allocated and filled from the left (schema) and right (condition)
 * operands of the node:
 *  - for TSDB_RELATION_IN a hash set of the candidate values is stored in q;
 *  - for any other relation with a condition value, the value is dumped into a
 *    freshly allocated buffer stored in q.
 *
 * @param expr   expression node (tExprNode*)
 * @param param  schema (STSchema*) used to decide whether the column is indexed
 */
void filterPrepare(void* expr, void* param) {
  tExprNode* pNode = (tExprNode*)expr;
  if (pNode->_node.info != NULL) {
    return;
  }

  tQueryInfo* pQuery = taosMemoryCalloc(1, sizeof(tQueryInfo));
  pNode->_node.info = pQuery;

  STSchema* pTableSchema = (STSchema*) param;
  tVariant* pValue = pNode->_node.pRight->pVal;
  SSchema*  pColSchema = pNode->_node.pLeft->pSchema;

  pQuery->sch     = *pColSchema;
  pQuery->optr    = pNode->_node.optr;
  pQuery->compare = getComparFunc(pQuery->sch.type, pQuery->optr);
  pQuery->indexed = pTableSchema->columns->colId == pQuery->sch.colId;

  if (pQuery->optr == TSDB_RELATION_IN) {
    int       dummy = -1;
    SHashObj* pSet = NULL;

    if (pQuery->sch.colId != TSDB_TBNAME_COLUMN_INDEX) {
      buildFilterSetFromBinary((void **)&pSet, pValue->pz, pValue->nLen);
    } else {
      pSet = taosHashInit(256, taosGetDefaultHashFunction(pQuery->sch.type), true, false);

      // table names are lower-cased in place before they enter the set
      SArray* names = (SArray *)(pValue->arr);
      for (size_t k = 0; k < taosArrayGetSize(names); k++) {
        char* name = taosArrayGetP(names, k);
        strntolower_s(varDataVal(name), varDataVal(name), varDataLen(name));
        taosHashPut(pSet, varDataVal(name), varDataLen(name), &dummy, sizeof(dummy));
      }
    }

    pQuery->q = (char *)pSet;
    return;
  }

  if (pValue == NULL) {
    return;
  }

  uint32_t size = pValue->nLen * TSDB_NCHAR_SIZE;
  if (size < (uint32_t)pColSchema->bytes) {
    size = pColSchema->bytes;
  }

  // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space.
  pQuery->q = taosMemoryCalloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
  tVariantDump(pValue, pQuery->q, pColSchema->type, true);
}

#endif
3346

H
Haojun Liao 已提交
3347
/**
 * Comparator used to sort tables and to split them into groups.
 *
 * The tag-based comparison is currently compiled out, so every pair of tables
 * compares as equal and the function always returns 0.
 *
 * @param p1     first element (unused while the comparison is disabled)
 * @param p2     second element (unused while the comparison is disabled)
 * @param param  STableGroupSupporter* of the disabled implementation (unused)
 * @return always 0
 */
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
#if 0
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
  STable* pTable1 = ((STableKeyInfo*) p1)->uid;
  STable* pTable2 = ((STableKeyInfo*) p2)->uid;

  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;

    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);

    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;

    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
      f1 = (char*) TABLE_NAME(pTable1);
      f2 = (char*) TABLE_NAME(pTable2);
      type = TSDB_DATA_TYPE_BINARY;
      bytes = tGetTbnameColumnSchema()->bytes;
    } else {
      if (pTableGroupSupp->pTagSchema && colIndex < pTableGroupSupp->pTagSchema->numOfCols) {
        STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
        bytes = pCol->bytes;
        type = pCol->type;
        f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
        f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
      }
    }

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
#else
  // the comparison is disabled: mark the arguments as intentionally unused
  (void)p1;
  (void)p2;
  (void)param;
#endif
  return 0;
}

H
Haojun Liao 已提交
3403
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3404
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3405
    return -1;
3406
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3407 3408 3409 3410 3411 3412 3413 3414 3415
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

/**
 * Split an already sorted table list into groups of tables that compare equal.
 *
 * Neighbouring elements of pTableList are compared with compareFn. As long as
 * the comparison yields 0 the tables are appended to the current group; any
 * other result closes the current group (it is pushed to pGroups) and opens a
 * new one. The last group is pushed after the loop.
 *
 * NOTE(review): every entry appended to a group only carries lastKey = skey;
 * nothing identifying the source table is copied from pTableList. Presumably
 * this still has to be filled in - confirm against the callers.
 *
 * @param pGroups      output array receiving one SArray* per group
 * @param pTableList   table list, expected to be sorted with compareFn
 * @param numOfTables  number of elements of pTableList to process
 * @param skey         key stored as lastKey in every group entry
 * @param pSupp        context handed to compareFn
 * @param compareFn    comparator; must return 0 or -1 for neighbouring elements
 */
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
                          STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
  SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));

  // all group entries are identical, so a single template is sufficient
  STableKeyInfo info = {.lastKey = skey};
  taosArrayPush(g, &info);

  for (size_t i = 1; i < numOfTables; ++i) {
    void* prev = taosArrayGet(pTableList, i - 1);
    void* p = taosArrayGet(pTableList, i);

    int32_t ret = compareFn(prev, p, pSupp);
    assert(ret == 0 || ret == -1);

    if (ret != 0) {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
      g = taosArrayInit(16, sizeof(STableKeyInfo));
    }

    taosArrayPush(g, &info);
  }

  taosArrayPush(pGroups, &g);
}

3444
/**
 * Arrange the tables of pTableList into groups.
 *
 *  - empty list: an empty group list is returned;
 *  - no grouping columns or a single table: one group holding a duplicate of
 *    pTableList is returned;
 *  - otherwise pTableList is sorted in place with tableGroupComparFn and split
 *    into groups by createTableGroupImpl().
 *
 * @param pTableList      tables to group, must not be NULL
 * @param pTagSchema      tag schema wrapper; only read when grouping columns are
 *                        given and more than one table is present (may be NULL)
 * @param pCols           grouping columns
 * @param numOfOrderCols  number of grouping columns
 * @param skey            key passed on to createTableGroupImpl()
 * @return array of SArray* groups, or NULL if an allocation failed
 */
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
  assert(pTableList != NULL);

  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
  if (pTableGroup == NULL) {  // same contract as the taosArrayDup() failure below
    return NULL;
  }

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
    SArray* sa = taosArrayDup(pTableList);
    if (sa == NULL) {
      taosArrayDestroy(pTableGroup);
      return NULL;
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %" PRIzu " tables belong to one group", size);
    return pTableGroup;
  }

  STableGroupSupporter sup = {0};
  sup.numOfCols = numOfOrderCols;
  sup.pTagSchema = (pTagSchema != NULL) ? pTagSchema->pSchema : NULL;  // tolerate a missing tag schema
  sup.pCols = pCols;

  taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
  createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);

  return pTableGroup;
}

3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559
//static bool tableFilterFp(const void* pNode, void* param) {
//  tQueryInfo* pInfo = (tQueryInfo*) param;
//
//  STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
//  char* val = NULL;
//  if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
//    val = (char*) TABLE_NAME(pTable);
//  } else {
//    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
//  }
//
//  if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
//    if (pInfo->optr == TSDB_RELATION_ISNULL) {
//      return (val == NULL) || isNull(val, pInfo->sch.type);
//    } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
//      return (val != NULL) && (!isNull(val, pInfo->sch.type));
//    }
//  } else if (pInfo->optr == TSDB_RELATION_IN) {
//     int type = pInfo->sch.type;
//     if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
//       int64_t v;
//       GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
//       uint64_t v;
//       GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     }
//     else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
//       double v;
//       GET_TYPED_DATA(v, double, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
//       return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
//     }
//
//  }
//
//  int32_t ret = 0;
//  if (val == NULL) { //the val is possible to be null, so check it out carefully
//    ret = -1; // val is missing in table tags value pairs
//  } else {
//    ret = pInfo->compare(val, pInfo->q);
//  }
//
//  switch (pInfo->optr) {
//    case TSDB_RELATION_EQUAL: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NOT_EQUAL: {
//      return ret != 0;
//    }
//    case TSDB_RELATION_GREATER_EQUAL: {
//      return ret >= 0;
//    }
//    case TSDB_RELATION_GREATER: {
//      return ret > 0;
//    }
//    case TSDB_RELATION_LESS_EQUAL: {
//      return ret <= 0;
//    }
//    case TSDB_RELATION_LESS: {
//      return ret < 0;
//    }
//    case TSDB_RELATION_LIKE: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_MATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NMATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_IN: {
//      return ret == 1;
//    }
//
//    default:
//      assert(false);
//  }
//
//  return true;
//}
H
Haojun Liao 已提交
3560

3561
//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
3562

3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574
//static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
//  // query according to the expression tree
//  SExprTraverseSupp supp = {
//      .nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
//      .setupInfoFn = filterPrepare,
//      .pExtInfo = pSTable->tagSchema,
//      };
//
//  getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
//  tExprTreeDestroy(pExpr, destroyHelper);
//  return TSDB_CODE_SUCCESS;
//}
3575

H
Haojun Liao 已提交
3576
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
3577
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
3578
                                 SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
H
Haojun Liao 已提交
3579
  STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
3580
  if (pTbCfg == NULL) {
H
Haojun Liao 已提交
3581
    tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
3582 3583
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _error;
3584
  }
H
Haojun Liao 已提交
3585

3586
  if (pTbCfg->type != META_SUPER_TABLE) {
H
Haojun Liao 已提交
3587
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
3588
    terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
3589
    goto _error;
H
hjxilinx 已提交
3590
  }
3591 3592

  //NOTE: not add ref count for super table
H
Haojun Liao 已提交
3593
  SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
H
Haojun Liao 已提交
3594
  SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
H
Haojun Liao 已提交
3595

weixin_48148422's avatar
weixin_48148422 已提交
3596 3597
  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
H
Haojun Liao 已提交
3598
    int32_t ret = getAllTableList(pMeta, uid, res);
3599 3600
    if (ret != TSDB_CODE_SUCCESS) {
      goto _error;
3601
    }
3602

sangshuduo's avatar
sangshuduo 已提交
3603
    pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
H
Haojun Liao 已提交
3604
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
H
Haojun Liao 已提交
3605

H
Haojun Liao 已提交
3606 3607
    tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta,
              pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
3608

3609
    taosArrayDestroy(res);
3610 3611
    return ret;
  }
3612

H
hjxilinx 已提交
3613
  int32_t ret = TSDB_CODE_SUCCESS;
3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625
//  tExprNode* expr = NULL;
//
//  TRY(TSDB_MAX_TAG_CONDITIONS) {
//    expr = exprTreeFromTableName(tbnameCond);
//    if (expr == NULL) {
//      expr = exprTreeFromBinary(pTagCond, len);
//    } else {
//      CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
//      tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
//      if (tagExpr != NULL) {
//        CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
//        tExprNode* tbnameExpr = expr;
wafwerar's avatar
wafwerar 已提交
3626
//        expr = taosMemoryCalloc(1, sizeof(tExprNode));
3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657
//        if (expr == NULL) {
//          THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
//        }
//        expr->nodeType = TSQL_NODE_EXPR;
//        expr->_node.optr = (uint8_t)tagNameRelType;
//        expr->_node.pLeft = tagExpr;
//        expr->_node.pRight = tbnameExpr;
//      }
//    }
//    CLEANUP_EXECUTE();
//
//  } CATCH( code ) {
//    CLEANUP_EXECUTE();
//    terrno = code;
//    tsdbUnlockRepoMeta(tsdb);     // unlock tsdb in any cases
//
//    goto _error;
//    // TODO: more error handling
//  } END_TRY
//
//  doQueryTableList(pTable, res, expr);
//  pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
//  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
//  tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
//      pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
//  taosArrayDestroy(res);
//
//  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
//  return ret;
3658 3659 3660

  _error:
  return terrno;
3661
}
3662

H
Haojun Liao 已提交
3663 3664
// Build a table group descriptor that contains exactly one table.
//
// The table is first looked up through metaGetTbInfoByUid(); when it is not found, terrno is set to
// TSDB_CODE_TDB_INVALID_TABLE_ID and returned. Otherwise a group list holding one group is created, and
// that group holds a single STableKeyInfo whose uid is `uid` and whose lastKey is `startKey`.
//
// pMeta      - meta handle forwarded to metaGetTbInfoByUid()
// uid        - uid of the table to put into the group
// startKey   - value stored in the lastKey field of the generated STableKeyInfo
// pGroupInfo - output; numOfTables/pGroupList are written only when the function succeeds
//
// Returns TSDB_CODE_SUCCESS on success, otherwise the value assigned to terrno
// (TSDB_CODE_TDB_INVALID_TABLE_ID or TSDB_CODE_TDB_OUT_OF_MEMORY).
//
// NOTE(review): pTbCfg is only used as an existence check here; whether the object returned by
// metaGetTbInfoByUid() has to be released by the caller is not visible from this function - confirm.
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
  STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
  if (pTbCfg == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return terrno;
  }

  STableKeyInfo info = {.lastKey = startKey, .uid = uid};

  SArray* pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
  if (pGroupList == NULL || group == NULL) {
    goto _error;
  }

  // the group list stores pointers to the per-group arrays, hence &group
  if (taosArrayPush(group, &info) == NULL || taosArrayPush(pGroupList, &group) == NULL) {
    goto _error;
  }

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList = pGroupList;
  return TSDB_CODE_SUCCESS;

  _error:
  // reaching here means `group` was never successfully stored in pGroupList, so both are released separately
  taosArrayDestroy(group);
  taosArrayDestroy(pGroupList);
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return terrno;
}
3684

3685
#if 0
3686
// Resolve a list of table ids into a single table group.
//
// The repo meta is read-locked for the duration of the lookup. Ids whose table cannot be found are skipped
// with a warning; an id that refers to a super table aborts the call with TSDB_CODE_QRY_INVALID_MSG.
// All resolved tables are collected into one group, which is appended to pGroupInfo->pGroupList only when
// it is not empty.
//
// Returns TSDB_CODE_SUCCESS, or terrno when locking/unlocking fails or a super table is encountered.
int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  assert(pTableIdList != NULL);

  size_t numOfIds = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* pTables = taosArrayInit(1, sizeof(STableKeyInfo));

  for (int32_t idx = 0; idx < numOfIds; ++idx) {
    STableIdInfo* pId = taosArrayGet(pTableIdList, idx);
    STable*       pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), pId->uid);

    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", pId->uid, pId->tid);
      continue;
    }

    if (pTable->type != TSDB_SUPER_TABLE) {
      STableKeyInfo keyInfo = {.pTable = pTable, .lastKey = pId->key};
      taosArrayPush(pTables, &keyInfo);
      continue;
    }

    // a super table must never be queried directly: release the lock and the partial group, then fail
    tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", pId->uid, pId->tid);
    terrno = TSDB_CODE_QRY_INVALID_MSG;
    tsdbUnlockRepoMeta(tsdb);
    taosArrayDestroy(pTables);
    return terrno;
  }

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    taosArrayDestroy(pTables);
    return terrno;
  }

  pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(pTables);
  if (pGroupInfo->numOfTables == 0) {
    // nothing resolved: the group list stays empty
    taosArrayDestroy(pTables);
  } else {
    taosArrayPush(pGroupInfo->pGroupList, &pTables);
  }

  return TSDB_CODE_SUCCESS;
}
3731
#endif
3732 3733 3734 3735 3736 3737 3738 3739
// Release an array of SColumnInfoData: the pData buffer of every element is freed first, then the array itself.
// A NULL array is accepted and ignored.
// Always returns NULL so that the caller can write `p = doFreeColumnInfoData(p)`.
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData != NULL) {
    size_t numOfCols = taosArrayGetSize(pColumnInfoData);

    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      SColumnInfoData* pCol = taosArrayGet(pColumnInfoData, idx);
      taosMemoryFreeClear(pCol->pData);
    }

    taosArrayDestroy(pColumnInfoData);
  }

  return NULL;
}

H
Haojun Liao 已提交
3747 3748 3749 3750 3751 3752
// Release an array of STableCheckInfo.
//
// For every element the in-memory iterators are destroyed through destroyTableMemIterator() and the
// pCompInfo buffer is freed; finally the array itself is destroyed.
// A NULL array is accepted and ignored, mirroring doFreeColumnInfoData().
// Always returns NULL so that the caller can write `p = destroyTableCheckInfo(p)`.
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  size_t numOfTables = taosArrayGetSize(pTableCheckInfo);
  for (size_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
    destroyTableMemIterator(p);

    taosMemoryFreeClear(p->pCompInfo);
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

3760

H
Haojun Liao 已提交
3761
// Destroy a read handle and everything it owns that is released here: the column buffers (pColumns, prev,
// next), the default load-column list, the data block info and statis buffers, the per-table check info,
// the file read helper and the data-cols buffer. The io-cost summary is written to the debug log right
// before the handle itself is freed. A NULL handle is ignored.
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)queryHandle;
  if (pHandle == NULL) {
    return;
  }

  pHandle->pColumns = doFreeColumnInfoData(pHandle->pColumns);

  taosArrayDestroy(pHandle->defaultLoadColumn);
  taosMemoryFreeClear(pHandle->pDataBlockInfo);
  taosMemoryFreeClear(pHandle->statis);

  // For a non-empty query time window the memory snapshot release (tsdbMayUnTakeMemSnapshot) is currently
  // disabled; for an empty one no table check info may have been created.
  if (emptyQueryTimewindow(pHandle)) {
    assert(pHandle->pTableCheckInfo == NULL);
  }

  if (pHandle->pTableCheckInfo != NULL) {
    pHandle->pTableCheckInfo = destroyTableCheckInfo(pHandle->pTableCheckInfo);
  }

  tsdbDestroyReadH(&pHandle->rhelper);

  tdFreeDataCols(pHandle->pDataCols);
  pHandle->pDataCols = NULL;

  pHandle->prev = doFreeColumnInfoData(pHandle->prev);
  pHandle->next = doFreeColumnInfoData(pHandle->next);

  SIOCostSummary* pSummary = &pHandle->cost;

  tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s",
            pHandle, pSummary->headFileLoad, pSummary->headFileLoadTime, pSummary->statisInfoLoadTime,
            pSummary->blockLoadTime, pSummary->checkForNextTime, pHandle->idStr);

  taosMemoryFreeClear(pHandle);
}
3798

3799
#if 0
H
Haojun Liao 已提交
3800
// Tear down a table group descriptor: every non-NULL table read from the groups is un-referenced, each
// group array is destroyed, then the map and the group list are released and numOfTables is reset to 0.
// pGroupList must not be NULL.
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  size_t groupCount = taosArrayGetSize(pGroupList->pGroupList);

  for(int32_t g = 0; g < groupCount; ++g) {
    SArray* pGroup = taosArrayGetP(pGroupList->pGroupList, g);
    size_t  tableCount = taosArrayGetSize(pGroup);

    for(int32_t t = 0; t < tableCount; ++t) {
      STable* pTable = taosArrayGetP(pGroup, t);
      if (pTable == NULL) {  // in case of handling retrieve data from tsdb
        continue;
      }

      tsdbUnRefTable(pTable);
    }

    taosArrayDestroy(pGroup);
  }

  taosHashCleanup(pGroupList->map);
  taosArrayDestroy(pGroupList->pGroupList);
  pGroupList->numOfTables = 0;
}
H
Haojun Liao 已提交
3824 3825 3826 3827 3828 3829 3830

// Walk the whole skip list and append the data of every node accepted by exprTreeApplyFilter() to pResult.
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  // visit each node once, in iterator order
  while (tSkipListIterNext(pIter)) {
    SSkipListNode *pNode = tSkipListIterGet(pIter);
    if (!exprTreeApplyFilter(pExpr, pNode, param)) {
      continue;
    }

    taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
  }

  tSkipListDestroyIter(pIter);
}

// One end point of a query range on an indexed column; filled in by setQueryCond().
typedef struct {
  char*    v;     // comparison value; setQueryCond() stores the query info's `q` pointer here (not a copy)
  int32_t  optr;  // relational operator (TSDB_RELATION_*) copied from the query info
} SEndPoint;

// Query condition built by setQueryCond(); at most one of the two end points is allocated per call.
typedef struct {
  SEndPoint* start;  // set for the greater, greater-equal, equal, not-equal and IN operators
  SEndPoint* end;    // set for the less and less-equal operators
} SQueryCond;

// Translate the operator of a column query into a range end point.
//
// - greater / greater-equal / equal / not-equal / IN : pCond->start is allocated
// - less / less-equal                                : pCond->end is allocated
// - LIKE / MATCH / NMATCH                            : not supported on this path (assert)
// - any other operator                               : pCond is left untouched
//
// The allocated end point records the operator and the `q` pointer of queryColInfo (the value is not
// copied). The caller owns the end point and releases it with taosMemoryFree().
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when the end point cannot be allocated; in
// that case pCond is not modified.
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
  int32_t optr = queryColInfo->optr;

  // slot of pCond that receives the new end point, NULL when this operator does not produce one
  SEndPoint** ppSlot = NULL;

  if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
      optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL || optr == TSDB_RELATION_IN) {
    ppSlot = &pCond->start;
  } else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
    ppSlot = &pCond->end;
  } else if (optr == TSDB_RELATION_LIKE || optr == TSDB_RELATION_MATCH || optr == TSDB_RELATION_NMATCH) {
    assert(0);
  }

  if (ppSlot == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SEndPoint* pEndPoint = taosMemoryCalloc(1, sizeof(SEndPoint));
  if (pEndPoint == NULL) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  pEndPoint->optr = queryColInfo->optr;
  pEndPoint->v    = queryColInfo->q;
  *ppSlot = pEndPoint;

  return TSDB_CODE_SUCCESS;
}

static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
  SSkipListIterator* iter = NULL;

  SQueryCond cond = {0};
  if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
    //todo handle error
  }

  if (cond.start != NULL) {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
  } else {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
  }

  if (cond.start != NULL) {
    int32_t optr = cond.start->optr;

    if (optr == TSDB_RELATION_EQUAL) {   // equals
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
    } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
      bool comp = true;
      int32_t ret = 0;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
          assert(ret >= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_GREATER) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;
        }
      }
    } else if (optr == TSDB_RELATION_NOT_EQUAL) {   // not equal
      bool comp = true;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

      tSkipListDestroyIter(iter);

      comp = true;
      iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

Y
yihaoDeng 已提交
3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967
    } else if (optr == TSDB_RELATION_IN) {
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
      
H
Haojun Liao 已提交
3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008
    } else {
      assert(0);
    }
  } else {
    int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
    if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
      bool    comp = true;
      int32_t ret = 0;

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
          assert(ret <= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_LESS) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;  // no need to compare anymore
        }
      }
    } else {
      assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
        if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
            (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
        }
      }
    }
  }

wafwerar's avatar
wafwerar 已提交
4009 4010
  taosMemoryFree(cond.start);
  taosMemoryFree(cond.end);
H
Haojun Liao 已提交
4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028
  tSkipListDestroyIter(iter);
}

// Full scan of the skip list: every node is tested individually and the qualified tables are appended to `res`.
//
// For the table-name pseudo column the comparison function of pQueryInfo is applied to the table name:
// IN accepts a node when compare() is non-zero, LIKE/MATCH/NMATCH accept it when compare() is zero, and any
// other operator accepts nothing. For every other column the decision is delegated to filterFp.
static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  while (tSkipListIterNext(pIter)) {
    SSkipListNode *pNode = tSkipListIterGet(pIter);

    char *pTableData = SL_GET_NODE_DATA(pNode);
    tstr *pName = (tstr*) tsdbGetTableName((void*) pTableData);

    bool qualified = false;

    // todo speed up by using hash
    if (pQueryInfo->sch.colId != TSDB_TBNAME_COLUMN_INDEX) {
      qualified = filterFp(pNode, pQueryInfo);
    } else if (pQueryInfo->optr == TSDB_RELATION_IN) {
      qualified = pQueryInfo->compare(pName, pQueryInfo->q);
    } else if (pQueryInfo->optr == TSDB_RELATION_LIKE ||
               pQueryInfo->optr == TSDB_RELATION_MATCH ||
               pQueryInfo->optr == TSDB_RELATION_NMATCH) {
      qualified = !pQueryInfo->compare(pName, pQueryInfo->q);
    }

    if (!qualified) {
      continue;
    }

    STableKeyInfo keyInfo = {.pTable = (void*)pTableData, .lastKey = TSKEY_INITIAL_VAL};
    taosArrayPush(res, &keyInfo);
  }

  tSkipListDestroyIter(pIter);
}

// Evaluate a filter expression against a skip list and append the qualified tables to `result`.
//
// A leaf expression (column on the left, value or dummy node on the right) is answered by
// queryIndexedColumn() when the column is indexed and the operator is none of LIKE/MATCH/NMATCH/IN, and by
// a full scan through queryIndexlessColumn() otherwise. Any other expression is applied node by node with
// applyFilterToSkipListNode(). A NULL expression is a no-op.
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
  if (pExpr == NULL) {
    return;
  }

  tExprNode *pLhs = pExpr->_node.pLeft;
  tExprNode *pRhs = pExpr->_node.pRight;

  bool hasSubExpr = (pLhs->nodeType == TSQL_NODE_EXPR) || (pRhs->nodeType == TSQL_NODE_EXPR);
  if (hasSubExpr) {
    // The value of hasPK is always 0.
    uint8_t weight = pLhs->_node.hasPK + pRhs->_node.hasPK;
    assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);

    // apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
    applyFilterToSkipListNode(pSkipList, pExpr, result, param);
    return;
  }

  // column project
  assert(pLhs->nodeType == TSQL_NODE_COL && (pRhs->nodeType == TSQL_NODE_VALUE || pRhs->nodeType == TSQL_NODE_DUMMY));

  param->setupInfoFn(pExpr, param->pExtInfo);

  tQueryInfo *pInfo = pExpr->_node.info;

  // these operators are always answered by a full scan, even on an indexed column
  bool scanOnlyOptr = (pInfo->optr == TSDB_RELATION_LIKE) || (pInfo->optr == TSDB_RELATION_MATCH) ||
                      (pInfo->optr == TSDB_RELATION_NMATCH) || (pInfo->optr == TSDB_RELATION_IN);

  if (pInfo->indexed && !scanOnlyOptr) {
    queryIndexedColumn(pSkipList, pInfo, result);
  } else {
    queryIndexlessColumn(pSkipList, pInfo, result, param->nodeFilterFn);
  }
}
L
Liu Jicong 已提交
4081
#endif