tsdbRead.c 139.0 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wafwerar's avatar
wafwerar 已提交
16
#include "tsdbDef.h"
H
Haojun Liao 已提交
17
#include "tdatablock.h"
H
Haojun Liao 已提交
18 19 20 21 22
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "texception.h"
H
Hongze Cheng 已提交
23
#include "vnode.h"
24 25 26 27
#include "tsdbFS.h"
#include "tsdbLog.h"
#include "tsdbReadImpl.h"
#include "tskiplist.h"
H
Haojun Liao 已提交
28
#include "ttime.h"
29

30
#include "taosdef.h"
31
#include "tlosertree.h"
H
Hongze Cheng 已提交
32
#include "tsdbDef.h"
33
#include "tmsg.h"
H
Haojun Liao 已提交
34
#include "tsdbCommit.h"
35

36
#define EXTRA_BYTES 2
// True when the traverse order `o` is ascending. The argument is parenthesized so that compound
// arguments (e.g. a conditional expression) are compared as a whole against TSDB_ORDER_ASC.
#define ASCENDING_TRAVERSE(o)   ((o) == TSDB_ORDER_ASC)
// Number of entries held by the pColumns array of a read handle.
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
39

H
Haojun Liao 已提交
40 41 42 43
// Builds an SDataBlockInfo compound literal for a file data block: the key range, column count
// and row count come from `_block`, the uid from the `tableId` of `_checkInfo`.
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)              \
  ((SDataBlockInfo){.window    = {.skey = (_block)->keyFirst,     \
                                  .ekey = (_block)->keyLast},     \
                    .numOfCols = (_block)->numOfCols,             \
                    .rows      = (_block)->numOfRows,             \
                    .uid       = (_checkInfo)->tableId})
H
Haojun Liao 已提交
45

H
hjxilinx 已提交
46
// Query types; the value is stored in the `type` field of STsdbReadHandle.
enum {
  TSDB_QUERY_TYPE_ALL  = 1,
  TSDB_QUERY_TYPE_LAST = 2,
};

51 52 53 54 55 56
// Cache state markers.
// NOTE(review): presumably the values kept in STsdbReadHandle.cachelastrow — confirm against the users.
enum {
  TSDB_CACHED_TYPE_NONE = 0,
  TSDB_CACHED_TYPE_LASTROW = 1,
  TSDB_CACHED_TYPE_LAST = 2,
};

57 58
typedef struct SQueryFilePos {
  int32_t fid;
59 60
  int32_t slot;
  int32_t pos;
61
  int64_t lastKey;
62 63
  int32_t rows;
  bool    mixBlock;
64
  bool    blockCompleted;
65
  STimeWindow win;
66
} SQueryFilePos;
H
hjxilinx 已提交
67

68
typedef struct SDataBlockLoadInfo {
H
Hongze Cheng 已提交
69
  SDFileSet*  fileGroup;
70
  int32_t     slot;
71
  uint64_t    uid;
72
  SArray*     pLoadedCols;
73
} SDataBlockLoadInfo;
H
hjxilinx 已提交
74

75
// Record of the comp-block information that is currently loaded
// (see STsdbReadHandle.compBlockLoadInfo); both fields are reset to -1 by tsdbInitCompBlockLoadInfo.
typedef struct SLoadCompBlockInfo {
  int32_t tid; /* table tid */
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
79

80 81 82 83 84 85
// Values of STableCheckInfo.chosen: which in-memory iterator supplied the current row.
enum {
  CHECKINFO_CHOSEN_MEM = 0,
  CHECKINFO_CHOSEN_IMEM = 1,
  CHECKINFO_CHOSEN_BOTH = 2,  // for update=2 (merge case)
};

86
typedef struct STableCheckInfo {
87
  uint64_t      tableId;
H
Haojun Liao 已提交
88
  TSKEY         lastKey;
H
Haojun Liao 已提交
89
  SBlockInfo*   pCompInfo;
H
Haojun Liao 已提交
90
  int32_t       compSize;
91
  int32_t       numOfBlocks:29; // number of qualified data blocks not the original blocks
92
  uint8_t       chosen:2;       // indicate which iterator should move forward
H
Haojun Liao 已提交
93
  bool          initBuf:1;        // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
94 95
  SSkipListIterator* iter;      // mem buffer skip list iterator
  SSkipListIterator* iiter;     // imem buffer skip list iterator
96
} STableCheckInfo;
97

98
typedef struct STableBlockInfo {
H
Haojun Liao 已提交
99 100
  SBlock          *compBlock;
  STableCheckInfo *pTableCheckInfo;
101
} STableBlockInfo;
102

103 104
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
H
Haojun Liao 已提交
105
  STableBlockInfo**   pDataBlockInfo;
106
  int32_t*            blockIndexArray;
107
  int32_t*            numOfBlocksPerTable;
108 109
} SBlockOrderSupporter;

H
Haojun Liao 已提交
110 111 112
// I/O cost counters kept per read handle (STsdbReadHandle.cost).
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

118 119
typedef struct STsdbReadHandle {
  STsdb*     pTsdb;
H
Haojun Liao 已提交
120 121 122 123 124 125 126 127 128
  SQueryFilePos  cur;              // current position
  int16_t        order;
  STimeWindow    window;           // the primary query time window that applies to all queries
  SDataStatis*   statis;           // query level statistics, only one table block statistics info exists at any time
  int32_t        numOfBlocks;
  SArray*        pColumns;         // column list, SColumnInfoData array list
  bool           locateStart;
  int32_t        outputCapacity;
  int32_t        realNumOfRows;
H
Haojun Liao 已提交
129
  SArray*        pTableCheckInfo;  // SArray<STableCheckInfo>
H
Haojun Liao 已提交
130 131
  int32_t        activeIndex;
  bool           checkFiles;       // check file stage
D
init  
dapan1121 已提交
132
  int8_t         cachelastrow;     // check if last row cached
133
  bool           loadExternalRow;  // load time window external data rows
H
Haojun Liao 已提交
134 135
  bool           currentLoadExternalRows; // current load external rows
  int32_t        loadType;         // block load type
H
Haojun Liao 已提交
136
  char          *idStr;            // query info handle, for debug purpose
H
Haojun Liao 已提交
137
  int32_t        type;             // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
H
Hongze Cheng 已提交
138 139 140
  SDFileSet*     pFileGroup;
  SFSIter        fileIter;
  SReadH         rhelper;
H
Haojun Liao 已提交
141
  STableBlockInfo* pDataBlockInfo;
H
Haojun Liao 已提交
142
  SDataCols     *pDataCols;        // in order to hold current file data block
H
Haojun Liao 已提交
143
  int32_t        allocSize;        // allocated data block size
H
Haojun Liao 已提交
144
  SArray        *defaultLoadColumn;// default load column
H
Haojun Liao 已提交
145
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
H
Haojun Liao 已提交
146
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
H
Haojun Liao 已提交
147

148 149
  SArray        *prev;             // previous row which is before than time window
  SArray        *next;             // next row which is after the query time window
H
Haojun Liao 已提交
150
  SIOCostSummary cost;
151
} STsdbReadHandle;
152

H
Haojun Liao 已提交
153 154 155
typedef struct STableGroupSupporter {
  int32_t    numOfCols;
  SColIndex* pCols;
156
  SSchema*   pTagSchema;
H
Haojun Liao 已提交
157 158
} STableGroupSupporter;

159
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
160 161
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
162
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
163

H
Haojun Liao 已提交
164
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
165
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
166
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
167
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle);
168
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
169 170 171
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
//static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
//static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
H
Haojun Liao 已提交
172
static bool    tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
173

174
// Put the data block load record back into its "nothing loaded" state:
// no file group, uid 0 and slot -1.
// NOTE(review): pLoadedCols is not touched here — confirm that this is intended.
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->uid       = 0;
  pBlockLoadInfo->slot      = -1;
}

180
// Mark the comp-block load record as empty: both the file id and the table tid become -1.
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid    = -1;
}
H
hjxilinx 已提交
184

185 186
// Collect the column id of every entry in pTsdbReadHandle->pColumns, in order, into a newly
// created array with int16_t-sized elements. The returned array belongs to the caller.
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t total = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
  assert(total <= TSDB_MAX_COLUMNS);

  SArray* pColIds = taosArrayInit(total, sizeof(int16_t));

  int32_t idx = 0;
  while (idx < total) {
    SColumnInfoData* pColData = taosArrayGet(pTsdbReadHandle->pColumns, idx);
    taosArrayPush(pColIds, &pColData->info.colId);
    idx += 1;
  }

  return pColIds;
}

198 199
// Build the default list of column ids to load for this handle.
//
// The list starts as the ids of pTsdbReadHandle->pColumns. When `loadTS` is true and the first id
// is not the primary timestamp column, the timestamp column id is placed in front.
//
// Returns the new id list (owned by the caller), or NULL when the id list could not be obtained.
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);

  // nothing to inspect: either the list is missing or the timestamp column is not requested
  if (pLocalIdList == NULL || !loadTS) {
    return pLocalIdList;
  }

  int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // an empty list has no first element to read; the timestamp column simply becomes its only entry
  if (taosArrayGetSize(pLocalIdList) == 0) {
    taosArrayPush(pLocalIdList, &columnId);
    return pLocalIdList;
  }

  // the primary timestamp column is not included in the specified load column list, add it
  int16_t firstColId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
  if (firstColId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
// Number of rows buffered in the memory tables for the tables of this read handle.
//
// TODO: the mem-table snapshot is not wired into the read handle yet, so there is no mem table to
// inspect and the result is always 0. Once a snapshot is available, sum the row counts of the
// mem and imem table data of every entry in pTableCheckInfo.
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
  (void)pHandle;
  return 0;
}
241

242 243 244
// Create one STableCheckInfo per table of every group in pGroupList and return them as an array
// sorted with tsdbCheckInfoCompar. Returns NULL when the array cannot be allocated.
//
// For ascending scans a table lastKey that is INT64_MIN or lies before the window start is moved
// to the window start; in both directions the resulting lastKey is asserted to be inside the window.
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
  assert(numOfGroup >= 1);

  // allocate buffer in order to load data blocks from file
  SArray* pResult = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
  if (pResult == NULL) {
    return NULL;
  }

  const bool         asc  = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  const STimeWindow* pWin = &pTsdbReadHandle->window;

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t g = 0; g < numOfGroup; ++g) {
    SArray* pOneGroup  = *(SArray**)taosArrayGet(pGroupList->pGroupList, g);
    size_t  numInGroup = taosArrayGetSize(pOneGroup);
    assert(numInGroup > 0);

    for (int32_t t = 0; t < numInGroup; ++t) {
      STableKeyInfo*  pKey      = (STableKeyInfo*)taosArrayGet(pOneGroup, t);
      STableCheckInfo checkInfo = {.tableId = pKey->uid, .lastKey = pKey->lastKey};

      if (asc) {
        if (checkInfo.lastKey == INT64_MIN || checkInfo.lastKey < pWin->skey) {
          checkInfo.lastKey = pWin->skey;
        }
        assert(checkInfo.lastKey >= pWin->skey && checkInfo.lastKey <= pWin->ekey);
      } else {
        assert(checkInfo.lastKey >= pWin->ekey && checkInfo.lastKey <= pWin->skey);
      }

      taosArrayPush(pResult, &checkInfo);
      tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, checkInfo.tableId, checkInfo.lastKey, pTsdbReadHandle->idStr);
    }
  }

  // TODO  group table according to the tag value.
  taosArraySort(pResult, tsdbCheckInfoCompar);
  return pResult;
}

283 284
// Rewind every table check info of the handle: lastKey goes back to the window start key, both
// skip list iterators are destroyed and the buffer is flagged as not initialized.
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  SArray* pInfoList = pTsdbReadHandle->pTableCheckInfo;
  size_t  total     = taosArrayGetSize(pInfoList);
  assert(total >= 1);

  const bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t idx = 0; idx < total; ++idx) {
    STableCheckInfo* pInfo = (STableCheckInfo*)taosArrayGet(pInfoList, idx);

    pInfo->lastKey = pTsdbReadHandle->window.skey;
    pInfo->iter    = tSkipListDestroyIter(pInfo->iter);
    pInfo->iiter   = tSkipListDestroyIter(pInfo->iiter);
    pInfo->initBuf = false;

    if (asc) {
      assert(pInfo->lastKey >= pTsdbReadHandle->window.skey);
    } else {
      assert(pInfo->lastKey <= pTsdbReadHandle->window.skey);
    }
  }
}

H
Haojun Liao 已提交
303 304 305
// Create a single-entry check info array for the table of `pCheckInfo`, starting at `skey`.
// Only one table is involved, so no sort is needed.
//
// pCheckInfo: source entry; only its tableId is copied.
// skey:       initial lastKey of the new entry.
// psTable:    not used by this function.
//
// Returns the new array (owned by the caller), or NULL when it cannot be allocated.
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  (void)psTable;

  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
  if (pNew == NULL) {  // do not push into an array that was never allocated
    return NULL;
  }

  STableCheckInfo info = {.lastKey = skey, .tableId = pCheckInfo->tableId};
  taosArrayPush(pNew, &info);
  return pNew;
}

314 315
// Tell whether the query window of the handle is empty for its scan direction:
// ascending scans need skey <= ekey, descending scans need skey >= ekey.
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  const STimeWindow* pWin = &pTsdbReadHandle->window;
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return pWin->skey > pWin->ekey;
  }

  return pWin->ekey > pWin->skey;
}

323 324
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
325
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
326 327 328
  STsdbCfg* pCfg = &pTsdb->config;

  int64_t now = taosGetTimestamp(pCfg->precision);
329
  return now - (tsTickPerDay[pCfg->precision] * pCfg->keep) + 1;  // needs to add one tick
330 331
}

332 333
// Copy the query window from `pCond` into the handle and clamp it against the earliest valid
// (not yet expired) timestamp.
//
// Ascending scans clamp the start key, descending scans clamp the end key. When clamping happens
// the window in `pCond` is updated as well, and the old and new windows are logged.
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) {
  // keep the requested window: pCond->twindow is overwritten below, and the log must show the original
  const STimeWindow oldWin = pCond->twindow;
  pTsdbReadHandle->window = oldWin;

  bool    updateTs = false;
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
      pCond->twindow.skey = startTs;
      updateTs = true;
    }
  } else {
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
      pCond->twindow.ekey = startTs;
      updateTs = true;
    }
  }

  if (updateTs) {
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
              pTsdbReadHandle, oldWin.skey, oldWin.ekey, pTsdbReadHandle->window.skey,
              pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
  }
}

H
Haojun Liao 已提交
358
// Allocate and initialize a read handle for the given query condition.
//
// tsdb:   tsdb instance to read from.
// pCond:  query condition (order, time window, column list, load type); must not be NULL. Its
//         time window may be clamped by setQueryTimewindow.
// qId / taskId: identifiers that are formatted into the debug id string of the handle.
//
// Returns the new handle. On any failure the partially built handle is released with
// tsdbCleanupReadHandle, terrno is set to TSDB_CODE_TDB_OUT_OF_MEMORY and NULL is returned.
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) {
  // checked up front: pCond is dereferenced right after the handle is allocated
  assert(pCond != NULL);

  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
  if (pReadHandle == NULL) {
    goto _end;
  }

  pReadHandle->order       = pCond->order;
  pReadHandle->pTsdb       = tsdb;
  pReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid     = INT32_MIN;
  pReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles  = true;
  pReadHandle->activeIndex = 0;   // current active table index
  pReadHandle->allocSize   = 0;
  pReadHandle->locateStart = false;
  pReadHandle->loadType    = pCond->type;

  pReadHandle->outputCapacity  = 4096;//((STsdb*)tsdb)->config.maxRowsPerFileBlock;
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

  char buf[128] = {0};
  snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId);
  pReadHandle->idStr = strdup(buf);
  if (pReadHandle->idStr == NULL) {  // idStr is printed with %s in the log calls, it must not be NULL
    goto _end;
  }

  if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
    goto _end;
  }

  setQueryTimewindow(pReadHandle, pCond);

  if (pCond->numOfCols > 0) {
    // allocate buffer in order to load data blocks from file
    pReadHandle->statis = taosMemoryCalloc(pCond->numOfCols, sizeof(SDataStatis));
    if (pReadHandle->statis == NULL) {
      goto _end;
    }

    // todo: use list instead of array?
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
      goto _end;
    }

    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];

      int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      taosArrayPush(pReadHandle->pColumns, &colInfo);
      pReadHandle->statis[i].colId = colInfo.info.colId;
    }

    pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
  }

  pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
  if (pReadHandle->pDataCols == NULL) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _end;
  }

  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);

  return pReadHandle;

  _end:
  tsdbCleanupReadHandle(pReadHandle);
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return NULL;
}

H
Haojun Liao 已提交
437
// Create a read handle for the tables in `groupList`.
//
// When the (possibly clamped) query window is empty, the handle is returned without any table
// check info. Otherwise one check info entry is created per table.
//
// Returns the handle, or NULL on failure; when the check info cannot be created the handle is
// released and terrno is set to TSDB_CODE_TDB_OUT_OF_MEMORY.
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) {
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    return (tsdbReaderT*) pTsdbReadHandle;
  }

  // todo apply the lastkey of table check to avoid to load header file
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
    // the caller only sees NULL, so the handle has to be released here or it is lost
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo),
      taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr);

  return (tsdbReaderT*) pTsdbReadHandle;
}

H
Haojun Liao 已提交
461
// Re-arm an existing read handle for a new scan described by `pCond`.
//
// When the current window of the handle is empty, only the order is updated (the window keys are
// swapped if the order changes) and nothing else is reset. Otherwise order, window, cursor, load
// records and all table check infos are reset.
void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond *pCond) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, int64_t);
    }

    return;
  }

  pTsdbReadHandle->order       = pCond->order;
  pTsdbReadHandle->window      = pCond->twindow;
  pTsdbReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid     = -1;
  pTsdbReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles  = true;
  pTsdbReadHandle->activeIndex = 0;   // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // statis is only allocated when the handle was created with at least one column
  // NOTE(review): only the first SDataStatis entry is cleared (including its colId) — confirm intent.
  if (pTsdbReadHandle->statis != NULL) {
    memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  resetCheckInfo(pTsdbReadHandle);
}

H
Haojun Liao 已提交
498
// Re-arm an existing read handle for a new set of tables.
//
// Order, window, cursor and load records are reset from `pCond`. Rebuilding the table check info
// from `groupList` is currently disabled: pTableCheckInfo is set to NULL and terrno is set to
// TSDB_CODE_TDB_OUT_OF_MEMORY on every call.
// NOTE(review): the previous pTableCheckInfo array is dropped without being released here — confirm
// who owns it until destroyTableCheckInfo is restored.
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
  (void)groupList;  // unused until the check info is rebuilt from the group list again

  pTsdbReadHandle->order       = pCond->order;
  pTsdbReadHandle->window      = pCond->twindow;
  pTsdbReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid     = -1;
  pTsdbReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles  = true;
  pTsdbReadHandle->activeIndex = 0;   // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // statis is only allocated when the handle was created with at least one column
  if (pTsdbReadHandle->statis != NULL) {
    memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
//  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

  pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

//  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}

H
Haojun Liao 已提交
538
// Create a read handle for a last-row query.
//
// The query window in `pCond` is replaced by the window computed by updateLastrowForEachGroup.
// Returns NULL when no table qualifies, when the handle cannot be created, or when the cached
// last-row check fails (terrno is then set to the failing code and the handle is released).
// When the last row is cached, the handle type becomes TSDB_QUERY_TYPE_LAST.
tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
  pCond->twindow = updateLastrowForEachGroup(groupList);

  // no qualified table
  if (groupList->numOfTables == 0) {
    return NULL;
  }

  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList);
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    // the caller only sees NULL, so the handle has to be released here or it is lost
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = code;
    return NULL;
  }

  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

565
#if 0
// Disabled: create a read handle for a cached "last" query. Kept for reference only.
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLast(pTsdbReadHandle);
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

#endif
H
Haojun Liao 已提交
586
// Return a new pointer-sized-element array whose capacity matches the number of table check infos
// of the handle. No element is added here, so the returned array is empty.
SArray* tsdbGetQueriedTableList(tsdbReaderT *pHandle) {
  assert(pHandle != NULL);

  STsdbReadHandle* pReader   = (STsdbReadHandle*)pHandle;
  size_t           numTables = taosArrayGetSize(pReader->pTableCheckInfo);

  return taosArrayInit(numTables, POINTER_BYTES);
}

H
Haojun Liao 已提交
596 597 598 599 600
// Build a new group info that is meant to keep only one table for each group.
//
// The per-table filter is currently commented out, so no table is ever kept: every per-group
// array stays empty and is destroyed, and the result has numOfTables == 0 and an empty group list.
static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
  assert(pGroupList);
  size_t groupCount = taosArrayGetSize(pGroupList->pGroupList);

  STableGroupInfo* pTrimmed = taosMemoryCalloc(1, sizeof(STableGroupInfo));
  pTrimmed->pGroupList = taosArrayInit(groupCount, POINTER_BYTES);

  for (int32_t g = 0; g < groupCount; ++g) {
    SArray* pSrcGroup  = taosArrayGetP(pGroupList->pGroupList, g);
    size_t  tableCount = taosArrayGetSize(pSrcGroup);
    SArray* pKept      = taosArrayInit(4, sizeof(STableKeyInfo));

    for (int32_t t = 0; t < tableCount; ++t) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pSrcGroup, t);
//      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
//        taosArrayPush(pKept, pInfo);
//        pTrimmed->numOfTables += 1;
//        break;
//      }
    }

    // keep the group only when it holds data, otherwise release it
    if (taosArrayGetSize(pKept) != 0) {
      taosArrayPush(pTrimmed->pGroupList, &pKept);
      continue;
    }

    taosArrayDestroy(pKept);
  }

  return pTrimmed;
}

H
Haojun Liao 已提交
629
// Create a read handle that loads the rows outside of the query time window.
//
// The table groups are trimmed first; when no table remains, the window in `pCond` is turned into
// an empty one for the scan direction. Returns the handle with both external-row flags set, or
// NULL when the handle cannot be created.
// NOTE(review): the trimmed group info is not released in this function — confirm who owns it.
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);

  if (pNew->numOfTables == 0) {
    tsdbDebug("update query time range to invalidate time window");

    assert(taosArrayGetSize(pNew->pGroupList) == 0);
    bool asc = ASCENDING_TRAVERSE(pCond->order);
    if (asc) {
      pCond->twindow.ekey = pCond->twindow.skey - 1;
    } else {
      pCond->twindow.skey = pCond->twindow.ekey - 1;
    }
  }

  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
  if (pTsdbReadHandle == NULL) {  // tsdbQueryTables reports failure with NULL, nothing to configure
    return NULL;
  }

  pTsdbReadHandle->loadExternalRow = true;
  pTsdbReadHandle->currentLoadExternalRows = true;

  return pTsdbReadHandle;
}

651
// Create the mem / imem skip list iterators of one table and move each to its first row.
//
// Runs only once per check info: when initBuf is already set it returns true right away.
// Returns false when neither buffer provides a row for this table, true otherwise.
// NOTE(review): the iterators are created from key 0 rather than from pCheckInfo->lastKey (the
// key conversion is commented out in the original) — confirm.
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->initBuf) {
    return true;
  }
  pCheckInfo->initBuf = true;

  const int32_t order = pHandle->order;
  STsdb*        pTsdb = pHandle->pTsdb;

  STbData** ppMem  = NULL;
  STbData** ppIMem = NULL;
  TSKEY     tLastKey = 0;  /// keyToTkey(pCheckInfo->lastKey);

  if (pTsdb->mem != NULL) {
    ppMem = taosHashGet(pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (ppMem != NULL) {
      pCheckInfo->iter =
          tSkipListCreateIterFromVal((*ppMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  if (pTsdb->imem != NULL) {
    ppIMem = taosHashGet(pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (ppIMem != NULL) {
      pCheckInfo->iiter =
          tSkipListCreateIterFromVal((*ppIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  // a buffer counts as empty when it has no iterator or when the iterator has no first row;
  // the mem iterator is advanced before the imem iterator
  bool memEmpty = true;
  if (pCheckInfo->iter != NULL) {
    memEmpty = !tSkipListIterNext(pCheckInfo->iter);
  }

  bool imemEmpty = true;
  if (pCheckInfo->iiter != NULL) {
    imemEmpty = !tSkipListIterNext(pCheckInfo->iiter);
  }

  if (memEmpty && imemEmpty) {  // no data in buffer right now
    return false;
  }

  if (memEmpty) {
    tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iter);
    assert(pNode != NULL);

    STSRow* pRow     = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*ppMem)->keyMin, (*ppMem)->keyMax, pCheckInfo->lastKey, (*ppMem)->nrows, pHandle->idStr);

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= firstKey);
    } else {
      assert(pCheckInfo->lastKey >= firstKey);
    }
  }

  if (imemEmpty) {
    tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(pNode != NULL);

    STSRow* pRow     = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*ppIMem)->keyMin, (*ppIMem)->keyMax, pCheckInfo->lastKey, (*ppIMem)->nrows, pHandle->idStr);

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= firstKey);
    } else {
      assert(pCheckInfo->lastKey >= firstKey);
    }
  }

  return true;
}

H
Haojun Liao 已提交
732 733 734 735 736
/*
 * Destroy the two skip-list iterators (mem: `iter`, imem: `iiter`) held by a
 * table check-info entry.
 *
 * The pointers are reset to NULL afterwards: the other iterator helpers in this
 * file decide whether an iterator is usable by testing these fields against
 * NULL, so a stale pointer left behind here could later be passed to
 * tSkipListIterGet()/tSkipListIterNext().
 */
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
  tSkipListDestroyIter(pCheckInfo->iter);
  pCheckInfo->iter = NULL;

  tSkipListDestroyIter(pCheckInfo->iiter);
  pCheckInfo->iiter = NULL;
}

737
/*
 * Return the key of the row the in-memory iterators will deliver first, and
 * record in pCheckInfo->chosen which buffer (mem, imem or both) supplies it.
 *
 * @param pCheckInfo  table state holding the mem (`iter`) and imem (`iiter`) iterators
 * @param order       traverse order, tested with ASCENDING_TRAVERSE()
 * @param update      update mode; only consulted when both buffers hold the same key
 * @return the first key in traverse order, or TSKEY_INITIAL_VAL when neither
 *         iterator currently points at a row
 *
 * Side effect: when both buffers expose the same key and `update` is
 * TD_ROW_DISCARD_UPDATE or TD_ROW_OVERWRITE_UPDATE, the iterator of the buffer
 * that is NOT chosen is advanced past the duplicate.
 */
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
  STSRow *rmem = NULL, *rimem = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return TD_ROW_KEY(rmem);
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return TD_ROW_KEY(rimem);
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
    return r1;
  }

  // The row that comes first is the one with the smaller key when ascending and
  // the one with the larger key when descending. The previous code fell through
  // to imem for every descending case, returning the smaller key even when mem
  // held the larger one.
  bool memFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return r2;
}

H
Haojun Liao 已提交
792 793
/*
 * Fetch the row that the in-memory iterators deliver next, without advancing
 * the chosen iterator, and record in pCheckInfo->chosen where it came from.
 *
 * @param pCheckInfo  table state holding the mem (`iter`) and imem (`iiter`) iterators
 * @param order       traverse order, tested with ASCENDING_TRAVERSE()
 * @param update      update mode; only consulted when both buffers hold the same key
 * @param extraRow    optional out-parameter; when both buffers hold the same key
 *                    and both rows are kept (CHECKINFO_CHOSEN_BOTH), receives the
 *                    imem row. May be NULL if the caller does not need it.
 * @return the next row, or NULL when neither iterator points at a row
 *
 * Side effect: for equal keys under TD_ROW_DISCARD_UPDATE / TD_ROW_OVERWRITE_UPDATE
 * the iterator of the buffer that is not chosen is advanced past the duplicate.
 */
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) {
  STSRow *rmem = NULL, *rimem = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return NULL;
  }

  // Only one buffer has data: use the named constants that moveToNextRowInMem()
  // compares against instead of bare 0/1.
  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return rimem;
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      tSkipListIterNext(pCheckInfo->iter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      return rimem;
    }

    if (update == TD_ROW_OVERWRITE_UPDATE) {
      tSkipListIterNext(pCheckInfo->iiter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    }

    pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    // Callers that only probe for "is there a row" pass NULL here.
    if (extraRow != NULL) {
      *extraRow = rimem;
    }
    return rmem;
  }

  // Smaller key first when ascending, larger key first when descending.
  // `chosen` must name the buffer whose row is returned, because
  // moveToNextRowInMem() advances the iterator selected by `chosen`.
  bool memFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return rimem;
}

860
/*
 * Advance the in-memory iterator(s) selected by pCheckInfo->chosen.
 *
 * - CHECKINFO_CHOSEN_MEM / CHECKINFO_CHOSEN_IMEM: step the chosen iterator; if it
 *   has no further row, report whether the other iterator still points at one.
 * - any other value: step both iterators.
 *
 * @return true when at least one iterator still has a row to deliver
 */
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
    if (pCheckInfo->iter != NULL && tSkipListIterNext(pCheckInfo->iter)) {
      return true;
    }

    // mem is exhausted (or absent): fall back to whatever imem currently points at
    return (pCheckInfo->iiter != NULL) && (tSkipListIterGet(pCheckInfo->iiter) != NULL);
  }

  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
    if (pCheckInfo->iiter != NULL && tSkipListIterNext(pCheckInfo->iiter)) {
      return true;
    }

    // imem is exhausted (or absent): fall back to whatever mem currently points at
    return (pCheckInfo->iter != NULL) && (tSkipListIterGet(pCheckInfo->iter) != NULL);
  }

  // both rows were consumed, so both iterators move on
  bool more = false;
  if (pCheckInfo->iter != NULL) {
    more = tSkipListIterNext(pCheckInfo->iter);
  }
  if (pCheckInfo->iiter != NULL) {
    more = tSkipListIterNext(pCheckInfo->iiter) || more;
  }

  return more;
}

898
/*
 * Try to produce the next result block for the active table purely from the
 * in-memory buffers.
 *
 * On success the handle's `cur` position (rows, win, lastKey, mixBlock) and the
 * table's lastKey are updated from what tsdbReadRowsFromCache() returned.
 *
 * @return false when the buffers have no row, or the first row already lies
 *         beyond the query window's ekey in traverse direction; true otherwise
 */
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
  STsdbCfg *pCfg = &pHandle->pTsdb->config;
  size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);

  // the data comes from memory, so there is no file id for this position
  pHandle->cur.fid = INT32_MIN;

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

  STSRow* pFirst = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL);
  if (pFirst == NULL) {
    return false;
  }

  // first timestamp available in the buffers
  pCheckInfo->lastKey = TD_ROW_KEY(pFirst);
  tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
      pCheckInfo->tableId,  pCheckInfo->lastKey, pHandle->order, pHandle->idStr);

  // the buffered data starts past the end of the query window: nothing left to read
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
    return false;
  }

  STimeWindow* pWin = &pHandle->cur.win;
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, pWin, pHandle);

  // the next read resumes one tick past the last key that was returned
  int32_t delta = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
  pCheckInfo->lastKey = pWin->ekey + delta;
  pHandle->cur.lastKey = pWin->ekey + delta;
  pHandle->cur.mixBlock = true;

  if (!ASCENDING_TRAVERSE(pHandle->order)) {
    // for descending reads, swap so that skey/ekey are exchanged in the reported window
    TSWAP(pWin->skey, pWin->ekey, TSKEY);
  }

  return true;
}
H
hjxilinx 已提交
939

940 941
/*
 * Map a timestamp to the id of the data file that would contain it.
 *
 * @param key          timestamp; TSKEY_INITIAL_VAL maps to INT32_MIN
 * @param daysPerFile  number of days covered by one file
 * @param precision    index into tsTickPerDay[]
 * @return key / (daysPerFile * ticks-per-day), clamped into the int32_t range.
 *         Negative keys are shifted down by one file span before the division.
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // NOTE(review): with `||` this condition appears to hold for any precision value;
  // `&&` may have been intended - confirm against the precision constants.
  assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO);

  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  TSKEY shifted = key;
  if (shifted < 0) {
    shifted -= (daysPerFile * tsTickPerDay[precision]);
  }

  // starting file id, still in 64 bits so that overflow can be detected
  int64_t fid = (int64_t)(shifted / (daysPerFile * tsTickPerDay[precision]));

  if (fid < 0L) {
    if (llabs(fid) > INT32_MAX) {  // below the int32_t range
      fid = INT32_MIN;
    }
  } else if (fid > INT32_MAX) {    // above the int32_t range
    fid = INT32_MAX;
  }

  return (int32_t)fid;
}

H
refact  
Hongze Cheng 已提交
962
/*
 * Binary-search an array of blocks (ordered by key range) for the slot that
 * relates to `skey`.
 *
 * The search stops at the probed slot when:
 *   - the remaining range has shrunk to one slot, or
 *   - skey lies inside [keyFirst, keyLast] of the probed slot, or
 *   - skey is above the probed slot and only two slots remain, or
 *   - skey falls into the gap next to the probed slot on the side selected by
 *     `order` (gap above for TSDB_ORDER_DESC, gap below for TSDB_ORDER_ASC).
 *
 * @return index of the slot at which the search stopped
 */
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo = 0;
  int32_t hi = numOfBlocks - 1;

  for (;;) {
    int32_t span = hi - lo + 1;
    int32_t mid = lo + (span >> 1);

    if (span == 1) {
      return mid;
    }

    if (skey > pBlock[mid].keyLast) {
      if (span == 2) {
        return mid;
      }

      // skey sits in the gap between this block and the next one
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[mid + 1].keyFirst)) {
        return mid;
      }

      lo = mid + 1;
      continue;
    }

    if (skey < pBlock[mid].keyFirst) {
      // skey sits in the gap between the previous block and this one
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[mid - 1].keyLast)) {
        return mid;
      }

      hi = mid - 1;
      continue;
    }

    return mid;  // skey is covered by this block
  }
}
988

989
/*
 * Load the block index of one table from the current file and keep only the
 * blocks that can intersect the remaining query range.
 *
 * The surviving blocks are moved to the front of pCheckInfo->pCompInfo->blocks,
 * their count is stored in pCheckInfo->numOfBlocks and added to *numOfBlocks.
 *
 * @param index        position of the table in pTsdbReadHandle->pTableCheckInfo
 * @param numOfBlocks  running total of qualified blocks (incremented, not reset)
 * @return 0 on success (including "no block for this table"), terrno or
 *         TSDB_CODE_TDB_OUT_OF_MEMORY on failure
 */
static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
  pCheckInfo->numOfBlocks = 0;

  STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
  table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);

  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
    return terrno;
  }

  SBlockIdx* pBlkIdx = pTsdbReadHandle->rhelper.pBlkIdx;

  // the file holds no block that belongs to this table
  if (pBlkIdx == NULL || pBlkIdx->uid != pCheckInfo->tableId) {
    return 0;
  }

  // grow the per-table buffer if the block info does not fit
  if (pCheckInfo->compSize < (int32_t)pBlkIdx->len) {
    assert(pBlkIdx->len > 0);

    char* pNewBuf = taosMemoryRealloc(pCheckInfo->pCompInfo, pBlkIdx->len);
    if (pNewBuf == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pCheckInfo->pCompInfo = (SBlockInfo*)pNewBuf;
    pCheckInfo->compSize = pBlkIdx->len;
  }

  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
    return terrno;
  }

  SBlockInfo* pBlkInfo = pCheckInfo->pCompInfo;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // remaining range to scan, normalised so that lowKey <= highKey
  TSKEY lowKey = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  TSKEY highKey = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);

  // discard the unqualified data blocks based on the remaining range
  int32_t first = binarySearchForBlock(pBlkInfo->blocks, pBlkIdx->numOfBlocks, lowKey, TSDB_ORDER_ASC);
  if (lowKey > pBlkInfo->blocks[first].keyLast) {
    return 0;
  }

  // todo speedup the procedure of locating the end block
  int32_t stop = first;
  for (; stop < (int32_t)pBlkIdx->numOfBlocks && (pBlkInfo->blocks[stop].keyFirst <= highKey); stop += 1) {
  }

  pCheckInfo->numOfBlocks = (stop - first);

  if (first > 0) {
    memmove(pBlkInfo->blocks, &pBlkInfo->blocks[first], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }

  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1062

1063
/*
 * Load the block index from the current file for the table(s) selected by the
 * handle's loadType and count the qualified blocks.
 *
 * - BLOCK_LOAD_TABLE_SEQ_ORDER:  only the active table is loaded.
 * - BLOCK_LOAD_OFFSET_SEQ_ORDER: every table is loaded; stops at the first failure.
 *
 * The elapsed time is added to cost.headFileLoadTime on every exit path.
 *
 * @param numOfBlocks  receives the total number of qualified blocks (reset to 0 first)
 * @return TSDB_CODE_SUCCESS or the first error returned by loadBlockInfo()
 */
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

  pTsdbReadHandle->cost.headFileLoad += 1;
  int64_t startUs = taosGetTimestampUs();

  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

    for (int32_t i = 0; i < numOfTables; ++i) {
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        break;  // report the first failure; the timing is still recorded below
      }
    }
  } else {
    assert(0);
  }

  int64_t endUs = taosGetTimestampUs();
  pTsdbReadHandle->cost.headFileLoadTime += (endUs - startUs);
  return code;
}

1095
/*
 * Read one file block of a table into the read helper's column buffers.
 *
 * Steps: (re)initialise the three SDataCols buffers with the table schema, load
 * the default columns of the block, remember which block is now loaded in
 * dataBlockLoadInfo, and convert the timestamp column in place with tdGetKey()
 * when the block starts before key 0.
 *
 * pBlock->numOfRows is overwritten with the number of rows actually loaded, or
 * with 0 on failure.
 *
 * @param slotIndex  only used for logging
 * @return TSDB_CODE_SUCCESS, or terrno on failure
 */
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
  int64_t startUs = taosGetTimestampUs();

  STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);

  if (tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData;

  if (tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                            (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle))) != TSDB_CODE_SUCCESS) {
    // the loader is expected to have set terrno
    assert(terrno != TSDB_CODE_SUCCESS);
    goto _error;
  }

  // remember which block is currently held in the buffers
  SDataBlockLoadInfo* pLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
  pLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pLoadInfo->slot = pTsdbReadHandle->cur.slot;
  pLoadInfo->uid = pCheckInfo->tableId;

  SDataCols* pLoaded = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pLoaded->numOfRows != 0 && pLoaded->numOfRows <= pBlock->numOfRows);

  pBlock->numOfRows = pLoaded->numOfRows;

  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
  if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int64_t* pTs = pLoaded->cols[0].pData;
    for (int32_t r = 0; r < pBlock->numOfRows; ++r) {
      pTs[r] = tdGetKey(pTs[r]);
    }
  }

  int64_t elapsedTime = (taosGetTimestampUs() - startUs);
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;

  tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s",
      pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr);
  return TSDB_CODE_SUCCESS;

_error:
  // nothing usable was loaded for this block
  pBlock->numOfRows = 0;

  tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s",
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
  return terrno;
}

1163 1164 1165 1166 1167
// Forward declarations of helpers that are referenced before their definitions appear in this file.
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
1168

1169 1170 1171
/*
 * Produce the next result block for a file block, merging with the in-memory
 * buffers only when their first key makes that necessary.
 *
 * Three outcomes, decided by the first key available in memory (`key`):
 *   1. no key, or key lies past the block in traverse direction:
 *      the block is served from file only (whole block, or the remaining rows).
 *   2. key lies entirely before the block in traverse direction:
 *      rows are read from memory up to the block boundary; the file block is
 *      not loaded.
 *   3. key falls inside the block's range:
 *      the file block is loaded and merged with the in-memory rows.
 *
 * @return TSDB_CODE_SUCCESS, or the error from doLoadFileDataBlock()
 */
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  int32_t        code = TSDB_CODE_SUCCESS;

  /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo);
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

  TSKEY key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
  bool  hasMemKey = (key != TSKEY_INITIAL_VAL);

  if (hasMemKey) {
    tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
  }

  bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  bool memReachesBlock = hasMemKey && (asc ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (!memReachesBlock) {
    /*
     * no data in cache, only load data from file
     * during the query processing, data in cache will not be checked anymore.
     *
     * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
     */
    assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
    int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);

    bool wholeBlock = (cur->pos == 0 && endPos == binfo.rows -1 && asc) ||
                      (cur->pos == (binfo.rows - 1) && endPos == 0 && (!asc));
    if (wholeBlock) {
      pTsdbReadHandle->realNumOfRows = binfo.rows;

      cur->rows = binfo.rows;
      cur->win  = binfo.window;
      cur->mixBlock = false;
      cur->blockCompleted = true;

      if (asc) {
        cur->lastKey = binfo.window.ekey + 1;
        cur->pos = binfo.rows;
      } else {
        cur->lastKey = binfo.window.skey - 1;
        cur->pos = -1;
      }
    } else {  // partially copy to dest buffer
      copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
      cur->mixBlock = true;
    }

    assert(cur->blockCompleted);
    if (cur->rows == binfo.rows) {
      tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
    } else {
      tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr);
    }

    return code;
  }

  bool memBeforeBlock = asc ? (key < binfo.window.skey) : (key > binfo.window.ekey);
  if (memBeforeBlock) {
    // do not load file block into buffer: serve in-memory rows up to the block boundary
    int32_t step = asc ? 1 : -1;

    TSKEY maxKey = asc ? (binfo.window.skey - step) : (binfo.window.ekey - step);
    cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
    pTsdbReadHandle->realNumOfRows = cur->rows;

    // update the last key value
    pCheckInfo->lastKey = cur->win.ekey + step;
    if (!asc) {
      TSWAP(cur->win.skey, cur->win.ekey, TSKEY);
    }

    cur->mixBlock = true;
    cur->blockCompleted = false;
    return code;
  }

  // return error, add test cases
  code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
  return code;
}

1263 1264
/*
 * Load the rows of one file block that qualify for the query and hand them to
 * the merge step.
 *
 * If the query range ends inside the block, or the table's lastKey already lies
 * inside it, the block is loaded, the start position is located with
 * binarySearchForKey(), and doMergeTwoLevelData() is run. Otherwise the whole
 * block qualifies and handleDataMergeIfNeeded() decides whether loading is
 * required at all.
 *
 * @param exists  set to false when loading fails, otherwise to whether
 *                pTsdbReadHandle->realNumOfRows is greater than 0
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // query ended in / started from the current block
  bool partial = asc ? (pTsdbReadHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst)
                     : (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast);

  if (!partial) {
    // the whole block qualifies: start from its first row in traverse direction
    cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
    code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
  } else {
    code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
    if (code != TSDB_CODE_SUCCESS) {
      *exists = false;
      return code;
    }

    SDataCols* pTsCols = pTsdbReadHandle->rhelper.pDCols[0];

    if (asc) {
      assert(pTsCols->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTsCols->numOfRows == pBlock->numOfRows);

      if (pCheckInfo->lastKey > pBlock->keyFirst) {
        cur->pos =
            binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
      } else {
        cur->pos = 0;
      }

      assert(pCheckInfo->lastKey <= pBlock->keyLast);
    } else {
      if (pCheckInfo->lastKey < pBlock->keyLast) {
        cur->pos =
            binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
      } else {
        cur->pos = pBlock->numOfRows - 1;
      }

      assert(pCheckInfo->lastKey >= pBlock->keyFirst);
    }

    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
  }

  *exists = pTsdbReadHandle->realNumOfRows > 0;
  return code;
}

1318
/*
 * Binary-search an ascending array of timestamps for the position at which a
 * scan in the given order should start.
 *
 * @param pValue  array of `num` TSKEY values, sorted ascending
 * @param num     number of entries; a value <= 0 yields -1
 * @param key     timestamp to look for
 * @param order   TSDB_ORDER_ASC or TSDB_ORDER_DESC
 * @return
 *   - exact match: the index at which the search found `key`
 *   - TSDB_ORDER_DESC: the last index is returned when key is at or above the
 *     largest entry; one below the current lower bound when key is smaller than
 *     the entry there (-1 when key is below the whole array)
 *   - TSDB_ORDER_ASC: the first index is returned when key is at or below the
 *     smallest entry; one above the current upper bound when key is larger than
 *     the entry there (-1 when that would run past the array)
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) return -1;

  TSKEY* keys = (TSKEY*)pValue;
  int    lo = 0;
  int    hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    for (;;) {
      if (key >= keys[hi]) return hi;
      if (key == keys[lo]) return lo;
      if (key < keys[lo]) return lo - 1;

      int mid = (((hi - lo + 1)) >> 1) + lo;

      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        return mid;
      }
    }
  }

  // find the first position which is bigger than the key
  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;

    if (key > keys[hi]) {
      int next = hi + 1;
      return (next >= num) ? -1 : next;
    }

    int mid = (((hi - lo + 1)) >> 1) + lo;

    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}

1380 1381
/*
 * Copy rows [start, end] of the currently loaded file block into the handle's
 * output columns.
 *
 * Output columns (pTsdbReadHandle->pColumns) and file columns (rhelper.pDCols[0])
 * are walked in parallel by column id. A requested column that is missing from
 * the file block, or whose file column reports all rows as NULL, is filled with
 * NULL for the copied rows. Values are appended at row index k of the output
 * column for each source row k.
 *
 * After the copy, cur.win.ekey and cur.lastKey are set from the timestamp at `end`.
 *
 * @param capacity   not used by this function
 * @param numOfRows  number of rows already produced
 * @return numOfRows plus the number of rows copied (unchanged when the range is empty)
 */
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  int32_t num = end - start + 1;
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

  int32_t rowEnd = num + start;  // one past the last row to copy
  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);

  // data in buffer has greater timestamp, copy data in file block
  int32_t dstIdx = 0, srcIdx = 0;
  while (dstIdx < requiredNumOfCols && srcIdx < pCols->numOfCols) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, dstIdx);
    SDataCol*        src = &pCols->cols[srcIdx];

    // this file column was not requested: skip it
    if (src->colId < pColInfo->info.colId) {
      srcIdx++;
      continue;
    }

    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
      if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {  // handle the var-string
        // todo refactor, only copy one-by-one
        for (int32_t k = start; k < rowEnd; ++k) {
          SCellVal sVal = {0};
          if (tdGetColDataOfRow(&sVal, src, k) < 0) {
            TASSERT(0);
          }

          colDataAppend(pColInfo, k, sVal.val, false);
        }
      } else {  // todo opt performance
        for (int32_t k = start; k < rowEnd; ++k) {
          SCellVal sVal = {0};
          if (tdGetColDataOfRow(&sVal, src, k) < 0) {
            TASSERT(0);
          }

          if (sVal.valType == TD_VTYPE_NULL) {
            colDataAppend(pColInfo, k, NULL, true);
          } else {
            colDataAppend(pColInfo, k, sVal.val, false);
          }
        }
      }

      srcIdx++;
      dstIdx++;
    } else {  // pColInfo->info.colId < src->colId (or the source is all NULL): it is a NULL data
      for (int32_t k = start; k < rowEnd; ++k) {  // TODO opt performance
        colDataAppend(pColInfo, k, NULL, true);
      }
      dstIdx++;
    }
  }

  // the remaining requested columns are all null data
  for (; dstIdx < requiredNumOfCols; dstIdx++) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, dstIdx);
    for (int32_t k = start; k < rowEnd; ++k) {
      colDataAppend(pColInfo, k, NULL, true);  // TODO add a fast version to set a number of consecutive NULL value.
    }
  }

  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
  pTsdbReadHandle->cur.win.ekey = tsArray[end];
  pTsdbReadHandle->cur.lastKey = tsArray[end] + step;

  return numOfRows + num;
}

H
Haojun Liao 已提交
1457
// TODO fix bug for reverse copy data problem
1458
// Note: row1 always has high priority
H
Haojun Liao 已提交
1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
                               STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                               bool forceSetNull) {
#if 1
  STSchema*   pSchema;
  STSRow*     row;
  int16_t     colId;
  int16_t     offset;

  bool isRow1DataRow = TD_IS_TP_ROW(row1);
1469 1470 1471
  bool isRow2DataRow;
  bool isChosenRowDataRow;
  int32_t chosen_itr;
H
Haojun Liao 已提交
1472
  SCellVal sVal = {0};
1473

H
Haojun Liao 已提交
1474
  // the schema version info is embeded in STSRow
1475 1476 1477
  int32_t numOfColsOfRow1 = 0;

  if (pSchema1 == NULL) {
H
Haojun Liao 已提交
1478
    pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1));
1479
  }
1480

1481 1482
  if(isRow1DataRow) {
    numOfColsOfRow1 = schemaNCols(pSchema1);
H
Haojun Liao 已提交
1483
  } else {
H
Haojun Liao 已提交
1484
    numOfColsOfRow1 = tdRowGetNCols(row1);
D
fix bug  
dapan1121 已提交
1485
  }
1486

1487 1488
  int32_t numOfColsOfRow2 = 0;
  if(row2) {
H
Haojun Liao 已提交
1489
    isRow2DataRow = TD_IS_TP_ROW(row2);
1490
    if (pSchema2 == NULL) {
H
Haojun Liao 已提交
1491
      pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2));
1492 1493 1494 1495
    }
    if(isRow2DataRow) {
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
H
Haojun Liao 已提交
1496
      numOfColsOfRow2 = tdRowGetNCols(row2);
1497 1498
    }
  }
C
Cary Xu 已提交
1499

1500 1501
  int32_t i = 0, j = 0, k = 0;
  while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
1502
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1503 1504 1505 1506 1507 1508 1509

    int32_t colIdOfRow1;
    if(j >= numOfColsOfRow1) {
      colIdOfRow1 = INT32_MAX;
    } else if(isRow1DataRow) {
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
H
Haojun Liao 已提交
1510
      SKvRowIdx *pColIdx = tdKvRowColIdxAt(row1, j);
1511 1512 1513 1514 1515 1516 1517 1518 1519
      colIdOfRow1 = pColIdx->colId;
    }

    int32_t colIdOfRow2;
    if(k >= numOfColsOfRow2) {
      colIdOfRow2 = INT32_MAX;
    } else if(isRow2DataRow) {
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
H
Haojun Liao 已提交
1520
      SKvRowIdx *pColIdx = tdKvRowColIdxAt(row2, k);
1521 1522 1523 1524 1525
      colIdOfRow2 = pColIdx->colId;
    }

    if(colIdOfRow1 == colIdOfRow2) {
      if(colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1526
        j++;
1527
        k++;
C
Cary Xu 已提交
1528 1529
        continue;
      }
1530 1531 1532 1533 1534 1535 1536 1537
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else if(colIdOfRow1 < colIdOfRow2) {
      if(colIdOfRow1 < pColInfo->info.colId) {
        j++;
        continue;
C
Cary Xu 已提交
1538
      }
1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
      if(colIdOfRow2 < pColInfo->info.colId) {
        k++;
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }
    if(isChosenRowDataRow) {
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
H
Haojun Liao 已提交
1556
      tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal);
1557
    } else {
H
Haojun Liao 已提交
1558
      SKvRowIdx *pColIdx = tdKvRowColIdxAt(row, chosen_itr);
1559 1560
      colId = pColIdx->colId;
      offset = pColIdx->offset;
H
Haojun Liao 已提交
1561
      tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal);
1562 1563 1564
    }

    if (colId == pColInfo->info.colId) {
H
Haojun Liao 已提交
1565
      if (tdValTypeIsNorm(sVal.valType)) {
H
Haojun Liao 已提交
1566
        colDataAppend(pColInfo, numOfRows, sVal.val, false);
H
Haojun Liao 已提交
1567
      } else if (forceSetNull) {
H
Haojun Liao 已提交
1568
        colDataAppend(pColInfo, numOfRows, NULL, true);
1569
      }
H
Haojun Liao 已提交
1570

1571
      i++;
C
Cary Xu 已提交
1572

1573
      if(row == row1) {
C
Cary Xu 已提交
1574
        j++;
1575 1576 1577 1578 1579
      } else {
        k++;
      }
    } else {
      if(forceSetNull) {
H
Haojun Liao 已提交
1580
        colDataAppend(pColInfo, numOfRows, NULL, true);
C
Cary Xu 已提交
1581
      }
1582
      i++;
1583
    }
1584
  }
1585

1586 1587
  if(forceSetNull) {
    while (i < numOfCols) { // the remain columns are all null data
1588
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
1589
      colDataAppend(pColInfo, numOfRows, NULL, true);
1590
      i++;
1591 1592
    }
  }
H
Haojun Liao 已提交
1593
#endif
1594
}
1595

1596 1597
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
  if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1598 1599 1600 1601
    return;
  }

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1602 1603
  if (numOfRows < pTsdbReadHandle->outputCapacity) {
    int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
1604
    for(int32_t i = 0; i < numOfCols; ++i) {
1605
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
S
TD-1057  
Shengliang Guan 已提交
1606
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
1607 1608 1609 1610
    }
  }
}

1611
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
1612
                                int32_t* start, int32_t* end) {
1613 1614
  *start = -1;

1615
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1616
    int32_t remain = endPos - startPos + 1;
1617 1618
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
H
Haojun Liao 已提交
1619 1620
    } else {
      *end = endPos;
1621 1622 1623 1624 1625
    }

    *start = startPos;
  } else {
    int32_t remain = (startPos - endPos) + 1;
1626 1627
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
H
Haojun Liao 已提交
1628 1629
    } else {
      *end = endPos;
1630 1631 1632 1633 1634 1635 1636
    }

    *start = *end;
    *end = startPos;
  }
}

1637 1638
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1639 1640

  pCheckInfo->lastKey = cur->lastKey;
1641
  pTsdbReadHandle->realNumOfRows = numOfRows;
1642 1643 1644 1645
  cur->rows = numOfRows;
  cur->pos = endPos;
}

1646 1647
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1648 1649

  if (cur->rows > 0) {
1650 1651
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1652
    } else {
1653
      assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
1654 1655
    }

1656
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
H
Haojun Liao 已提交
1657 1658
    assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]);
  } else {
1659
    cur->win = pTsdbReadHandle->window;
H
Haojun Liao 已提交
1660

1661 1662
    int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
    cur->lastKey = pTsdbReadHandle->window.ekey + step;
H
Haojun Liao 已提交
1663 1664 1665
  }
}

1666 1667
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1668

1669
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1670 1671
  TSKEY* tsArray = pCols->cols[0].pData;

1672 1673
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
H
Haojun Liao 已提交
1674 1675 1676 1677 1678 1679

  int32_t pos = cur->pos;

  int32_t start = cur->pos;
  int32_t end = endPos;

1680
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
dengyihao's avatar
dengyihao 已提交
1681
    TSWAP(start, end, int32_t);
H
Haojun Liao 已提交
1682 1683
  }

1684 1685
  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Haojun Liao 已提交
1686 1687 1688

  // the time window should always be ascending order: skey <= ekey
  cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
H
Haojun Liao 已提交
1689
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
H
Haojun Liao 已提交
1690
  cur->lastKey = tsArray[endPos] + step;
H
Haojun Liao 已提交
1691
  cur->blockCompleted = true;
H
Haojun Liao 已提交
1692 1693

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1694
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
H
Haojun Liao 已提交
1695 1696 1697

  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
  pos = endPos + step;
1698 1699
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
1700

H
Haojun Liao 已提交
1701 1702
  tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1703 1704
}

1705
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
H
Haojun Liao 已提交
1706 1707
  // NOTE: reverse the order to find the end position in data block
  int32_t endPos = -1;
1708
  int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1709

1710 1711
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1712

1713
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
H
Haojun Liao 已提交
1714 1715
    endPos = pBlockInfo->rows - 1;
    cur->mixBlock = (cur->pos != 0);
1716
  } else if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
H
Haojun Liao 已提交
1717 1718 1719 1720
    endPos = 0;
    cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
  } else {
    assert(pCols->numOfRows > 0);
1721
    endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
H
Haojun Liao 已提交
1722 1723 1724 1725 1726 1727
    cur->mixBlock = true;
  }

  return endPos;
}

H
[td-32]  
hjxilinx 已提交
1728 1729
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1730 1731 1732 1733
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataBlockInfo blockInfo = {0};//GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;
H
Haojun Liao 已提交
1734

1735
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1736

1737 1738
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
H
Haojun Liao 已提交
1739 1740
      cur->pos >= 0 && cur->pos < pBlock->numOfRows);

1741
  TSKEY* tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1742
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
1743 1744

  // for search the endPos, so the order needs to reverse
1745
  int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
1746

1747 1748
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
1749

H
Haojun Liao 已提交
1750
  STable* pTable = NULL;
1751
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
1752

1753
  tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
H
Haojun Liao 已提交
1754
            "end:%d, %s",
1755
            pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey,
H
Haojun Liao 已提交
1756
            blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1757

1758 1759
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
H
Haojun Liao 已提交
1760

1761 1762 1763 1764
  int16_t rv1 = -1;
  int16_t rv2 = -1;
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
1765

H
Haojun Liao 已提交
1766 1767
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
1768

1769 1770
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
1771
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
1772
    return;
1773
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1774 1775
    SSkipListNode* node = NULL;
    do {
H
Haojun Liao 已提交
1776 1777
      STSRow* row2 = NULL;
      STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2);
1778
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
1779
        break;
1780
      }
1781

H
Haojun Liao 已提交
1782
      TSKEY key = TD_ROW_KEY(row1);
1783 1784
      if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1785 1786 1787
        break;
      }

1788 1789
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1790 1791 1792
        break;
      }

1793 1794
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
1795
        if (rv1 != TD_ROW_SVER(row1)) {
1796
//          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
1797
          rv1 = TD_ROW_SVER(row1);
C
Cary Xu 已提交
1798
        }
H
Haojun Liao 已提交
1799
        if(row2 && rv2 != TD_ROW_SVER(row2)) {
1800
//          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
1801
          rv2 = TD_ROW_SVER(row2);
1802 1803
        }
        
H
Haojun Liao 已提交
1804
        mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
1805 1806 1807 1808
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1809

1810
        cur->win.ekey = key;
1811 1812 1813
        cur->lastKey  = key + step;
        cur->mixBlock = true;

1814
        moveToNextRowInMem(pCheckInfo);
1815
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
H
TD-1439  
Hongze Cheng 已提交
1816
        if (pCfg->update) {
1817
          if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
1818
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
1819
          }
H
Haojun Liao 已提交
1820
          if (rv1 != TD_ROW_SVER(row1)) {
1821
//            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
1822
            rv1 = TD_ROW_SVER(row1);
1823
          }
H
Haojun Liao 已提交
1824
          if(row2 && rv2 != TD_ROW_SVER(row2)) {
1825
//            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
1826
            rv2 = TD_ROW_SVER(row2);
1827 1828 1829
          }
          
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
H
Haojun Liao 已提交
1830
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
H
TD-1439  
Hongze Cheng 已提交
1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
1845 1846
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
                  (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1847 1848 1849
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1850

1851
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
1852 1853
        assert(end != -1);

H
Haojun Liao 已提交
1854
        if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
1855
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
1856 1857 1858 1859
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
1860
        }
1861

1862
        int32_t qstart = 0, qend = 0;
1863
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
1864

1865
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
1866 1867
        pos += (qend - qstart + 1) * step;

1868
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[qend]:tsArray[qstart];
1869
        cur->lastKey  = cur->win.ekey + step;
1870
      }
1871
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
1872

1873
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
1874 1875 1876 1877
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1878
      if (node == NULL ||
H
Haojun Liao 已提交
1879
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
1880
           ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
H
Haojun Liao 已提交
1881
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
1882
           !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1883 1884 1885 1886 1887
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

1888
        int32_t start = -1, end = -1;
1889
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
1890

1891
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
1892
        pos += (end - start + 1) * step;
1893

1894
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[end]:tsArray[start];
1895
        cur->lastKey  = cur->win.ekey + step;
H
Haojun Liao 已提交
1896
        cur->mixBlock = true;
1897
      }
1898 1899
    }
  }
H
Haojun Liao 已提交
1900 1901

  cur->blockCompleted =
1902 1903
      (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
       ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
1904

1905
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
dengyihao's avatar
dengyihao 已提交
1906
    TSWAP(cur->win.skey, cur->win.ekey, TSKEY);
1907
  }
1908

1909 1910 1911
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
1912

H
Haojun Liao 已提交
1913 1914
  tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
      pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
1915 1916
}

1917
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
H
[td-32]  
hjxilinx 已提交
1918
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
1919
  int    numOfRows;
1920 1921
  TSKEY* keyList;

H
[td-32]  
hjxilinx 已提交
1922
  if (num <= 0) return -1;
1923 1924

  keyList = (TSKEY*)pValue;
H
[td-32]  
hjxilinx 已提交
1925 1926
  firstPos = 0;
  lastPos = num - 1;
1927

1928
  if (order == TSDB_ORDER_DESC) {
H
[td-32]  
hjxilinx 已提交
1929 1930 1931 1932 1933
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
1934

H
Haojun Liao 已提交
1935 1936
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1937

H
[td-32]  
hjxilinx 已提交
1938 1939 1940 1941 1942 1943 1944 1945
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
1946

H
[td-32]  
hjxilinx 已提交
1947 1948 1949 1950 1951
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
1952

H
[td-32]  
hjxilinx 已提交
1953 1954 1955 1956 1957 1958 1959
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
1960

H
Haojun Liao 已提交
1961 1962
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1963

H
[td-32]  
hjxilinx 已提交
1964 1965 1966 1967 1968 1969 1970 1971 1972
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
1973

H
[td-32]  
hjxilinx 已提交
1974 1975 1976
  return midPos;
}

1977
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
wafwerar's avatar
wafwerar 已提交
1978 1979
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);
1980 1981

  for (int32_t i = 0; i < numOfTables; ++i) {
H
Haojun Liao 已提交
1982
    STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
wafwerar's avatar
wafwerar 已提交
1983
    taosMemoryFreeClear(pBlockInfo);
1984 1985
  }

wafwerar's avatar
wafwerar 已提交
1986
  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

1998
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
1999 2000
    /* left block is empty */
    return 1;
2001
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2002 2003 2004 2005 2006 2007 2008
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2009
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
2010
#if 0	// TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2011 2012
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2013
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2014
  }
H
Haojun Liao 已提交
2015
#endif
2016

H
Haojun Liao 已提交
2017
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2018 2019
}

2020
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
H
Haojun Liao 已提交
2021 2022
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

2023 2024
  if (pTsdbReadHandle->allocSize < size) {
    pTsdbReadHandle->allocSize = (int32_t)size;
wafwerar's avatar
wafwerar 已提交
2025
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, pTsdbReadHandle->allocSize);
H
Haojun Liao 已提交
2026 2027 2028 2029
    if (tmp == NULL) {
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

2030
    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
2031 2032
  }

2033
  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
2034 2035
  *numOfAllocBlocks = numOfBlocks;

H
Haojun Liao 已提交
2036
  // access data blocks according to the offset of each block in asc/desc order.
2037
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2038

2039 2040
  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
wafwerar's avatar
wafwerar 已提交
2041 2042 2043
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
2044

2045
  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
2046
    cleanBlockOrderSupporter(&sup, 0);
2047
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2048
  }
H
Haojun Liao 已提交
2049

2050
  int32_t cnt = 0;
2051
  int32_t numOfQualTables = 0;
H
Haojun Liao 已提交
2052

2053
  for (int32_t j = 0; j < numOfTables; ++j) {
2054
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
2055 2056 2057
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }
H
Haojun Liao 已提交
2058

H
refact  
Hongze Cheng 已提交
2059
    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
2060
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
2061

wafwerar's avatar
wafwerar 已提交
2062
    char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
2063
    if (buf == NULL) {
2064
      cleanBlockOrderSupporter(&sup, numOfQualTables);
2065
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
2066 2067
    }

2068
    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
2069 2070

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
H
Haojun Liao 已提交
2071
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];
2072

H
Haojun Liao 已提交
2073 2074
      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
2075 2076 2077
      cnt++;
    }

2078
    numOfQualTables++;
2079 2080
  }

H
Haojun Liao 已提交
2081
  assert(numOfBlocks == cnt);
2082

H
Haojun Liao 已提交
2083 2084
  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
2085
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
H
Haojun Liao 已提交
2086
    cleanBlockOrderSupporter(&sup, numOfQualTables);
2087

H
Haojun Liao 已提交
2088 2089
    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
        pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2090 2091
    return TSDB_CODE_SUCCESS;
  }
2092

H
Haojun Liao 已提交
2093 2094
  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
      numOfQualTables, pTsdbReadHandle->idStr);
2095

2096
  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
2097
  sup.numOfTables = numOfQualTables;
2098

2099 2100
  SMultiwayMergeTreeInfo* pTree = NULL;
  uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
2101 2102
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
2103
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2104 2105 2106 2107 2108
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
2109
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
2110 2111
    int32_t index = sup.blockIndexArray[pos]++;

H
Haojun Liao 已提交
2112
    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
2113
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];
2114 2115

    // set data block index overflow, in order to disable the offset comparator
2116 2117
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
2118
    }
2119

H
Haojun Liao 已提交
2120
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
2121 2122 2123 2124 2125
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
H
Haojun Liao 已提交
2126
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
2127 2128 2129
   * }
   */

H
Haojun Liao 已提交
2130
  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
2131
  cleanBlockOrderSupporter(&sup, numOfTables);
wafwerar's avatar
wafwerar 已提交
2132
  taosMemoryFree(pTree);
2133 2134 2135 2136

  return TSDB_CODE_SUCCESS;
}

2137
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2138

2139 2140 2141
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool *exists) {
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2142 2143

  while(1) {
2144
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
H
Haojun Liao 已提交
2145 2146 2147 2148
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

2149 2150
    if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
2151
      // all data blocks in current file has been checked already, try next file if exists
2152
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2153 2154 2155 2156
    } else {  // next block of the same file
      cur->slot += step;
      cur->mixBlock = false;
      cur->blockCompleted = false;
2157
      pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
2158 2159 2160 2161
    }
  }
}

2162 2163 2164
/*
 * Advance the file-set iterator until a file set is found that contributes at
 * least one data block to this query, then load the first block of that file
 * (slot 0 for ascending order, the last slot for descending order) through
 * getDataBlockRv().
 *
 * The FS read lock is held only while the iterator is advanced and the file
 * set is opened; it is released on every path before block indexes are read.
 *
 * When no file qualifies, or an error occurs, cur->fid is set to INT32_MIN,
 * *exists is set to false and the (possibly successful) code is returned.
 */
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;
  STimeWindow    fileWin = TSWINDOW_INITIALIZER;

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t blocksInFile = 0;
  int32_t tableCount = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  pTsdbReadHandle->numOfBlocks = 0;

  for (;;) {
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {  // iterator exhausted
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &fileWin.skey,
                       &fileWin.ekey);

    // the current file does not overlap with the query time window, so the remaining files are ignored
    const bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
    const bool beyondRange =
        asc ? (fileWin.skey > pTsdbReadHandle->window.ekey) : (fileWin.ekey < pTsdbReadHandle->window.ekey);
    if (beyondRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
      break;
    }

    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &blocksInFile);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, blocksInFile, tableCount,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    assert(blocksInFile >= 0);
    if (blocksInFile == 0) {
      continue;  // nothing for the queried tables in this file, move on
    }

    // todo return error code to query engine
    code = createDataBlocksInfo(pTsdbReadHandle, blocksInFile, &pTsdbReadHandle->numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    assert(blocksInFile >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
      break;  // found a file with qualified blocks
    }
  }

  // no data in file anymore
  const bool failed = (code != TSDB_CODE_SUCCESS);
  if (failed || pTsdbReadHandle->numOfBlocks <= 0) {
    if (!failed) {
      assert(pTsdbReadHandle->pFileGroup == NULL);
    }

    pPos->fid = INT32_MIN;  // denote that there are no data in file anymore
    *exists = false;
    return code;
  }

  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    pPos->slot = 0;
  } else {
    pPos->slot = pTsdbReadHandle->numOfBlocks - 1;
  }
  pPos->fid = pTsdbReadHandle->pFileGroup->fid;

  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[pPos->slot], exists);
}

/*
 * Tell whether cur->slot is the final block slot of the current file in the
 * traverse direction: index numOfBlocks - 1 when ascending, index 0 otherwise.
 */
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);

  if (ascTrav) {
    return cur->slot == numOfBlocks - 1;
  }

  return cur->slot == 0;
}

2255 2256
/*
 * Shift the file cursor by one slot in the traverse direction (+1 ascending,
 * -1 descending) and clear the per-block state flags. The slot must be a valid
 * index before the move; the new value is not range-checked here.
 */
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
  const int32_t  delta = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;

  assert(pPos->slot >= 0 && pPos->slot < pTsdbReadHandle->numOfBlocks);

  pPos->slot += delta;
  pPos->blockCompleted = false;
  pPos->mixBlock       = false;
}
H
Haojun Liao 已提交
2265 2266

/*
 * Scan every file set that overlaps the query window and accumulate block
 * statistics for the tables of this read handle into pTableBlockInfo:
 * totalSize, totalRows, maxRows, minRows, numOfSmallBlocks and numOfFiles.
 *
 * totalSize and totalRows are reset here; the other counters are only updated,
 * so the caller presumably initializes them (e.g. minRows) -- TODO confirm.
 *
 * NOTE(review): numOfFiles is incremented once before the loop and once more
 * for every visited file set, which looks like an over-count by one -- confirm
 * against the consumer of STableBlockDistInfo before changing it.
 *
 * Returns TSDB_CODE_SUCCESS, or the error raised while opening or reading a
 * file set.
 */
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  STsdbFS* pFs = REPO_FS(pTsdbReadHandle->pTsdb);

  // position the file iterator on the file set that holds the start key of the query
  pTsdbReadHandle->locateStart = true;
  STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
  int32_t   startFid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

  tsdbRLockFS(pFs);
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFs, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, startFid);
  tsdbUnLockFS(pFs);

  pTableBlockInfo->numOfFiles += 1;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     tableCount = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  int         smallBlockLimit = 4096;  // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
  STimeWindow fileWin = TSWINDOW_INITIALIZER;
  bool        asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  for (;;) {
    int32_t blocksInFile = 0;

    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &fileWin.skey,
                       &fileWin.ekey);

    // the current file does not overlap with the query time window, so the remaining files are ignored
    bool beyondRange =
        asc ? (fileWin.skey > pTsdbReadHandle->window.ekey) : (fileWin.ekey < pTsdbReadHandle->window.ekey);
    if (beyondRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      break;
    }

    pTableBlockInfo->numOfFiles += 1;

    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &blocksInFile);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, blocksInFile, tableCount,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    if (blocksInFile == 0) {
      continue;
    }

    // fold every block of every table found in this file into the statistics
    for (int32_t t = 0; t < tableCount; ++t) {
      STableCheckInfo* pInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, t);
      SBlock*          pBlocks = pInfo->pCompInfo->blocks;

      for (int32_t b = 0; b < pInfo->numOfBlocks; ++b) {
        int32_t rows = pBlocks[b].numOfRows;

        pTableBlockInfo->totalSize += pBlocks[b].len;
        pTableBlockInfo->totalRows += rows;

        if (pTableBlockInfo->maxRows < rows) {
          pTableBlockInfo->maxRows = rows;
        }

        if (pTableBlockInfo->minRows > rows) {
          pTableBlockInfo->minRows = rows;
        }

        if (rows < smallBlockLimit) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }

        // the per-step histogram (TSDB_BLOCK_DIST_STEP_ROWS / dataBlockInfos) is currently disabled
      }
    }
  }

  return code;
}

2369 2370 2371
/*
 * Produce the next file data block for the read handle.
 *
 * On the first call (locateStart not yet set) the file iterator is positioned
 * on the file set holding the start key of the query and the first qualified
 * block is loaded. On later calls an unfinished mixed block is continued
 * first; otherwise the cursor moves to the next block of the current file or,
 * at the end of the file, to the next file.
 *
 * *exists reports whether a block with data is available; the return value is
 * the status code of the underlying load/merge step.
 */
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFs = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;

  // first call: find the start data block in file
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;

    STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
    int32_t   startFid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

    tsdbRLockFS(pFs);
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFs, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, startFid);
    tsdbUnLockFS(pFs);

    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  STableBlockInfo* pCurBlock = &pTsdbReadHandle->pDataBlockInfo[pPos->slot];
  STableCheckInfo* pCheckInfo = pCurBlock->pTableCheckInfo;

  // a mixed block that has not been completed yet still has rows to deliver
  if (pPos->mixBlock && !pPos->blockCompleted) {
    tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, pPos->slot, pPos->pos,
              pTsdbReadHandle->idStr);

    int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pCurBlock->compBlock, pCheckInfo);
    *exists = (pTsdbReadHandle->realNumOfRows > 0);

    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }
  }

  // the current block is done or empty: all blocks of this file consumed -> next file
  if (isEndFileDataBlock(pPos, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  // otherwise try the next block of the same file
  moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[pPos->slot], exists);
}

2416 2417
/*
 * Starting at the current activeIndex, look for a table for which
 * hasMoreDataInCache() reports data. activeIndex is left on the table that has
 * data, or equal to the number of tables when none has.
 */
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  size_t tableCount = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  for (; pTsdbReadHandle->activeIndex < tableCount; pTsdbReadHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
      return true;
    }
  }

  return false;
}

2430
//todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2431
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
H
Haojun Liao 已提交
2432
  // filter the queried time stamp in the first place
2433
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
H
Haojun Liao 已提交
2434 2435

  // starts from the buffer in case of descending timestamp order check data blocks
2436
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2437 2438 2439

  int32_t i = 0;
  while(i < numOfTables) {
2440
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2441 2442

    // the first qualified table for interpolation query
2443 2444 2445 2446
//    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//      break;
//    }
H
Haojun Liao 已提交
2447 2448 2449 2450 2451 2452 2453 2454 2455

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

2456 2457
  STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2458

2459 2460
  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
H
Haojun Liao 已提交
2461 2462 2463
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2464
                                 STsdbReadHandle* pTsdbReadHandle) {
H
Haojun Liao 已提交
2465
  int     numOfRows = 0;
2466 2467
  int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
  STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
H
Haojun Liao 已提交
2468 2469 2470
  win->skey = TSKEY_INITIAL_VAL;

  int64_t st = taosGetTimestampUs();
D
fix bug  
dapan1121 已提交
2471 2472
  int16_t rv = -1;
  STSchema* pSchema = NULL;
H
Haojun Liao 已提交
2473 2474

  do {
H
Haojun Liao 已提交
2475
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL);
H
Haojun Liao 已提交
2476 2477 2478 2479
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2480
    TSKEY key = TD_ROW_KEY(row);
2481 2482 2483
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey,
                pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2484 2485 2486 2487 2488 2489 2490 2491 2492

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2493
    if (rv != TD_ROW_SVER(row)) {
2494
      pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
H
Haojun Liao 已提交
2495
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2496
    }
2497
    mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true);
H
Haojun Liao 已提交
2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508

    if (++numOfRows >= maxRowsToRead) {
      moveToNextRowInMem(pCheckInfo);
      break;
    }

  } while(moveToNextRowInMem(pCheckInfo));

  assert(numOfRows <= maxRowsToRead);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
2509
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
H
Haojun Liao 已提交
2510 2511 2512
    int32_t emptySize = maxRowsToRead - numOfRows;

    for(int32_t i = 0; i < numOfCols; ++i) {
2513
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
2514 2515 2516 2517 2518
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }

  int64_t elapsedTime = taosGetTimestampUs() - st;
H
Haojun Liao 已提交
2519 2520
  tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle,
            elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2521 2522 2523 2524

  return numOfRows;
}

2525 2526
/*
 * Append one STableKeyInfo entry to list for every id returned by the
 * child-table cursor opened on uid. Every entry starts with lastKey set to
 * TSKEY_INITIAL_VAL.
 *
 * pMeta  meta handle the cursor is opened on
 * uid    uid passed to metaOpenCtbCursor()
 * list   output array of STableKeyInfo
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {  // 0 marks the end of the cursor
      break;
    }

    // ".uid" must be a designator: a bare "uid = id" is an assignment expression that
    // overwrites the uid parameter and initializes the struct member positionally
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = id};
    taosArrayPush(list, &info);
  }

  metaCloseCtbCurosr(pCur);
  return TSDB_CODE_SUCCESS;
}

/*
 * Release param with taosMemoryFree(); a NULL param is ignored.
 *
 * The former type-specific cleanup of the tQueryInfo payload (freeing
 * pInfo->q, or destroying it as a hash table for TSDB_RELATION_IN) is
 * disabled, so only the outer allocation is released here.
 */
static void destroyHelper(void* param) {
  if (param != NULL) {
    taosMemoryFree(param);
  }
}

2557 2558 2559 2560 2561
// Selector values naming the row before / after the queried timestamp.
// In the visible part of this file they are referenced only by the
// commented-out doGetExternalRow() code.
#define TSDB_PREV_ROW  0x1
#define TSDB_NEXT_ROW  0x2

/*
 * Try to provide the next data block for the table that is currently active
 * in a table-sequential scan. The sources are tried in this order:
 *   1. file data blocks, while checkFiles is set,
 *   2. data reported by hasMoreDataInCache(),
 *   3. tsdbGetExternalRow(), when external rows were requested, the query
 *      window is a single timestamp and the current result is empty.
 *
 * Returns true when a block is available, false otherwise.
 */
static bool  loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
  if (pTsdbReadHandle->checkFiles) {
    // check if the query range overlaps with the file data block
    bool    found = true;
    int32_t rc = getDataBlocksInFiles(pTsdbReadHandle, &found);

    if (rc != TSDB_CODE_SUCCESS) {
      pTsdbReadHandle->checkFiles = false;
      return false;
    }

    if (found) {
      tsdbRetrieveDataBlock((tsdbReaderT*) pTsdbReadHandle, NULL);

      // for a single-timestamp window the first value of column 0 must be exactly that timestamp
      if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
        SColumnInfoData* pFirstCol = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pFirstCol->pData == pTsdbReadHandle->window.skey);
      }

      pTsdbReadHandle->currentLoadExternalRows = false;  // clear the flag, since the exact matched row is found.
      return found;
    }

    // nothing more in files for this table
    pTsdbReadHandle->checkFiles = false;
  }

  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
    return true;
  }

  // current result is empty
  const bool needExternalRow = pTsdbReadHandle->currentLoadExternalRows &&
                               pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
                               pTsdbReadHandle->cur.rows == 0;
  if (!needExternalRow) {
    return false;
  }

  // the former doGetExternalRow(TSDB_PREV_ROW / TSDB_NEXT_ROW) path and the release of
  // pTsdbReadHandle->prev / next are disabled; tsdbGetExternalRow() is used instead
  bool gotRow = tsdbGetExternalRow(pTsdbReadHandle);
  pTsdbReadHandle->currentLoadExternalRows = false;

  return gotRow;
}
2608

2609
/*
 * Advance activeIndex to the next table and emit one row for it as a one-row
 * mixed block; returns false once activeIndex has passed the last table.
 *
 * The comment in the original code notes that pTsdbReadHandle->window must be
 * the TS_INITIALIZER here.
 *
 * NOTE(review): the tsdbGetCachedLastRow() lookup is commented out, so pRow
 * stays NULL and key stays TSKEY_INITIAL_VAL when mergeTwoRowFromMem() is
 * called; key + step is also computed from TSKEY_INITIAL_VAL. The only call
 * site visible in this file is commented out as well -- confirm before
 * re-enabling.
 */
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
  int32_t numOfCols  = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  size_t  tableCount = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(tableCount > 0 && numOfCols > 0);

  SQueryFilePos* pPos = &pTsdbReadHandle->cur;

  STSRow*  pRow = NULL;
  TSKEY    key  = TSKEY_INITIAL_VAL;
  int32_t  step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;

  pTsdbReadHandle->activeIndex += 1;
  if (!(pTsdbReadHandle->activeIndex < tableCount)) {
    return false;  // all tables have been handled
  }

  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
//    if (ret != TSDB_CODE_SUCCESS) {
//      return false;
//    }
  mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
  taosMemoryFreeClear(pRow);

  // update the last key value
  TSKEY nextKey = key + step;
  pCheckInfo->lastKey = nextKey;

  pPos->rows     = 1;  // only one row
  pPos->lastKey  = nextKey;
  pPos->mixBlock = true;
  pPos->win.skey = key;
  pPos->win.ekey = key;

  return true;
}

D
init  
dapan1121 已提交
2646

D
update  
dapan1121 已提交
2647

2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799
//static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, pTable->tableId);
//      continue;
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

/*
 * Table-sequential block loading: ask the active table for its next block
 * and, when it has none, reset the per-table scan state and move on to the
 * next table.
 *
 * Returns true as soon as loadBlockOfActiveTable() delivers a block, false
 * when every table has been exhausted. The time spent skipping exhausted
 * tables is accumulated into cost.checkForNextTime.
 */
static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();

  while(pTsdbReadHandle->activeIndex < numOfTables) {
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
      return true;
    }

    // the active table is exhausted: drop its block count and prepare the scan state for the next table
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
    pCheckInfo->numOfBlocks = 0;

    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
    pTsdbReadHandle->checkFiles  = true;
    pTsdbReadHandle->cur.rows    = 0;
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;

    terrno = TSDB_CODE_SUCCESS;

    // account only for the time of this iteration; measuring from the fixed function
    // start on every pass would add the time of the earlier iterations again
    int64_t now = taosGetTimestampUs();
    pTsdbReadHandle->cost.checkForNextTime += (now - stime);
    stime = now;
  }

  return false;
}

H
Haojun Liao 已提交
2827
// handle data in cache situation
H
Haojun Liao 已提交
2828
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
2829
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
Y
yihaoDeng 已提交
2830

2831
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
H
Haojun Liao 已提交
2832
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
2833 2834 2835
    return false;
  }

Y
yihaoDeng 已提交
2836 2837 2838
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

2839
  // TODO refactor: remove "type"
2840 2841 2842 2843 2844
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
//      return loadCachedLastRow(pTsdbReadHandle);
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
//      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
2845
    }
H
Haojun Liao 已提交
2846
  }
Y
yihaoDeng 已提交
2847

2848 2849
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
H
Haojun Liao 已提交
2850
  } else { // loadType == RR and Offset Order
2851
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2852 2853 2854
      // check if the query range overlaps with the file data block
      bool exists = true;

2855
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2856
      if (code != TSDB_CODE_SUCCESS) {
2857 2858
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2859 2860 2861 2862 2863

        return false;
      }

      if (exists) {
2864
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
2865 2866
        return exists;
      }
Y
yihaoDeng 已提交
2867

2868 2869
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
2870 2871
    }

H
Haojun Liao 已提交
2872
    // TODO: opt by consider the scan order
2873
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
2874
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
2875

H
Haojun Liao 已提交
2876
    elapsedTime = taosGetTimestampUs() - stime;
2877
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
2878
    return ret;
Y
yihaoDeng 已提交
2879 2880
  }
}
2881

2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
2917
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
//  STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
2936
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
2937 2938 2939 2940 2941 2942 2943 2944 2945 2946
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
2947
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
2948
//  taosMemoryFreeClear(cond.colList);
2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
//out_of_memory:
2988
//  tsdbCleanupReadHandle(pSecQueryHandle);
2989 2990 2991
//  return terrno;
//}

H
Haojun Liao 已提交
2992
/*
 * Build a two-row result in the reader's output columns: row 0 is copied from
 * the cached "prev" row and row 1 from the cached "next" row.
 *
 * The current position is always flagged as a mixed, non-file block
 * (fid == INT32_MIN). If either cached row is missing, cur.rows is set to 0
 * and false is returned; otherwise cur.rows is set to 2 and true is returned.
 * When the first column is a timestamp, cur.win is updated from the two
 * copied keys.
 */
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  // the result never maps onto a single file block
  pPos->mixBlock = true;
  pPos->fid = INT32_MIN;

  bool hasBothRows = (pReader->prev != NULL) && (pReader->next != NULL);
  if (!hasBothRows) {
    pPos->rows = 0;
    return false;
  }

  int32_t nCols = (int32_t)QH_GET_NUM_OF_COLS(pReader);
  int32_t col = 0;
  while (col < nCols) {
    SColumnInfoData* pDst = taosArrayGet(pReader->pColumns, col);
    SColumnInfoData* pPrevCol = taosArrayGet(pReader->prev, col);
    SColumnInfoData* pNextCol = taosArrayGet(pReader->next, col);

    char* out = (char*)pDst->pData;

    // row 0 <- prev, row 1 <- next; both use the destination column width
    memcpy(out, pPrevCol->pData, pDst->info.bytes);
    memcpy(out + pDst->info.bytes, pNextCol->pData, pDst->info.bytes);

    if (col == 0 && pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
      pPos->win.skey = *(TSKEY*)out;
      pPos->win.ekey = *(TSKEY*)(out + TSDB_KEYSIZE);
    }

    ++col;
  }

  pPos->rows = 2;
  return true;
}

3023
/*
 * If lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
 * otherwise set pRes, save lastKey and return TSDB_CODE_SUCCESS.
 */
H
Haojun Liao 已提交
3027
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3044
// out:
3045 3046 3047 3048
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3049 3050
/*
 * Return true when the reader's cachelastrow field is set to anything above
 * TSDB_CACHED_TYPE_NONE.
 */
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pReader;
  return TSDB_CACHED_TYPE_NONE < pHandle->cachelastrow;
}

3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079
/*
 * Check whether the cached last row can serve the query.
 *
 * The detection logic is currently disabled (kept below for reference), so
 * this function only validates its arguments and reports success.
 */
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) {
  assert(pTsdbReadHandle != NULL && groupList != NULL);

  // Disabled implementation:
//  TSKEY    key = TSKEY_INITIAL_VAL;
//
//  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
//  assert(group != NULL);
//
//  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
//
//  int32_t code = 0;
//
//  if (((STable*)pInfo->pTable)->lastRow) {
//    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
//    if (code != TSDB_CODE_SUCCESS) {
//      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
//    } else {
//      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
//    }
//  }
//
//  // update the tsdb query time range
//  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
//    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
//    pTsdbReadHandle->checkFiles  = false;
//    pTsdbReadHandle->activeIndex = -1;  // start from -1
//  }

  int32_t status = TSDB_CODE_SUCCESS;
  return status;
}

3084 3085
/*
 * Prepare the reader for serving from the cached last values.
 *
 * When cachelastrow is non-zero, file checking is switched off and
 * activeIndex is reset to -1. Always returns 0.
 */
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  const int32_t code = 0;

  // Disabled detection of the cached-last-column state:
//  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
//    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
//  }

  if (!pTsdbReadHandle->cachelastrow) {
    return code;
  }

  // update the tsdb query state: skip files, start from -1
  pTsdbReadHandle->activeIndex = -1;
  pTsdbReadHandle->checkFiles  = false;

  return code;
}


3102
/*
 * Walk every table of every group, record the newest key per table in
 * STableKeyInfo.lastKey and return the time window spanned by those keys.
 *
 * NOTE(review): the per-table last key is currently stubbed to 0 and the
 * per-group pruning is disabled (see the commented-out code), so
 * totalNumOfTable stays 0 and groupList->numOfTables is reset to 0 on return.
 *
 * If no key updates the window, TSWINDOW_INITIALIZER is returned.
 */
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
  STimeWindow window = {INT64_MAX, INT64_MIN};

  int32_t totalNumOfTable = 0;

  // indexes of groups to drop from groupList->pGroupList; may be NULL if the allocation fails
  SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));

  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
  for (int32_t j = 0; j < (int32_t)numOfGroups; ++j) {
    SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
    TSKEY   key = TSKEY_INITIAL_VAL;

    STableKeyInfo keyInfo = {0};

    size_t numOfTables = taosArrayGetSize(pGroup);
    for (size_t i = 0; i < numOfTables; ++i) {
      STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);

      // if the lastKey equals to INT64_MIN, there is no data in this table
      TSKEY lastKey = 0;//((STable*)(pInfo->pTable))->lastKey;
      if (key >= lastKey) {
        continue;
      }

      key = lastKey;

//      keyInfo.pTable  = pInfo->pTable;
      keyInfo.lastKey = key;
      pInfo->lastKey  = key;

      if (key < window.skey) {
        window.skey = key;
      }

      if (key > window.ekey) {
        window.ekey = key;
      }
    }

    // more than one table in each group, only one table left for each group
//    if (keyInfo.pTable != NULL) {
//      totalNumOfTable++;
//      if (taosArrayGetSize(pGroup) == 1) {
//        // do nothing
//      } else {
//        taosArrayClear(pGroup);
//        taosArrayPush(pGroup, &keyInfo);
//      }
//    } else {  // mark all the empty groups, and remove it later
//      taosArrayDestroy(pGroup);
//      taosArrayPush(emptyGroup, &j);
//    }
  }

  // the window was never updated, so fall back to the initial window
  if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
    window = TSWINDOW_INITIALIZER;
    assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
  }

  // TARRAY_GET_START dereferences the array, so only touch it when the allocation succeeded
  if (emptyGroup != NULL) {
    taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t) taosArrayGetSize(emptyGroup));
    taosArrayDestroy(emptyGroup);
  }

  groupList->numOfTables = totalNumOfTable;
  return window;
}

H
Haojun Liao 已提交
3167
/*
 * Fill pDataBlockInfo with the description of the reader's current block:
 * owning table uid, row count, time window and number of output columns.
 *
 * The uid comes from the current file block when cur.fid != INT32_MIN,
 * otherwise from the table check info at activeIndex.
 */
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pTsdbReadHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  uint64_t tableUid = 0;

  if (pReader->cur.fid == INT32_MIN) {
    // not a file block: the active table owns the current rows
    STableCheckInfo* pActive = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex);
    tableUid = pActive->tableId;
  } else {
    // there are data in file
    STableBlockInfo* pFileBlock = &pReader->pDataBlockInfo[pPos->slot];
    tableUid = pFileBlock->pTableCheckInfo->tableId;
  }

  pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pReader));
  pDataBlockInfo->window = pPos->win;
  pDataBlockInfo->rows   = pPos->rows;
  pDataBlockInfo->uid    = tableUid;
}
H
hjxilinx 已提交
3187

H
Haojun Liao 已提交
3188 3189 3190
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
H
Haojun Liao 已提交
3191
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStatis** pBlockStatis) {
3192
  STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle;
H
Haojun Liao 已提交
3193

H
Haojun Liao 已提交
3194 3195
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3196 3197 3198
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3199

H
Haojun Liao 已提交
3200 3201 3202 3203
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3204 3205 3206 3207
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3208 3209

  int64_t stime = taosGetTimestampUs();
3210 3211
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3212
    return terrno;
3213 3214 3215
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3216
  }
H
Haojun Liao 已提交
3217

H
Haojun Liao 已提交
3218 3219
  int16_t* colIds = pHandle->defaultLoadColumn->pData;

H
Haojun Liao 已提交
3220
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
H
Haojun Liao 已提交
3221
  memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
3222
  for(int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
3223
    pHandle->statis[i].colId = colIds[i];
3224
  }
H
Haojun Liao 已提交
3225

3226
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3227 3228 3229

  // always load the first primary timestamp column data
  SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
3230
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3231 3232 3233 3234 3235

  pPrimaryColStatis->numOfNull = 0;
  pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
  pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;

H
Haojun Liao 已提交
3236
  //update the number of NULL data rows
H
Haojun Liao 已提交
3237
  for(int32_t i = 1; i < numOfCols; ++i) {
3238
    if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
H
Haojun Liao 已提交
3239 3240 3241
      pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
    }
  }
H
Haojun Liao 已提交
3242 3243 3244 3245

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

H
Haojun Liao 已提交
3246
  *pBlockStatis = pHandle->statis;
3247
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3248 3249
}

H
Haojun Liao 已提交
3250
/*
 * Return the column data of the reader's current block.
 *
 * The data is already in pHandle->pColumns when it did not come from a file
 * block (cur.fid == INT32_MIN), when the block is a mixed block, or when the
 * same file block (slot, fid, uid) has already been loaded. Otherwise the
 * whole file block is loaded and copied into the output columns.
 *
 * Returns pHandle->pColumns, or NULL if loading the file block fails.
 * pIdList is not used by the current implementation.
 */
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;

  // 1. data is from cache
  if (pHandle->cur.fid == INT32_MIN) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;

  // 2. data block is not completely qualified to the query time range
  if (pHandle->cur.mixBlock) {
    return pHandle->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);

  // data block has been loaded, todo extract method
  SDataBlockLoadInfo* pLoaded = &pHandle->dataBlockLoadInfo;
  bool sameBlock = pLoaded->slot == pHandle->cur.slot && pLoaded->fileGroup->fid == pHandle->cur.fid &&
                   pLoaded->uid == pCheckInfo->tableId;
  if (sameBlock) {
    return pHandle->pColumns;
  }

  // only load the file block
  SBlock* pBlock = pBlockInfo->compBlock;
  if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  // todo refactor
  int32_t copied = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);

  // nothing to compact for ascending scans or a completely filled buffer
  if (ASCENDING_TRAVERSE(pHandle->order) || copied >= pHandle->outputCapacity) {
    return pHandle->pColumns;
  }

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  int32_t gap = pHandle->outputCapacity - copied;
  int32_t nCols = (int32_t)taosArrayGetSize(pHandle->pColumns);

  for (int32_t c = 0; c < nCols; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pHandle->pColumns, c);
    char*            base = (char*)pCol->pData;
    memmove(base, base + gap * pCol->info.bytes, copied * pCol->info.bytes);
  }

  return pHandle->pColumns;
}
3300
#if 0
3301
void filterPrepare(void* expr, void* param) {
3302
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
3303
  if (pExpr->_node.info != NULL) {
3304 3305
    return;
  }
3306

wafwerar's avatar
wafwerar 已提交
3307
  pExpr->_node.info = taosMemoryCalloc(1, sizeof(tQueryInfo));
H
Haojun Liao 已提交
3308

3309
  STSchema*   pTSSchema = (STSchema*) param;
H
hjxilinx 已提交
3310 3311 3312
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
3313

3314 3315
  pInfo->sch      = *pSchema;
  pInfo->optr     = pExpr->_node.optr;
Y
yihaoDeng 已提交
3316
  pInfo->compare  = getComparFunc(pInfo->sch.type, pInfo->optr);
H
Haojun Liao 已提交
3317
  pInfo->indexed  = pTSSchema->columns->colId == pInfo->sch.colId;
H
Haojun Liao 已提交
3318

H
hjxilinx 已提交
3319
  if (pInfo->optr == TSDB_RELATION_IN) {
Y
yihaoDeng 已提交
3320
     int dummy = -1;
3321
     SHashObj *pObj = NULL;
Y
yihaoDeng 已提交
3322 3323 3324 3325
     if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
        pObj = taosHashInit(256, taosGetDefaultHashFunction(pInfo->sch.type), true, false);
        SArray *arr = (SArray *)(pCond->arr);
        for (size_t i = 0; i < taosArrayGetSize(arr); i++) {
Y
yihaoDeng 已提交
3326
          char* p = taosArrayGetP(arr, i);
3327 3328
          strntolower_s(varDataVal(p), varDataVal(p), varDataLen(p));
          taosHashPut(pObj, varDataVal(p), varDataLen(p), &dummy, sizeof(dummy));
Y
yihaoDeng 已提交
3329 3330 3331 3332
        }
     } else {
       buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen);
     }
3333
     pInfo->q = (char *)pObj;
H
Haojun Liao 已提交
3334
  } else if (pCond != NULL) {
3335 3336 3337 3338
    uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
    if (size < (uint32_t)pSchema->bytes) {
      size = pSchema->bytes;
    }
wafwerar's avatar
wafwerar 已提交
3339
    // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space.
wafwerar's avatar
wafwerar 已提交
3340
    pInfo->q = taosMemoryCalloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
3341
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
3342
  }
3343 3344
}

3345
#endif
3346

H
Haojun Liao 已提交
3347
/*
 * Comparator used to sort and group tables by their group-by tag columns.
 *
 * The real comparison is compiled out ("#if 0"), so this currently reports
 * every pair of tables as equal (returns 0).
 */
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
#if 0
  // NOTE(review): this disabled code assigns the uid field to an STable pointer;
  // it needs to be reworked before it can be re-enabled.
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
  STable* pTable1 = ((STableKeyInfo*) p1)->uid;
  STable* pTable2 = ((STableKeyInfo*) p2)->uid;

  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;

    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);

    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;

    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
      f1 = (char*) TABLE_NAME(pTable1);
      f2 = (char*) TABLE_NAME(pTable2);
      type = TSDB_DATA_TYPE_BINARY;
      bytes = tGetTbnameColumnSchema()->bytes;
    } else if (pTableGroupSupp->pTagSchema && colIndex < pTableGroupSupp->pTagSchema->numOfCols) {
      STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
      bytes = pCol->bytes;
      type = pCol->type;
      f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
      f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
    }

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret != 0) {
      return ret;
    }
  }
#endif
  return 0;
}

H
Haojun Liao 已提交
3403
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3404
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3405
    return -1;
3406
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3407 3408 3409 3410 3411 3412 3413 3414 3415
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

/*
 * Split an already sorted table list into groups of consecutive tables that
 * compare equal under compareFn, and append each group to pGroups.
 *
 * pGroups     - output array of SArray* (one STableKeyInfo array per group).
 * pTableList  - sorted array of STableKeyInfo.
 * numOfTables - number of entries of pTableList to process.
 * skey        - value stored in lastKey of every emitted entry.
 * pSupp       - passed through to compareFn as its third argument.
 * compareFn   - must return 0 (same group) or -1 (new group) for neighbours.
 *
 * Each emitted entry is a copy of the source entry with lastKey overridden,
 * so the table identity of the source entry is preserved in the group.
 * If a group array cannot be allocated, grouping stops at that point.
 */
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
                          STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
  if (numOfTables == 0) {
    return;  // nothing to group, and element 0 does not exist
  }

  SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));
  if (g == NULL) {
    return;
  }

  STableKeyInfo* pFirst = taosArrayGet(pTableList, 0);

  STableKeyInfo info = *pFirst;
  info.lastKey = skey;
  taosArrayPush(g, &info);

  for (size_t i = 1; i < numOfTables; ++i) {
    STableKeyInfo* prev = taosArrayGet(pTableList, i - 1);
    STableKeyInfo* p = taosArrayGet(pTableList, i);

    int32_t ret = compareFn(prev, p, pSupp);
    assert(ret == 0 || ret == -1);

    if (ret != 0) {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group

      g = taosArrayInit(16, sizeof(STableKeyInfo));
      if (g == NULL) {
        return;
      }
    }

    STableKeyInfo info1 = *p;
    info1.lastKey = skey;
    taosArrayPush(g, &info1);
  }

  taosArrayPush(pGroups, &g);
}

3444
/*
 * Partition pTableList into groups according to the group-by tag columns.
 *
 * pTableList     - array of STableKeyInfo; sorted in place when grouping.
 * pTagSchema     - tag schema wrapper; may be NULL, in which case the
 *                  comparator receives a NULL tag schema.
 * pCols          - the group-by column indexes.
 * numOfOrderCols - number of entries in pCols; 0 means no grouping.
 * skey           - key stored in lastKey of the grouped entries.
 *
 * Returns an array of SArray* (one per group). The array is empty when
 * pTableList is empty. Returns NULL if an allocation fails.
 */
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
  assert(pTableList != NULL);

  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
  if (pTableGroup == NULL) {
    return NULL;
  }

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
    SArray* sa = taosArrayDup(pTableList);
    if (sa == NULL) {
      taosArrayDestroy(pTableGroup);
      return NULL;
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %" PRIzu " tables belong to one group", size);
  } else {
    STableGroupSupporter sup = {0};
    sup.numOfCols = numOfOrderCols;
    // tolerate a missing schema wrapper instead of dereferencing NULL
    sup.pTagSchema = (pTagSchema != NULL) ? pTagSchema->pSchema : NULL;
    sup.pCols = pCols;

    taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
    createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
  }

  return pTableGroup;
}

3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559
//static bool tableFilterFp(const void* pNode, void* param) {
//  tQueryInfo* pInfo = (tQueryInfo*) param;
//
//  STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
//  char* val = NULL;
//  if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
//    val = (char*) TABLE_NAME(pTable);
//  } else {
//    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
//  }
//
//  if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
//    if (pInfo->optr == TSDB_RELATION_ISNULL) {
//      return (val == NULL) || isNull(val, pInfo->sch.type);
//    } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
//      return (val != NULL) && (!isNull(val, pInfo->sch.type));
//    }
//  } else if (pInfo->optr == TSDB_RELATION_IN) {
//     int type = pInfo->sch.type;
//     if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
//       int64_t v;
//       GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
//       uint64_t v;
//       GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     }
//     else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
//       double v;
//       GET_TYPED_DATA(v, double, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
//       return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
//     }
//
//  }
//
//  int32_t ret = 0;
//  if (val == NULL) { //the val is possible to be null, so check it out carefully
//    ret = -1; // val is missing in table tags value pairs
//  } else {
//    ret = pInfo->compare(val, pInfo->q);
//  }
//
//  switch (pInfo->optr) {
//    case TSDB_RELATION_EQUAL: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NOT_EQUAL: {
//      return ret != 0;
//    }
//    case TSDB_RELATION_GREATER_EQUAL: {
//      return ret >= 0;
//    }
//    case TSDB_RELATION_GREATER: {
//      return ret > 0;
//    }
//    case TSDB_RELATION_LESS_EQUAL: {
//      return ret <= 0;
//    }
//    case TSDB_RELATION_LESS: {
//      return ret < 0;
//    }
//    case TSDB_RELATION_LIKE: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_MATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NMATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_IN: {
//      return ret == 1;
//    }
//
//    default:
//      assert(false);
//  }
//
//  return true;
//}
H
Haojun Liao 已提交
3560

3561
//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
3562

3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574
//static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
//  // query according to the expression tree
//  SExprTraverseSupp supp = {
//      .nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
//      .setupInfoFn = filterPrepare,
//      .pExtInfo = pSTable->tagSchema,
//      };
//
//  getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
//  tExprTreeDestroy(pExpr, destroyHelper);
//  return TSDB_CODE_SUCCESS;
//}
3575

H
Haojun Liao 已提交
3576
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
3577
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
3578
                                 SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
H
Haojun Liao 已提交
3579
  STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
3580
  if (pTbCfg == NULL) {
H
Haojun Liao 已提交
3581
    tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
3582 3583
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _error;
3584
  }
H
Haojun Liao 已提交
3585

3586
  if (pTbCfg->type != META_SUPER_TABLE) {
H
Haojun Liao 已提交
3587
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId);
3588
    terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
3589
    goto _error;
H
hjxilinx 已提交
3590
  }
3591 3592

  //NOTE: not add ref count for super table
H
Haojun Liao 已提交
3593
  SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
H
Haojun Liao 已提交
3594
  SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
H
Haojun Liao 已提交
3595

weixin_48148422's avatar
weixin_48148422 已提交
3596 3597
  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
H
Haojun Liao 已提交
3598
    int32_t ret = getAllTableList(pMeta, uid, res);
3599 3600
    if (ret != TSDB_CODE_SUCCESS) {
      goto _error;
3601
    }
3602

sangshuduo's avatar
sangshuduo 已提交
3603
    pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
H
Haojun Liao 已提交
3604
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
H
Haojun Liao 已提交
3605

H
Haojun Liao 已提交
3606 3607
    tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta,
              pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
3608

3609
    taosArrayDestroy(res);
3610 3611
    return ret;
  }
3612

H
hjxilinx 已提交
3613
  int32_t ret = TSDB_CODE_SUCCESS;
3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625
//  tExprNode* expr = NULL;
//
//  TRY(TSDB_MAX_TAG_CONDITIONS) {
//    expr = exprTreeFromTableName(tbnameCond);
//    if (expr == NULL) {
//      expr = exprTreeFromBinary(pTagCond, len);
//    } else {
//      CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
//      tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
//      if (tagExpr != NULL) {
//        CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
//        tExprNode* tbnameExpr = expr;
wafwerar's avatar
wafwerar 已提交
3626
//        expr = taosMemoryCalloc(1, sizeof(tExprNode));
3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657
//        if (expr == NULL) {
//          THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
//        }
//        expr->nodeType = TSQL_NODE_EXPR;
//        expr->_node.optr = (uint8_t)tagNameRelType;
//        expr->_node.pLeft = tagExpr;
//        expr->_node.pRight = tbnameExpr;
//      }
//    }
//    CLEANUP_EXECUTE();
//
//  } CATCH( code ) {
//    CLEANUP_EXECUTE();
//    terrno = code;
//    tsdbUnlockRepoMeta(tsdb);     // unlock tsdb in any cases
//
//    goto _error;
//    // TODO: more error handling
//  } END_TRY
//
//  doQueryTableList(pTable, res, expr);
//  pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
//  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
//  tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
//      pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
//  taosArrayDestroy(res);
//
//  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
//  return ret;
3658 3659 3660

  _error:
  return terrno;
3661
}
3662

H
Haojun Liao 已提交
3663 3664
/**
 * Build a table-group descriptor that holds exactly one table.
 *
 * The table is looked up by uid through metaGetTbInfoByUid(). When it exists,
 * pGroupInfo receives a group list with a single group, and that group holds
 * a single STableKeyInfo {uid, lastKey = startKey}.
 *
 * @param pMeta       meta handle, passed through to metaGetTbInfoByUid()
 * @param uid         uid of the table to place in the group
 * @param startKey    value stored in STableKeyInfo.lastKey
 * @param pGroupInfo  output; only written when TSDB_CODE_SUCCESS is returned
 * @return TSDB_CODE_SUCCESS on success; otherwise terrno, which is set to
 *         TSDB_CODE_TDB_INVALID_TABLE_ID when the uid is unknown or to
 *         TSDB_CODE_TDB_OUT_OF_MEMORY when an array cannot be created/grown.
 */
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
  // NOTE(review): pTbCfg is used only as an existence check and is never released here;
  // confirm whether metaGetTbInfoByUid() hands ownership of the object to the caller.
  STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
  if (pTbCfg == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return terrno;
  }

  STableKeyInfo info = {.lastKey = startKey, .uid = uid};

  // Build everything locally first so that pGroupInfo never exposes a half-built list.
  SArray* pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
  if (pGroupList == NULL || group == NULL) {
    goto _error;
  }

  // The group is owned by pGroupList only after the second push succeeds.
  if (taosArrayPush(group, &info) == NULL || taosArrayPush(pGroupList, &group) == NULL) {
    goto _error;
  }

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList = pGroupList;
  return TSDB_CODE_SUCCESS;

_error:
  if (group != NULL) {
    taosArrayDestroy(group);
  }
  if (pGroupList != NULL) {
    taosArrayDestroy(pGroupList);
  }

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return terrno;
}
3684

3685
#if 0
3686
/**
 * Build a single-group table list from an explicit list of table ids.
 *
 * Every STableIdInfo in pTableIdList is resolved with tsdbGetTableByUid() while
 * the repo meta read lock is held. Tables that no longer exist are skipped with
 * a warning; encountering a super table is an error.
 *
 * @param tsdb          repository handle
 * @param pTableIdList  array of STableIdInfo; must not be NULL
 * @param pGroupInfo    output; on success pGroupList holds zero or one group and
 *                      numOfTables the number of resolved tables. On failure
 *                      pGroupList is reset to NULL and numOfTables to 0.
 * @return TSDB_CODE_SUCCESS on success, terrno otherwise.
 */
int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  assert(pTableIdList != NULL);
  size_t  size = taosArrayGetSize(pTableIdList);
  SArray* pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
  if (pGroupList == NULL || group == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    tsdbUnlockRepoMeta(tsdb);
    goto _error;
  }

  for (size_t i = 0; i < size; ++i) {
    STableIdInfo* id = taosArrayGet(pTableIdList, i);

    STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);
      terrno = TSDB_CODE_QRY_INVALID_MSG;
      tsdbUnlockRepoMeta(tsdb);
      goto _error;
    }

    STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
    if (taosArrayPush(group, &info) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbUnlockRepoMeta(tsdb);
      goto _error;
    }
  }

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    goto _error;
  }

  pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(group);
  if (pGroupInfo->numOfTables > 0) {
    if (taosArrayPush(pGroupList, &group) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      goto _error;
    }
  } else {
    // No table survived the lookup: hand back an empty group list.
    taosArrayDestroy(group);
  }

  pGroupInfo->pGroupList = pGroupList;
  return TSDB_CODE_SUCCESS;

_error:
  // Release both arrays; previously the outer group list was leaked on every error path.
  if (group != NULL) {
    taosArrayDestroy(group);
  }
  if (pGroupList != NULL) {
    taosArrayDestroy(pGroupList);
  }

  pGroupInfo->pGroupList = NULL;
  pGroupInfo->numOfTables = 0;
  return terrno;
}
3731
#endif
3732 3733 3734 3735 3736 3737 3738 3739
/**
 * Release an array of SColumnInfoData together with the pData buffer of every entry.
 *
 * @param pColumnInfoData  array to release; NULL is accepted and ignored
 * @return always NULL, so the caller can write `p = doFreeColumnInfoData(p);`
 */
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData != NULL) {
    size_t  numOfCols = taosArrayGetSize(pColumnInfoData);
    int32_t idx = 0;

    while (idx < numOfCols) {
      SColumnInfoData* pCol = taosArrayGet(pColumnInfoData, idx);
      taosMemoryFreeClear(pCol->pData);
      idx += 1;
    }

    taosArrayDestroy(pColumnInfoData);
  }

  return NULL;
}

H
Haojun Liao 已提交
3747 3748 3749 3750 3751 3752
/**
 * Tear down an array of STableCheckInfo: for each entry the in-memory iterators are
 * destroyed via destroyTableMemIterator() and pCompInfo is freed, then the array
 * itself is destroyed.
 *
 * @param pTableCheckInfo  array of STableCheckInfo to release
 * @return always NULL, so the caller can reset its pointer in one statement
 */
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  size_t  numOfTables = taosArrayGetSize(pTableCheckInfo);
  int32_t idx = 0;

  while (idx < numOfTables) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTableCheckInfo, idx);

    destroyTableMemIterator(pCheckInfo);
    taosMemoryFreeClear(pCheckInfo->pCompInfo);

    idx += 1;
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

3760

H
Haojun Liao 已提交
3761
/**
 * Release a read handle and everything it owns: column buffers, the default
 * load-column list, block info, statistics, per-table check info, the file
 * read helper and the prev/next row buffers. An I/O cost summary is logged
 * right before the handle itself is freed.
 *
 * @param queryHandle  handle to destroy (an STsdbReadHandle*); NULL is a no-op
 */
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)queryHandle;
  if (pHandle == NULL) {
    return;
  }

  pHandle->pColumns = doFreeColumnInfoData(pHandle->pColumns);

  taosArrayDestroy(pHandle->defaultLoadColumn);
  taosMemoryFreeClear(pHandle->pDataBlockInfo);
  taosMemoryFreeClear(pHandle->statis);

  // A handle with an empty query window must never have built table check info.
  // (For a non-empty window the mem-snapshot release, tsdbMayUnTakeMemSnapshot(),
  // is currently disabled, so nothing is done in that case.)
  if (emptyQueryTimewindow(pHandle)) {
    assert(pHandle->pTableCheckInfo == NULL);
  }

  if (pHandle->pTableCheckInfo != NULL) {
    pHandle->pTableCheckInfo = destroyTableCheckInfo(pHandle->pTableCheckInfo);
  }

  tsdbDestroyReadH(&pHandle->rhelper);

  tdFreeDataCols(pHandle->pDataCols);
  pHandle->pDataCols = NULL;

  pHandle->prev = doFreeColumnInfoData(pHandle->prev);
  pHandle->next = doFreeColumnInfoData(pHandle->next);

  // Must be emitted before the handle is freed: the message reads cost and idStr from it.
  tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s",
      pHandle, pHandle->cost.headFileLoad, pHandle->cost.headFileLoadTime, pHandle->cost.statisInfoLoadTime,
      pHandle->cost.blockLoadTime, pHandle->cost.checkForNextTime, pHandle->idStr);

  taosMemoryFreeClear(pHandle);
}
3798

3799
#if 0
H
Haojun Liao 已提交
3800
/**
 * Release the contents of a table group descriptor: every non-NULL table of every
 * group is un-referenced with tsdbUnRefTable(), each group array is destroyed, and
 * finally the map and the outer group list are released. numOfTables is reset to 0.
 *
 * @param pGroupList  descriptor to clean up; must not be NULL. The struct itself is not freed.
 */
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  SArray* pGroups = pGroupList->pGroupList;
  size_t  numOfGroup = taosArrayGetSize(pGroups);

  int32_t g = 0;
  while (g < numOfGroup) {
    SArray* pTables = taosArrayGetP(pGroups, g);
    size_t  numOfTables = taosArrayGetSize(pTables);

    int32_t t = 0;
    while (t < numOfTables) {
      STable* pTable = taosArrayGetP(pTables, t);

      // The entry may be NULL when the group was built for retrieving data from tsdb.
      if (pTable != NULL) {
        tsdbUnRefTable(pTable);
      }

      t += 1;
    }

    taosArrayDestroy(pTables);
    g += 1;
  }

  taosHashCleanup(pGroupList->map);
  taosArrayDestroy(pGroups);
  pGroupList->numOfTables = 0;
}
H
Haojun Liao 已提交
3824 3825 3826 3827 3828 3829 3830

/**
 * Walk the whole skip list and append the data of every node accepted by
 * exprTreeApplyFilter() to pResult.
 *
 * @param pSkipList  skip list to scan
 * @param pExpr      filter expression evaluated against each node
 * @param pResult    output array receiving the node data of matching nodes
 * @param param      traversal support data forwarded to exprTreeApplyFilter()
 */
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  for (;;) {
    if (!tSkipListIterNext(pIter)) {
      break;
    }

    SSkipListNode *pNode = tSkipListIterGet(pIter);
    if (!exprTreeApplyFilter(pExpr, pNode, param)) {
      continue;
    }

    taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
  }

  tSkipListDestroyIter(pIter);
}

// One bound of an indexed-column condition; populated by setQueryCond().
typedef struct {
  char*    v;     // comparison value; setQueryCond() stores the tQueryInfo.q pointer here (no copy is made)
  int32_t  optr;  // relational operator, taken from tQueryInfo.optr
} SEndPoint;

// Condition derived from a single relational operator. setQueryCond() allocates at most
// one of the two bounds and does not touch the other, so callers zero-initialize the struct.
typedef struct {
  SEndPoint* start;  // allocated for >, >=, =, <> and IN
  SEndPoint* end;    // allocated for < and <=
} SQueryCond;

/**
 * Translate the relational operator of queryColInfo into a bound stored in pCond.
 *
 *  - >, >=, =, <> and IN  -> pCond->start is allocated
 *  - < and <=             -> pCond->end is allocated
 *  - LIKE, MATCH, NMATCH  -> not supported on this path (assert)
 *  - any other operator   -> pCond is left untouched
 *
 * The allocated bound records the operator and the value pointer (queryColInfo->q);
 * the value itself is not copied. The caller releases the bound with taosMemoryFree().
 *
 * @param queryColInfo  query description providing optr and q
 * @param pCond         output condition; expected to be zero-initialized by the caller
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when the bound cannot be allocated
 */
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
  int32_t     optr = queryColInfo->optr;
  SEndPoint** ppBound = NULL;

  if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
      optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL ||
      optr == TSDB_RELATION_IN) {
    ppBound = &pCond->start;
  } else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
    ppBound = &pCond->end;
  } else if (optr == TSDB_RELATION_LIKE || optr == TSDB_RELATION_MATCH || optr == TSDB_RELATION_NMATCH) {
    // pattern operators are never evaluated through the index
    assert(0);
  }

  if (ppBound == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SEndPoint* pBound = taosMemoryCalloc(1, sizeof(SEndPoint));
  if (pBound == NULL) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  pBound->optr = queryColInfo->optr;
  pBound->v    = queryColInfo->q;
  *ppBound     = pBound;

  return TSDB_CODE_SUCCESS;
}

static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
  SSkipListIterator* iter = NULL;

  SQueryCond cond = {0};
  if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
    //todo handle error
  }

  if (cond.start != NULL) {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
  } else {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
  }

  if (cond.start != NULL) {
    int32_t optr = cond.start->optr;

    if (optr == TSDB_RELATION_EQUAL) {   // equals
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
    } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
      bool comp = true;
      int32_t ret = 0;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
          assert(ret >= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_GREATER) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;
        }
      }
    } else if (optr == TSDB_RELATION_NOT_EQUAL) {   // not equal
      bool comp = true;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

      tSkipListDestroyIter(iter);

      comp = true;
      iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

Y
yihaoDeng 已提交
3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967
    } else if (optr == TSDB_RELATION_IN) {
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
      
H
Haojun Liao 已提交
3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008
    } else {
      assert(0);
    }
  } else {
    int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
    if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
      bool    comp = true;
      int32_t ret = 0;

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
          assert(ret <= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_LESS) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;  // no need to compare anymore
        }
      }
    } else {
      assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
        if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
            (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
        }
      }
    }
  }

wafwerar's avatar
wafwerar 已提交
4009 4010
  taosMemoryFree(cond.start);
  taosMemoryFree(cond.end);
H
Haojun Liao 已提交
4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028
  tSkipListDestroyIter(iter);
}

/**
 * Evaluate a condition by scanning every node of the skip list (no index use).
 *
 * For the table-name pseudo column the compare function of pQueryInfo is applied to
 * the table name: IN qualifies on a non-zero result, while LIKE/MATCH/NMATCH qualify
 * on a zero result; other operators never qualify. For any other column the decision
 * is delegated to filterFp.
 *
 * @param pSkipList   skip list to scan
 * @param pQueryInfo  condition description
 * @param res         output array of STableKeyInfo
 * @param filterFp    per-node filter used for non table-name columns
 */
static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  for (;;) {
    if (!tSkipListIterNext(pIter)) {
      break;
    }

    SSkipListNode *pNode = tSkipListIterGet(pIter);

    char *pTableData = SL_GET_NODE_DATA(pNode);
    tstr *tbname = (tstr*) tsdbGetTableName((void*) pTableData);

    bool qualified = false;

    // todo speed up by using hash
    if (pQueryInfo->sch.colId != TSDB_TBNAME_COLUMN_INDEX) {
      qualified = filterFp(pNode, pQueryInfo);
    } else if (pQueryInfo->optr == TSDB_RELATION_IN) {
      qualified = pQueryInfo->compare(tbname, pQueryInfo->q);
    } else if (pQueryInfo->optr == TSDB_RELATION_LIKE ||
               pQueryInfo->optr == TSDB_RELATION_MATCH ||
               pQueryInfo->optr == TSDB_RELATION_NMATCH) {
      qualified = !pQueryInfo->compare(tbname, pQueryInfo->q);
    }

    if (!qualified) {
      continue;
    }

    STableKeyInfo keyInfo = {.pTable = (void*)pTableData, .lastKey = TSKEY_INITIAL_VAL};
    taosArrayPush(res, &keyInfo);
  }

  tSkipListDestroyIter(pIter);
}

/**
 * Collect the skip-list nodes that satisfy the filter expression pExpr.
 *
 * A leaf condition (column vs. value, neither child being an expression) is set up
 * through param->setupInfoFn and then answered either via the ordered index
 * (queryIndexedColumn) or by a full scan (queryIndexlessColumn); LIKE, MATCH,
 * NMATCH and IN always take the scan path. A composite expression is evaluated
 * node by node with applyFilterToSkipListNode().
 *
 * @param pExpr      filter expression; NULL makes the call a no-op
 * @param pSkipList  skip list to search
 * @param result     output array; must be empty when pExpr is a composite expression
 * @param param      traversal support: setup callback, extra info and node filter
 */
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
  if (pExpr == NULL) {
    return;
  }

  tExprNode *pLhs = pExpr->_node.pLeft;
  tExprNode *pRhs = pExpr->_node.pRight;

  bool isLeafCond = (pLhs->nodeType != TSQL_NODE_EXPR) && (pRhs->nodeType != TSQL_NODE_EXPR);
  if (!isLeafCond) {
    // The value of hasPK is always 0.
    uint8_t weight = pLhs->_node.hasPK + pRhs->_node.hasPK;
    assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);

    //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
    applyFilterToSkipListNode(pSkipList, pExpr, result, param);
    return;
  }

  // column project
  assert(pLhs->nodeType == TSQL_NODE_COL && (pRhs->nodeType == TSQL_NODE_VALUE || pRhs->nodeType == TSQL_NODE_DUMMY));

  param->setupInfoFn(pExpr, param->pExtInfo);

  tQueryInfo *pQueryInfo = pExpr->_node.info;

  bool useIndex = pQueryInfo->indexed &&
                  !(pQueryInfo->optr == TSDB_RELATION_LIKE || pQueryInfo->optr == TSDB_RELATION_MATCH ||
                    pQueryInfo->optr == TSDB_RELATION_NMATCH || pQueryInfo->optr == TSDB_RELATION_IN);

  if (useIndex) {
    queryIndexedColumn(pSkipList, pQueryInfo, result);
  } else {
    queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
  }
}
L
Liu Jicong 已提交
4081
#endif