tsdbRead.c 80.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
slguan 已提交
17
#include "tulog.h"
18
#include "talgo.h"
19
#include "tutil.h"
H
Haojun Liao 已提交
20
#include "ttime.h"
21
#include "tcompare.h"
22
#include "exception.h"
23

24
#include "../../../query/inc/qast.h"  // todo move to common module
25
#include "tlosertree.h"
26
#include "tsdb.h"
H
TD-34  
hzcheng 已提交
27
#include "tsdbMain.h"
28

29
// extra bytes appended to every per-column output buffer allocation
#define EXTRA_BYTES 2
// true when the traversal order `o` is ascending; the argument is parenthesized so that
// compound expressions expand safely
#define ASCENDING_TRAVERSE(o)   ((o) == TSDB_ORDER_ASC)
// number of columns requested by the query handle
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
32

33 34 35 36 37
// Range-comparison selectors. NOTE(review): neither constant is referenced in the
// visible part of this file -- confirm they are still needed.
enum {
  QUERY_RANGE_LESS_EQUAL = 0,
  QUERY_RANGE_GREATER_EQUAL = 1,
};

H
hjxilinx 已提交
38
// Query handle types, stored in STsdbQueryHandle.type.
enum {
  TSDB_QUERY_TYPE_ALL      = 1,  // set by tsdbQueryTables
  TSDB_QUERY_TYPE_LAST     = 2,  // set by tsdbQueryLastRow
  TSDB_QUERY_TYPE_EXTERNAL = 3,  // set by tsdbQueryRowsInExternalWindow
};

44 45
typedef struct SQueryFilePos {
  int32_t fid;
46 47
  int32_t slot;
  int32_t pos;
48
  int64_t lastKey;
49 50
  int32_t rows;
  bool    mixBlock;
51
  bool    blockCompleted;
52
  STimeWindow win;
53
} SQueryFilePos;
H
hjxilinx 已提交
54

55
typedef struct SDataBlockLoadInfo {
H
hjxilinx 已提交
56
  SFileGroup* fileGroup;
57
  int32_t     slot;
H
hjLiao 已提交
58
  int32_t     tid;
59
  SArray*     pLoadedCols;
60
} SDataBlockLoadInfo;
H
hjxilinx 已提交
61

62
// Identifies the comp-block information that is currently recorded for the handle.
typedef struct SLoadCompBlockInfo {
  int32_t tid;     /* table tid; -1 when unset */
  int32_t fileId;  /* file id; -1 when unset */
} SLoadCompBlockInfo;
H
hjxilinx 已提交
66

67
typedef struct STableCheckInfo {
H
Haojun Liao 已提交
68 69 70 71 72 73 74
  STableId      tableId;
  TSKEY         lastKey;
  STable*       pTableObj;
  SCompInfo*    pCompInfo;
  int32_t       compSize;
  int32_t       numOfBlocks;    // number of qualified data blocks not the original blocks
  SDataCols*    pDataCols;
H
Haojun Liao 已提交
75
  int32_t       chosen;         // indicate which iterator should move forward
H
Haojun Liao 已提交
76 77 78
  bool          initBuf;        // whether to initialize the in-memory skip list iterator or not
  SSkipListIterator* iter;      // mem buffer skip list iterator
  SSkipListIterator* iiter;     // imem buffer skip list iterator
79
} STableCheckInfo;
80

81
typedef struct STableBlockInfo {
H
Haojun Liao 已提交
82 83
  SCompBlock*        compBlock;
  STableCheckInfo*   pTableCheckInfo;
84
} STableBlockInfo;
85

86 87
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
H
Haojun Liao 已提交
88
  STableBlockInfo**   pDataBlockInfo;
89
  int32_t*            blockIndexArray;
90
  int32_t*            numOfBlocksPerTable;
91 92
} SBlockOrderSupporter;

93
typedef struct STsdbQueryHandle {
H
Haojun Liao 已提交
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
  STsdbRepo*     pTsdb;
  SQueryFilePos  cur;              // current position
  int16_t        order;
  STimeWindow    window;           // the primary query time window that applies to all queries
  SDataStatis*   statis;           // query level statistics, only one table block statistics info exists at any time
  int32_t        numOfBlocks;
  SArray*        pColumns;         // column list, SColumnInfoData array list
  bool           locateStart;
  int32_t        outputCapacity;
  int32_t        realNumOfRows;
  SArray*        pTableCheckInfo;  //SArray<STableCheckInfo>
  int32_t        activeIndex;
  bool           checkFiles;       // check file stage
  void*          qinfo;            // query info handle, for debug purpose
  int32_t        type;             // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
109 110
  SFileGroup*    pFileGroup;
  SFileGroupIter fileIter;
H
hjxilinx 已提交
111
  SRWHelper      rhelper;
H
Haojun Liao 已提交
112
  STableBlockInfo* pDataBlockInfo;
H
Haojun Liao 已提交
113
  int32_t        allocSize;        // allocated data block size
114 115
  SMemTable*     mem;              // mem-table
  SMemTable*     imem;             // imem-table, acquired from snapshot
H
Haojun Liao 已提交
116
  SArray*        defaultLoadColumn;// default load column
H
Haojun Liao 已提交
117 118
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
119 120
} STsdbQueryHandle;

121
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
122
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
123 124 125
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
                                   SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
H
Haojun Liao 已提交
126
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
127
                                 STsdbQueryHandle* pQueryHandle);
H
hjxilinx 已提交
128

129
// Reset the block load record to "nothing loaded". pLoadedCols is intentionally left untouched.
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->slot = -1;
  pBlockLoadInfo->tid = -1;
}

135
// Reset the comp-block load record to "nothing loaded".
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid = -1;
}
H
hjxilinx 已提交
139

H
Haojun Liao 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// Build a new array holding the column id (int16_t) of every column requested by the
// query handle, in the same order as pQueryHandle->pColumns. The caller owns the result.
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
  size_t total = QH_GET_NUM_OF_COLS(pQueryHandle);
  assert(total <= TSDB_MAX_COLUMNS);

  SArray* pResult = taosArrayInit(total, sizeof(int16_t));

  int32_t idx = 0;
  while (idx < total) {
    SColumnInfoData* pColumn = taosArrayGet(pQueryHandle->pColumns, idx);
    taosArrayPush(pResult, &pColumn->info.colId);
    idx += 1;
  }

  return pResult;
}

// Return the list of column ids to load by default. When loadTS is set and the first
// requested column is not column id 0 (the primary timestamp), id 0 is inserted at the front.
static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
  SArray* pIds = getColumnIdList(pQueryHandle);

  // id of the first requested column
  int16_t firstId = *(int16_t*)taosArrayGet(pIds, 0);
  if (!loadTS || firstId == 0) {
    return pIds;
  }

  // the primary timestamp column is missing from the requested list, prepend it
  int16_t tsColId = 0;
  taosArrayInsert(pIds, 0, &tsColId);
  return pIds;
}

H
TD-353  
Hongze Cheng 已提交
168
/*
 * Create a query handle over all tables listed in groupList.
 *
 * tsdb      : repository to query
 * pCond     : query condition (order, time window, column list); must not be NULL and
 *             must name at least one column
 * groupList : table groups; must contain at least one non-empty group
 * qinfo     : opaque pointer stored in the handle, only used in debug logs
 *
 * Returns the new handle, or NULL when the handle itself cannot be allocated.
 * NOTE(review): the per-column buffers and the statistics array are still allocated
 * without a NULL check -- a full out-of-memory cleanup path needs the handle destructor,
 * which is outside the visible part of this file.
 */
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
  // validate the arguments before any of them is dereferenced
  assert(pCond != NULL && pCond->numOfCols > 0);

  size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
  assert(sizeOfGroup >= 1);

  STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
  if (pQueryHandle == NULL) {
    return NULL;
  }

  pQueryHandle->order       = pCond->order;
  pQueryHandle->window      = pCond->twindow;
  pQueryHandle->pTsdb       = tsdb;
  pQueryHandle->type        = TSDB_QUERY_TYPE_ALL;
  pQueryHandle->cur.fid     = -1;
  pQueryHandle->cur.win     = TSWINDOW_INITIALIZER;
  pQueryHandle->checkFiles  = true;
  pQueryHandle->activeIndex = 0;   // current active table index
  pQueryHandle->qinfo       = qinfo;
  pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
  pQueryHandle->allocSize   = 0;

  tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
  tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);

  // allocate buffer in order to load data blocks from file
  int32_t numOfCols = pCond->numOfCols;

  pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
  pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));  // todo: use list instead of array?

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData  colInfo = {{0}, 0};

    colInfo.info = pCond->colList[i];
    // compute the buffer size in size_t so that capacity * bytes cannot overflow int32_t
    colInfo.pData = calloc(1, EXTRA_BYTES + (size_t)pQueryHandle->outputCapacity * pCond->colList[i].bytes);
    taosArrayPush(pQueryHandle->pColumns, &colInfo);
    pQueryHandle->statis[i].colId = colInfo.info.colId;
  }

  pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
  STsdbMeta* pMeta = tsdbGetMeta(tsdb);
  assert(pMeta != NULL);

  // register one check info per table; every table starts at the beginning of the query window
  for (size_t i = 0; i < sizeOfGroup; ++i) {
    SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);

    size_t gsize = taosArrayGetSize(group);
    assert(gsize > 0);

    for (size_t j = 0; j < gsize; ++j) {
      STable* pTable = (STable*) taosArrayGetP(group, j);

      STableCheckInfo info = {
          .lastKey = pQueryHandle->window.skey,
          .tableId = pTable->tableId,
          .pTableObj = pTable,
      };

      assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
      info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));

      taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
    }
  }

  pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);

  tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));

  tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);

  return (TsdbQueryHandleT) pQueryHandle;
}

H
TD-353  
Hongze Cheng 已提交
239
// Create a query handle that retrieves only the last row: the handle is built by
// tsdbQueryTables, switched to descending order and then adjusted by
// changeQueryHandleForLastrowQuery.
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
  STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);

  pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
  pQueryHandle->order = TSDB_ORDER_DESC;

  changeQueryHandleForLastrowQuery(pQueryHandle);
  return pQueryHandle;
}

249
// Return a new array holding the STable pointer of every table queried by the handle,
// in check-info order. The caller owns the returned array.
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
  assert(pHandle != NULL);

  STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;

  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  SArray* pTables = taosArrayInit(numOfTables, POINTER_BYTES);

  for (size_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
    taosArrayPush(pTables, &pCheckInfo->pTableObj);
  }

  return pTables;
}

H
TD-353  
Hongze Cheng 已提交
265
// Create a query handle of type TSDB_QUERY_TYPE_EXTERNAL: the handle is built by
// tsdbQueryTables and then adjusted by changeQueryHandleForInterpQuery.
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
  STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);

  pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
  changeQueryHandleForInterpQuery(pQueryHandle);
  return pQueryHandle;
}

273
/*
 * Create the mem/imem skip list iterators of a table (once) and position each of them
 * on its first row, starting from pCheckInfo->lastKey in the traversal order of the handle.
 *
 * Returns true when the iterators were initialized by an earlier call, or when at least
 * one of the two buffers holds a row; false when there is nothing to read from memory.
 */
static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
  STable* pTable = pCheckInfo->pTableObj;
  assert(pTable != NULL);

  // the iterators are created only once per table
  if (pCheckInfo->initBuf) {
    return true;
  }

  pCheckInfo->initBuf = true;
  int32_t order = pHandle->order;

  // no data in buffer, abort
  if (pHandle->mem == NULL && pHandle->imem == NULL) {
    return false;
  }

  assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);

  if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
    pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
        (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
    pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
        (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  // both iterators are NULL, no data in buffer right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // move every existing iterator onto its first element; both iterators are always advanced
  bool memEmpty  = (pCheckInfo->iter == NULL) || !tSkipListIterNext(pCheckInfo->iter);
  bool imemEmpty = (pCheckInfo->iiter == NULL) || !tSkipListIterNext(pCheckInfo->iiter);
  if (memEmpty && imemEmpty) { // buffer is empty
    return false;
  }

  if (!memEmpty) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    assert(node != NULL);

    SDataRow row = SL_GET_NODE_DATA(node);
    TSKEY key = dataRowKey(row);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
           pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
  } else {
    tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  }

  if (!imemEmpty) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    assert(node != NULL);

    SDataRow row = SL_GET_NODE_DATA(node);
    TSKEY key = dataRowKey(row);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
           pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
  } else {
    tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  }

  return true;
}

H
Haojun Liao 已提交
339 340 341 342 343 344 345 346
/*
 * Return the row the in-memory scan of a table is currently positioned on, or NULL when
 * neither iterator has a row. pCheckInfo->chosen records the source of the returned row
 * (0 = mem iterator, 1 = imem iterator).
 *
 * When both buffers have a row, the one with the smaller key wins; on equal keys the mem
 * row is skipped (its iterator is advanced) and the imem row is returned.
 * NOTE(review): the smaller key is preferred regardless of the traversal order -- confirm
 * that this is intended for descending scans.
 */
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
  SDataRow memRow = NULL;
  SDataRow imemRow = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      memRow = SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      imemRow = SL_GET_NODE_DATA(node);
    }
  }

  if (memRow == NULL && imemRow == NULL) {
    return NULL;
  }

  // only one buffer has data
  if (imemRow == NULL) {
    pCheckInfo->chosen = 0;
    return memRow;
  }

  if (memRow == NULL) {
    pCheckInfo->chosen = 1;
    return imemRow;
  }

  // both buffers have data
  if (dataRowKey(memRow) < dataRowKey(imemRow)) {
    pCheckInfo->chosen = 0;
    return memRow;
  }

  if (dataRowKey(memRow) == dataRowKey(imemRow)) {
    // data ts are duplicated, ignore the data in mem
    tSkipListIterNext(pCheckInfo->iter);
  }

  pCheckInfo->chosen = 1;
  return imemRow;
}

/*
 * Advance the iterator that supplied the last row (selected by pCheckInfo->chosen:
 * 0 = mem iterator, 1 = imem iterator).
 *
 * Returns true when the advanced iterator has another row. Otherwise the other iterator
 * is inspected without being advanced, and the result tells whether it currently points
 * to a row. Returns false when chosen is neither 0 nor 1.
 */
bool moveToNextRow(STableCheckInfo* pCheckInfo) {
  SSkipListIterator* advanced = NULL;
  SSkipListIterator* other = NULL;

  if (pCheckInfo->chosen == 0) {
    advanced = pCheckInfo->iter;
    other = pCheckInfo->iiter;
  } else if (pCheckInfo->chosen == 1) {
    advanced = pCheckInfo->iiter;
    other = pCheckInfo->iter;
  } else {
    return false;
  }

  if (advanced != NULL && tSkipListIterNext(advanced)) {
    return true;
  }

  // the advanced iterator is exhausted (or missing), fall back to the other buffer
  if (other != NULL) {
    return tSkipListIterGet(other) != NULL;
  }

  return false;
}

416
/*
 * Read the next batch of rows of the active table from the memory buffers into the
 * output columns of the handle.
 *
 * Returns false when the buffers have no row left or when the next row lies beyond the
 * end of the query window; otherwise fills pHandle->cur (rows, win, lastKey, mixBlock)
 * and returns true. cur.win is always stored with skey <= ekey.
 */
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
  size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
  pHandle->cur.fid = -1;  // the result does not come from a file

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);

  STable* pTable = pCheckInfo->pTableObj;
  assert(pTable != NULL);

  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

  SDataRow row = getSDataRowInTableMem(pCheckInfo);
  if (row == NULL) {
    return false;
  }

  pCheckInfo->lastKey = dataRowKey(row);  // first timestamp in buffer
  tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
      pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);

  // all data in mem are checked already.
  bool asc = ASCENDING_TRAVERSE(pHandle->order);
  if ((pCheckInfo->lastKey > pHandle->window.ekey && asc) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !asc)) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  STimeWindow* win = &pHandle->cur.win;
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);

  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;

  // the window was filled in traversal order, normalize it to skey <= ekey
  if (!asc) {
    SWAP(win->skey, win->ekey, TSKEY);
  }

  return true;
}
H
hjxilinx 已提交
460

461 462
/*
 * Map a timestamp to the id of the data file that covers it.
 *
 * key         : timestamp; TSKEY_INITIAL_VAL maps to INT32_MIN
 * daysPerFile : number of days stored in one file
 * precision   : index into tsMsPerDay
 *
 * The quotient is clamped to the int32_t range.
 * NOTE(review): the precision assert joins its two range checks with ||, so it cannot
 * reject an out-of-range value if MICRO <= NANO -- confirm the intended bounds.
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO);
  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision]));  // set the starting fileId

  // clamp with plain comparisons; this avoids calling llabs() on the quotient
  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t)fid;
}

H
Haojun Liao 已提交
479
/*
 * Binary search over blocks sorted by key range for the slot related to skey.
 *
 * pBlock      : block array; the search reads pBlock[0], so it must hold at least one block
 * numOfBlocks : number of blocks in pBlock
 * skey        : key to locate
 * order       : TSDB_ORDER_ASC or TSDB_ORDER_DESC; decides which neighbour is picked
 *               when skey falls into a gap between two blocks
 *
 * Returns the slot whose [keyFirst, keyLast] range contains skey when such a block is
 * found. Otherwise it returns the slot where the search stopped, so the caller has to
 * verify that the returned block really covers the key.
 */
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t firstSlot = 0;
  int32_t lastSlot = numOfBlocks - 1;

  int32_t midSlot = firstSlot;

  while (1) {
    numOfBlocks = lastSlot - firstSlot + 1;
    midSlot = (firstSlot + (numOfBlocks >> 1));

    if (numOfBlocks == 1) break;

    if (skey > pBlock[midSlot].keyLast) {
      if (numOfBlocks == 2) break;
      // skey lies in the gap after midSlot: a descending scan starts from midSlot
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
      firstSlot = midSlot + 1;
    } else if (skey < pBlock[midSlot].keyFirst) {
      // skey lies in the gap before midSlot: an ascending scan starts from midSlot
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
      lastSlot = midSlot - 1;
    } else {
      break;  // got the slot
    }
  }

  return midSlot;
}
505 506 507

/*
 * Open the current file group and load, for every queried table, the comp info of the
 * blocks that overlap the key range between the table's lastKey and the end of the query
 * window.
 *
 * pQueryHandle : query handle; pFileGroup selects the file group to open
 * numOfBlocks  : out, total number of qualified blocks over all tables
 * type         : not used by this function
 *
 * For every table, pCheckInfo->numOfBlocks is set to the number of qualified blocks in
 * this file (0 when there are none) and the qualified blocks are moved to the front of
 * pCheckInfo->pCompInfo->blocks.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of tsdbSetAndOpenHelperFile.
 */
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
  SFileGroup* fileGroup = pQueryHandle->pFileGroup;
  assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);

  int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);

  //open file failed, return error code to client
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // load all the comp offset value for all tables in this file
  *numOfBlocks = 0;
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  for (size_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);

    // reset first, so that no exit path below leaves the count of a previous file behind
    pCheckInfo->numOfBlocks = 0;

    SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
    if (compIndex->len == 0 || compIndex->numOfBlocks == 0 ||
        compIndex->uid != pCheckInfo->tableId.uid) {  // no data block in this file, try next file
      continue;  // no data blocks in the file belongs to pCheckInfo->pTable
    }

    // grow the comp info buffer when the index of this file does not fit
    if (pCheckInfo->compSize < compIndex->len) {
      assert(compIndex->len > 0);

      char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
      assert(t != NULL);

      pCheckInfo->pCompInfo = (SCompInfo*) t;
      pCheckInfo->compSize = compIndex->len;
    }

    tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);

    tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
    SCompInfo* pCompInfo = pCheckInfo->pCompInfo;

    // [s, e] is the remaining key range of this table, independent of the traversal order
    TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
    TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);

    // discard the unqualified data block based on the query time window
    int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
    int32_t end = start;

    // every block of this table ends before the requested range
    if (s > pCompInfo->blocks[start].keyLast) {
      continue;
    }

    // todo speedup the procedure of located end block
    while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
      end += 1;
    }

    pCheckInfo->numOfBlocks = (end - start);

    // keep only the qualified blocks, packed at the front of the array
    if (start > 0) {
      memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
    }

    (*numOfBlocks) += pCheckInfo->numOfBlocks;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
574 575 576 577 578 579
// Build an SDataBlockInfo compound literal for a file block: the key window, column count
// and row count are taken from _block, the table tid/uid from _checkInfo.
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                   \
  ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
                    .numOfCols = (_block)->numOfCols,                                  \
                    .rows = (_block)->numOfRows,                                       \
                    .tid = (_checkInfo)->tableId.tid,                                  \
                    .uid = (_checkInfo)->tableId.uid})
580

581 582


583
/*
 * Load the data of one file block into the read helper of the query handle.
 *
 * On success the loaded block is recorded in pQueryHandle->dataBlockLoadInfo and
 * pBlock->numOfRows is updated to the number of rows that were actually loaded.
 * On failure neither the load record nor pBlock is modified.
 *
 * Returns true when the block has been loaded.
 */
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  STsdbRepo *pRepo = pQueryHandle->pTsdb;
  int64_t st = taosGetTimestampUs();

  // the per-table column buffer is created on first use
  if (pCheckInfo->pDataCols == NULL) {
    STsdbMeta* pMeta = tsdbGetMeta(pRepo);
    // TODO
    pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
  }

  STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
  tdInitDataCols(pCheckInfo->pDataCols, pSchema);
  tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
  tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);

  bool blockLoaded = (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo) == 0);
  if (blockLoaded) {
    SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;

    pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
    pBlockLoadInfo->slot = pQueryHandle->cur.slot;
    pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;

    // only trust the row count of the helper buffer after a successful load
    SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
    assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);

    pBlock->numOfRows = pCols->numOfRows;
  }

  int64_t et = taosGetTimestampUs() - st;
  tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);

  return blockLoaded;
}

628 629
/*
 * Produce the next result block for a file block that lies completely inside the
 * remaining query range. Depending on the first row in the memory buffers:
 *
 *  1. no buffered row reaches the block: the whole file block is returned as-is
 *     (cur->blockCompleted = true, cur->mixBlock = false);
 *  2. the buffered rows start before the block (in traversal order): only buffered rows
 *     up to, but excluding, the block's key range are returned and the file block is not
 *     loaded;
 *  3. the buffered rows overlap the block: the block is loaded and merged with the
 *     buffered rows by doMergeTwoLevelData.
 *
 * NOTE(review): in case 3 the result of doLoadFileDataBlock is ignored -- confirm that
 * merging after a failed load is safe.
 */
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
  SQueryFilePos* cur = &pQueryHandle->cur;
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);

  const bool    asc  = ASCENDING_TRAVERSE(pQueryHandle->order);
  const int32_t step = asc ? 1 : -1;

  /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
  SDataRow row = getSDataRowInTableMem(pCheckInfo);

  TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
  cur->pos = asc ? 0 : (binfo.rows - 1);

  // does the first buffered row come before the end of the block in traversal order?
  bool cacheReachesBlock =
      (key != TSKEY_INITIAL_VAL) && (asc ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (!cacheReachesBlock) {
    /*
     * No buffered data has to be merged with this block, so the file block is returned
     * as a whole. The output buffer must be able to hold the complete block.
     */
    assert(pQueryHandle->outputCapacity >= binfo.rows);
    pQueryHandle->realNumOfRows = binfo.rows;

    cur->rows = binfo.rows;
    cur->win  = binfo.window;
    cur->mixBlock = false;
    cur->blockCompleted = true;
    cur->lastKey = binfo.window.ekey + step;
    return;
  }

  // does the first buffered row come before the start of the block in traversal order?
  bool cacheBeforeBlock = asc ? (key < binfo.window.skey) : (key > binfo.window.ekey);

  if (cacheBeforeBlock) {
    // do not load file block into buffer
    // read buffered rows up to the last key in front of the block: skey - 1 for an
    // ascending scan, ekey + 1 for a descending scan
    TSKEY maxKey = asc ? (binfo.window.skey - step) : (binfo.window.ekey - step);

    cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pQueryHandle->outputCapacity, &cur->win, pQueryHandle);
    pQueryHandle->realNumOfRows = cur->rows;

    // update the last key value
    pCheckInfo->lastKey = cur->win.ekey + step;
    if (!asc) {
      SWAP(cur->win.skey, cur->win.ekey, TSKEY);
    }

    cur->mixBlock = true;
    cur->blockCompleted = false;
    return;
  }

  doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
  doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
}

682 683 684
/*
 * Produce the next result block from one file block of a table.
 *
 * When the query window ends inside the block, or the scan of the table has already
 * advanced into the block, the block is loaded, cur->pos is set to the first row to
 * return and the rows are merged with the memory buffers. Otherwise the block lies
 * completely inside the remaining range and handleDataMergeIfNeeded decides whether it
 * has to be loaded at all.
 *
 * Returns false when the block cannot be loaded or no row was produced.
 */
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SQueryFilePos* cur = &pQueryHandle->cur;
  const bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);

  bool endsInBlock;    // the query window ends before the far edge of the block
  bool startsInBlock;  // the scan position lies behind the near edge of the block
  if (asc) {
    endsInBlock   = pQueryHandle->window.ekey < pBlock->keyLast;
    startsInBlock = pCheckInfo->lastKey > pBlock->keyFirst;
  } else {
    endsInBlock   = pQueryHandle->window.ekey > pBlock->keyFirst;
    startsInBlock = pCheckInfo->lastKey < pBlock->keyLast;
  }

  if (!endsInBlock && !startsInBlock) {
    // the whole block qualifies
    handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
    return pQueryHandle->realNumOfRows > 0;
  }

  // only a part of the block qualifies, it has to be loaded
  if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
    return false;
  }

  SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
  if (asc) {
    assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
  }

  if (startsInBlock) {
    cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
  } else {
    cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
  }

  doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
  return pQueryHandle->realNumOfRows > 0;
}

728
/*
 * Binary search in an ascending TSKEY array for the start position of a scan.
 *
 * pValue : key array, read as TSKEY[num]
 * num    : number of keys; -1 is returned when num <= 0
 * key    : key to locate
 * order  : TSDB_ORDER_ASC or TSDB_ORDER_DESC
 *
 * Descending: returns the position of the key itself, or of the closest smaller key;
 *             -1 when key is smaller than the first element.
 * Ascending : returns the position of the key itself, or of the closest larger key;
 *             -1 when key is larger than the last element.
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  int    firstPos, lastPos, midPos = -1;
  int    numOfRows;
  TSKEY* keyList;

  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) return -1;

  keyList = (TSKEY*)pValue;
  firstPos = 0;
  lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the last position whose key is not greater than the given key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position whose key is not smaller than the given key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  // exact match found at midPos
  return midPos;
}

H
Haojun Liao 已提交
790
/*
 * Copy the rows [start, end] of the loaded file block (pQueryHandle->rhelper.pDataCols[0]) into the
 * output columns of the query handle.
 *
 * For an ascending traverse the rows are appended behind the numOfRows rows already in the output
 * buffer; otherwise they are placed in front of the rows that already occupy the tail of a buffer
 * holding capacity rows. Requested columns that do not exist in the file block are filled with NULLs.
 *
 * Side effects: cur.win.ekey is set to the timestamp of row end, cur.lastKey to that timestamp plus
 * the traverse step (+1 / -1).
 *
 * Returns the number of rows in the output buffer after the copy (numOfRows + copied rows).
 */
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
  SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
  int32_t num = end - start + 1;
  int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);

  // index of the first output row that receives data, identical for every column
  int32_t rowOffset = ASCENDING_TRAVERSE(pQueryHandle->order)? numOfRows : (capacity - numOfRows - num);

  // both column lists are walked in step; this relies on them being ordered by column id
  int32_t j = 0;
  for (int32_t i = 0; i < requiredNumOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);

    // skip the file columns that are not requested by the query
    while (j < pCols->numOfCols && pCols->cols[j].colId < pColInfo->info.colId) {
      j++;
    }

    int32_t bytes = pColInfo->info.bytes;
    char*   pData = pColInfo->pData + rowOffset * pColInfo->info.bytes;
    bool    isVarData = (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR);

    if (j < pCols->numOfCols && pCols->cols[j].colId == pColInfo->info.colId) {
      SDataCol* src = &pCols->cols[j];

      if (!isVarData) {
        memmove(pData, src->pData + bytes * start, bytes * num);
      } else {  // var-length values are copied one by one into fixed-width slots
        char* dst = pData;

        for (int32_t k = start; k < num + start; ++k) {
          char* p = tdGetColDataOfRow(src, k);
          memcpy(dst, p, varDataTLen(p));
          dst += bytes;
        }
      }

      j++;
    } else {  // the requested column is missing in the file block, it is all NULL data
      if (isVarData) {
        char* dst = pData;

        for (int32_t k = start; k < num + start; ++k) {
          setVardataNull(dst, pColInfo->info.type);
          dst += bytes;
        }
      } else {
        setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
      }
    }
  }

  pQueryHandle->cur.win.ekey = tsArray[end];
  pQueryHandle->cur.lastKey = tsArray[end] + step;

  return numOfRows + num;
}

879 880
/*
 * Copy one row of the in-memory buffer into the output columns of the query handle.
 *
 * The row is written behind the numOfRows rows already in the output buffer for an ascending
 * traverse, and in front of the rows occupying the tail of a buffer of capacity rows otherwise.
 * The schema used to decode the row is looked up by the schema version embedded in the row.
 * Requested columns that are not part of that schema are set to NULL.
 *
 * pMeta is not used by this function.
 */
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
                              STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
  // the schema version info is embedded in SDataRow
  STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
  int32_t   numOfRowCols = schemaNCols(pSchema);

  // index of the output row that receives the data, identical for every column
  int32_t rowIndex = ASCENDING_TRAVERSE(pQueryHandle->order)? numOfRows : (capacity - numOfRows - 1);

  // both column lists are walked in step; this relies on them being ordered by column id
  int32_t j = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);

    // skip the schema columns that are not requested by the query
    while (j < numOfRowCols && pSchema->columns[j].colId < pColInfo->info.colId) {
      j++;
    }

    char* pData = pColInfo->pData + rowIndex * pColInfo->info.bytes;
    bool  isVarData = (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR);

    if (j < numOfRowCols && pSchema->columns[j].colId == pColInfo->info.colId) {
      void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
      if (isVarData) {
        memcpy(pData, value, varDataTLen(value));
      } else {
        memcpy(pData, value, pColInfo->info.bytes);
      }

      j++;
    } else {  // the requested column does not exist in the row schema, it is a NULL data
      if (isVarData) {
        setVardataNull(pData, pColInfo->info.type);
      } else {
        setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
      }
    }
  }
}

H
[td-32]  
hjxilinx 已提交
939 940
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
941
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
942 943
                                  SArray* sa) {
  SQueryFilePos* cur = &pQueryHandle->cur;
H
Haojun Liao 已提交
944
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
945
  
946
  initTableMemIterator(pQueryHandle, pCheckInfo);
947
  SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
948 949 950 951 952 953 954 955 956 957

  // for search the endPos, so the order needs to reverse
  int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;

  int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);

  STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
  STable* pTable = pCheckInfo->pTableObj;

958
  int32_t endPos = cur->pos;
959
  if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
960
    endPos = blockInfo.rows - 1;
961
    cur->mixBlock = (cur->pos != 0);
962
  } else if (!ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
963
    endPos = 0;
964
    cur->mixBlock = (cur->pos != blockInfo.rows - 1);
965
  } else {
966 967
    assert(pCols->numOfRows > 0);
    endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
968
    cur->mixBlock = true;
969
  }
H
hjxilinx 已提交
970
  
971
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
972
  int32_t pos = cur->pos;
H
hjxilinx 已提交
973
  
974 975
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
  TSKEY* tsArray = pCols->cols[0].pData;
H
hjxilinx 已提交
976
  
977 978
  int32_t numOfRows = 0;
  pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
979

980 981
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
982 983
    int32_t start = cur->pos;
    int32_t end = endPos;
984
    if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
985 986 987 988 989 990 991 992
      end = cur->pos;
      start = endPos;
    }
    
    cur->win.skey = tsArray[start];
    cur->win.ekey = tsArray[end];
    
    // todo opt in case of no data in buffer
H
Haojun Liao 已提交
993
    numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
994
    
995
    // if the buffer is not full in case of descending order query, move the data in the front of the buffer
996
    if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
997
      int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
998 999

      for(int32_t i = 0; i < numOfCols; ++i) {
1000 1001 1002 1003 1004
        SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
      }
    }
  
H
Haojun Liao 已提交
1005
    pos += (end - start + 1) * step;
1006 1007
    cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
        ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
H
Haojun Liao 已提交
1008
    
1009 1010 1011
    pCheckInfo->lastKey = cur->lastKey;
    pQueryHandle->realNumOfRows = numOfRows;
    cur->rows = numOfRows;
1012
    return;
1013
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1014 1015
    SSkipListNode* node = NULL;
    do {
1016 1017
      SDataRow row = getSDataRowInTableMem(pCheckInfo);
      if (row == NULL) {
H
[td-32]  
hjxilinx 已提交
1018
        break;
1019
      }
1020

1021
      TSKEY key = dataRowKey(row);
1022 1023
      if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1024 1025 1026
        break;
      }

1027 1028
      if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          ((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1029 1030 1031
        break;
      }

1032 1033
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1034
        copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
1035 1036 1037 1038
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1039

1040
        cur->win.ekey = key;
1041 1042 1043
        cur->lastKey  = key + step;
        cur->mixBlock = true;

1044
        moveToNextRow(pCheckInfo);
1045
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
1046
        moveToNextRow(pCheckInfo);
1047 1048
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
                  (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1049 1050 1051
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1052

1053
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
H
Haojun Liao 已提交
1054
        if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
H
Haojun Liao 已提交
1055
          moveToNextRow(pCheckInfo);
H
Haojun Liao 已提交
1056 1057
        }
        
1058
        int32_t start = -1;
1059
        if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075
          int32_t remain = end - pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
          }

          start = pos;
        } else {
          int32_t remain = (pos - end) + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
          }

          start = end;
          end = pos;
        }

H
Haojun Liao 已提交
1076
        numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
1077 1078 1079 1080 1081
        pos += (end - start + 1) * step;
      }
    } while (numOfRows < pQueryHandle->outputCapacity);
    
    if (numOfRows < pQueryHandle->outputCapacity) {
H
Haojun Liao 已提交
1082 1083 1084 1085
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1086
      if (node == NULL ||
1087 1088
          ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1089 1090 1091 1092 1093 1094 1095 1096 1097
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

        int32_t start = -1;
        int32_t end = -1;

        // all remain data are qualified, but check the remain capacity in the first place.
1098
        if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
          int32_t remain = endPos - pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
          }

          start = pos;
          end = endPos;
        } else {
          int32_t remain = pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
          }

          start = endPos;
          end = pos;
        }

H
Haojun Liao 已提交
1116
        numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
1117
        pos += (end - start + 1) * step;
1118
      }
1119 1120
    }
  }
1121
  
1122 1123
  cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
      ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
1124

1125
  if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
1126 1127 1128 1129 1130
    SWAP(cur->win.skey, cur->win.ekey, TSKEY);
  
    // if the buffer is not full in case of descending order query, move the data in the front of the buffer
    if (numOfRows < pQueryHandle->outputCapacity) {
      int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
1131
      for(int32_t i = 0; i < numOfCols; ++i) {
1132 1133 1134 1135 1136 1137
        SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
      }
    }
  }
  
1138 1139 1140 1141
  pCheckInfo->lastKey = cur->lastKey;
  pQueryHandle->realNumOfRows = numOfRows;
  cur->rows = numOfRows;
  cur->pos = pos;
1142

S
Shengliang Guan 已提交
1143
  tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
1144
      cur->win.ekey, cur->rows, pQueryHandle->qinfo);
1145 1146
}

1147
/*
 * Public binary search over an array of num timestamps stored at pValue.
 *
 * order == TSDB_ORDER_DESC: returns the last index whose key is <= the given key, or -1 when every
 *                           key in the list is greater than the given key.
 * any other order         : returns the first index whose key is >= the given key, or -1 when every
 *                           key in the list is smaller than the given key.
 *
 * An empty list (num <= 0) always yields -1.
 *
 * The search itself is shared with doBinarySearchKey(). That helper asserts on the order value,
 * so the order is normalized first to keep accepting every value this function used to accept.
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  int searchOrder = (order == TSDB_ORDER_DESC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
  return doBinarySearchKey(pValue, num, key, searchOrder);
}

1207
/*
 * Release everything owned by the block order supporter: the per-table block lists of the first
 * numOfTables tables, the list array itself and the two per-table counter arrays.
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  int32_t i = 0;
  while (i < numOfTables) {
    STableBlockInfo* pTableBlocks = pSupporter->pDataBlockInfo[i];
    tfree(pTableBlocks);
    i += 1;
  }

  tfree(pSupporter->pDataBlockInfo);
  tfree(pSupporter->blockIndexArray);
  tfree(pSupporter->numOfBlocksPerTable);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

1228
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
1229 1230
    /* left block is empty */
    return 1;
1231
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
1232 1233 1234 1235 1236 1237 1238
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
1239 1240 1241
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
1242
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
1243 1244
  }

H
Haojun Liao 已提交
1245
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
1246 1247
}

1248
/*
 * Fill pQueryHandle->pDataBlockInfo with the descriptors of all numOfBlocks data blocks of the
 * checked tables, ordered by their offset in the file.
 *
 * The descriptor buffer of the query handle is grown on demand and reused between calls. When
 * only one table owns blocks, its blocks are copied as they are; otherwise the per-table block
 * lists are merged by means of a loser tree using dataBlockOrderCompar.
 *
 * numOfAllocBlocks receives numOfBlocks.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY if any allocation fails.
 */
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

  if (pQueryHandle->allocSize < size) {
    char* tmp = realloc(pQueryHandle->pDataBlockInfo, size);
    if (tmp == NULL) {
      // the old buffer is still valid and allocSize still describes it
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // record the new capacity only after the buffer has really been enlarged, otherwise a failed
    // realloc would make the next call write beyond the end of the old buffer
    pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
    pQueryHandle->allocSize = size;
  }

  memset(pQueryHandle->pDataBlockInfo, 0, size);
  *numOfAllocBlocks = numOfBlocks;

  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = calloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = calloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  // collect the block list of every table that owns at least one block; those tables are stored
  // densely at the indexes [0, numOfQualTables) of the supporter
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }

    SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];

      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
        pQueryHandle->qinfo);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
      numOfQualTables, pQueryHandle->qinfo);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  SLoserTreeInfo* pTree = NULL;
  uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  // repeatedly take the block selected by the loser tree and advance the owning table
  while (numOfTotal < cnt) {
    int32_t pos = pTree->pNode[0].index;
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tLoserTreeAdjust(pTree, pos + sup.numOfTables);
  }

  /*
   * The resulting offsets are strictly ascending only as long as no import exists, hence the
   * order of the result is not asserted here.
   */

  tsdbDebug("%p %d data blocks sort completed", pQueryHandle, cnt);
  cleanBlockOrderSupporter(&sup, numOfTables);
  free(pTree);

  return TSDB_CODE_SUCCESS;
}

1364
// todo opt for only one table case
H
Haojun Liao 已提交
1365
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
1366 1367
  pQueryHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pQueryHandle->cur;
H
Haojun Liao 已提交
1368 1369 1370

  int32_t code = TSDB_CODE_SUCCESS;

1371 1372 1373 1374
  int32_t numOfBlocks = 0;
  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  
  while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
1375
    int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
H
Haojun Liao 已提交
1376
    if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
1377 1378 1379
      break;
    }
    
S
Shengliang Guan 已提交
1380
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
H
Haojun Liao 已提交
1381
           numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
1382
    
1383 1384 1385 1386 1387
    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;
    }
    
1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400
    // todo return error code to query engine
    if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
      break;
    }
    
    assert(numOfBlocks >= pQueryHandle->numOfBlocks);
    if (pQueryHandle->numOfBlocks > 0) {
      break;
    }
  }
  
  // no data in file anymore
  if (pQueryHandle->numOfBlocks <= 0) {
H
Haojun Liao 已提交
1401 1402 1403 1404
    if (code == TSDB_CODE_SUCCESS) {
      assert(pQueryHandle->pFileGroup == NULL);
    }

1405
    cur->fid = -1;  // denote that there are no data in file anymore
H
Haojun Liao 已提交
1406 1407
    *exists = false;
    return code;
1408 1409
  }
  
1410
  cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
1411 1412 1413
  cur->fid = pQueryHandle->pFileGroup->fileId;
  
  STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
1414 1415 1416
  *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);

  return TSDB_CODE_SUCCESS;
1417 1418
}

H
Haojun Liao 已提交
1419
/*
 * Produce the next piece of file data for the query handle.
 *
 * First call : position the file group iterator at the file that holds window.skey and load the
 *              first block found from there on.
 * Later calls: continue with the current block while it is a mixed block that is not completed
 *              yet, otherwise move to the next block of the current file, or to the next file
 *              once the last block of the current file has been consumed.
 *
 * exists receives whether data is available. Returns TSDB_CODE_SUCCESS or the error code of
 * getDataBlocksInFilesImpl.
 */
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
  STsdbFileH*    pFileH = tsdbGetFile(pQueryHandle->pTsdb);
  SQueryFilePos* pPos = &pQueryHandle->cur;

  // find the start data block in file
  if (!pQueryHandle->locateStart) {
    pQueryHandle->locateStart = true;

    STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
    int32_t   startFid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

    tsdbInitFileGroupIter(pFileH, &pQueryHandle->fileIter, pQueryHandle->order);
    tsdbSeekFileGroupIter(&pQueryHandle->fileIter, startFid);

    return getDataBlocksInFilesImpl(pQueryHandle, exists);
  }

  STableBlockInfo* pCurBlock = &pQueryHandle->pDataBlockInfo[pPos->slot];
  STableCheckInfo* pCurCheck = pCurBlock->pTableCheckInfo;

  // the current block is a mixed block that is not completely consumed, keep on merging it
  if (pPos->mixBlock && !pPos->blockCompleted) {
    handleDataMergeIfNeeded(pQueryHandle, pCurBlock->compBlock, pCurCheck);
    *exists = pQueryHandle->realNumOfRows > 0;

    return TSDB_CODE_SUCCESS;
  }

  // current block is done, try next
  int32_t lastSlot = ASCENDING_TRAVERSE(pQueryHandle->order) ? (pQueryHandle->numOfBlocks - 1) : 0;
  if (pPos->slot == lastSlot) {
    // all data blocks in current file has been checked already, try next file if exists
    return getDataBlocksInFilesImpl(pQueryHandle, exists);
  }

  // next block of the same file
  pPos->slot += ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
  pPos->mixBlock = false;
  pPos->blockCompleted = false;

  STableBlockInfo* pNextBlock = &pQueryHandle->pDataBlockInfo[pPos->slot];
  *exists = loadFileDataBlock(pQueryHandle, pNextBlock->compBlock, pNextBlock->pTableCheckInfo);

  return TSDB_CODE_SUCCESS;
}

1466 1467
/*
 * Check the in-memory buffer table by table, starting with the table at pQueryHandle->activeIndex.
 * Returns true as soon as hasMoreDataInCache reports data; activeIndex then still refers to that
 * table. Returns false once activeIndex has moved past the last checked table.
 */
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
  size_t tableCount = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(tableCount <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);

  for (; pQueryHandle->activeIndex < tableCount; pQueryHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pQueryHandle)) {
      return true;
    }
  }

  return false;
}

// handle data in cache situation
1482 1483
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
1484 1485 1486
  
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(numOfTables > 0);
H
Haojun Liao 已提交
1487

H
Haojun Liao 已提交
1488
  SDataBlockInfo blockInfo = {{0}, 0};
1489 1490 1491 1492 1493 1494 1495 1496
  if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
    pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
    pQueryHandle->order = TSDB_ORDER_DESC;
    
    if (!tsdbNextDataBlock(pHandle)) {
      return false;
    }
    
H
Haojun Liao 已提交
1497
    /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
H
Haojun Liao 已提交
1498
    /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525
  
    if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
      // data already retrieve, discard other data rows and return
      int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
        memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
      }
  
      pQueryHandle->cur.win  = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
      pQueryHandle->window   = pQueryHandle->cur.win;
      pQueryHandle->cur.rows = 1;
      pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
      return true;
    } else {
      STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
      pSecQueryHandle->order       = TSDB_ORDER_ASC;
      pSecQueryHandle->window      = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
      pSecQueryHandle->pTsdb       = pQueryHandle->pTsdb;
      pSecQueryHandle->type        = TSDB_QUERY_TYPE_ALL;
      pSecQueryHandle->cur.fid     = -1;
      pSecQueryHandle->cur.win     = TSWINDOW_INITIALIZER;
      pSecQueryHandle->checkFiles  = true;
      pSecQueryHandle->activeIndex = 0;
      pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
  
      tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);
H
Haojun Liao 已提交
1526 1527
      tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);

1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566
      // allocate buffer in order to load data blocks from file
      int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
  
      pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
      pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData colInfo = {{0}, 0};
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
        
        colInfo.info = pCol->info;
        colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
        taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
        pSecQueryHandle->statis[i].colId = colInfo.info.colId;
      }
  
      size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
      pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
      STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
      assert(pMeta != NULL);
  
      for (int32_t j = 0; j < si; ++j) {
        STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
    
        STableCheckInfo info = {
            .lastKey = pSecQueryHandle->window.skey,
            .tableId = pCheckInfo->tableId,
            .pTableObj = pCheckInfo->pTableObj,
        };
        
        taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
      }
  
      tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
      tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
  
      bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
      assert(ret);

H
Haojun Liao 已提交
1567
      /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
H
Haojun Liao 已提交
1568
      /*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592
  
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
        memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
  
        SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
        assert(pCol->info.colId == pCol1->info.colId);
        
        memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
      }
  
      SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
      
      pQueryHandle->cur.win  = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
      pQueryHandle->window   = pQueryHandle->cur.win;
      pQueryHandle->cur.rows = 2;
      
      tsdbCleanupQueryHandle(pSecQueryHandle);
    }
    
    pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
    return true;
  }
  
1593
  if (pQueryHandle->checkFiles) {
H
Haojun Liao 已提交
1594 1595 1596 1597 1598 1599 1600 1601
    bool exists = true;
    int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (exists) {
      return exists;
1602
    }
1603 1604 1605 1606 1607
  
    pQueryHandle->activeIndex = 0;
    pQueryHandle->checkFiles  = false;
  }
  
H
Haojun Liao 已提交
1608 1609
  // TODO: opt by consider the scan order
  return doHasDataInBuffer(pQueryHandle);
1610 1611
}

1612
/*
 * Narrow a descending query handle down to the single table holding the
 * greatest lastKey, for a last-row query.
 *
 * The per-table resources (iterator, data columns, comp info) of every other
 * table are released, the check list is rebuilt with only the chosen table,
 * and the query window collapses to {key, key}. If no table has a lastKey
 * greater than TSKEY_INITIAL_VAL the handle is left untouched.
 */
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
  assert(!ASCENDING_TRAVERSE(pQueryHandle->order));

  // todo consider the query time window, current last_row does not apply the query time window
  size_t total = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  // locate the table with the largest last timestamp
  TSKEY   key = TSKEY_INITIAL_VAL;
  int32_t chosen = -1;

  for (int32_t k = 0; k < total; ++k) {
    STableCheckInfo* pItem = taosArrayGet(pQueryHandle->pTableCheckInfo, k);
    if (pItem->pTableObj->lastKey > key) {
      key = pItem->pTableObj->lastKey;
      chosen = k;
    }
  }

  if (chosen == -1) {
    // todo add failure test cases
    return;
  }

  // release the resources held by every table except the chosen one
  for (int32_t k = 0; k < total; ++k) {
    if (k != chosen) {
      STableCheckInfo* pOther = taosArrayGet(pQueryHandle->pTableCheckInfo, k);
      tSkipListDestroyIter(pOther->iter);

      if (pOther->pDataCols != NULL) {
        tfree(pOther->pDataCols->buf);
      }

      tfree(pOther->pDataCols);
      tfree(pOther->pCompInfo);
    }
  }

  // copy the survivor out before the array is cleared, then put it back as the only entry
  STableCheckInfo keep = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, chosen);
  taosArrayClear(pQueryHandle->pTableCheckInfo);

  keep.lastKey = key;
  taosArrayPush(pQueryHandle->pTableCheckInfo, &keep);

  // update the query time window according to the chosen last timestamp
  pQueryHandle->window = (STimeWindow) {key, key};
}

1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701
/*
 * Prepare a handle for a single-timestamp (window.skey == window.ekey) query.
 *
 * The handle is forced to descending order, and the check list is reduced to
 * the first table whose lastKey is set and not smaller than window.skey. That
 * table's lastKey becomes window.skey and the query window becomes
 * {skey, TSKEY_INITIAL_VAL}. If no table qualifies, only the order changes.
 */
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
  pQueryHandle->order = TSDB_ORDER_DESC;

  assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);

  // todo consider the query time window, current last_row does not apply the query time window
  size_t total = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  // find the first table that may contain the requested timestamp
  int32_t pos = 0;
  for (; pos < total; pos++) {
    STableCheckInfo* pItem = taosArrayGet(pQueryHandle->pTableCheckInfo, pos);
    if (pQueryHandle->window.skey <= pItem->pTableObj->lastKey &&
        pItem->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
      break;
    }
  }

  // there are no data in all the tables
  if (pos == total) {
    return;
  }

  STableCheckInfo keep = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, pos);
  taosArrayClear(pQueryHandle->pTableCheckInfo);

  keep.lastKey = pQueryHandle->window.skey;
  taosArrayPush(pQueryHandle->pTableCheckInfo, &keep);

  // update the query time window according to the chosen last timestamp
  pQueryHandle->window = (STimeWindow) {keep.lastKey, TSKEY_INITIAL_VAL};
}

H
Haojun Liao 已提交
1702
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
1703
                                 STsdbQueryHandle* pQueryHandle) {
1704
  int     numOfRows = 0;
1705
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
H
Haojun Liao 已提交
1706
  win->skey = TSKEY_INITIAL_VAL;
1707

H
Haojun Liao 已提交
1708
  int64_t st = taosGetTimestampUs();
1709 1710 1711
  STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
  STable* pTable = pCheckInfo->pTableObj;

1712
  do {
H
Haojun Liao 已提交
1713 1714
    SDataRow row = getSDataRowInTableMem(pCheckInfo);
    if (row == NULL) {
1715 1716
      break;
    }
1717

1718
    TSKEY key = dataRowKey(row);
1719
    if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
S
Shengliang Guan 已提交
1720
      tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
1721 1722 1723 1724
          pQueryHandle->window.ekey);
      
      break;
    }
1725

H
Haojun Liao 已提交
1726 1727
    if (win->skey == INT64_MIN) {
      win->skey = key;
1728
    }
1729

H
Haojun Liao 已提交
1730
    win->ekey = key;
1731 1732
    copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);

1733
    if (++numOfRows >= maxRowsToRead) {
H
Haojun Liao 已提交
1734
      moveToNextRow(pCheckInfo);
1735 1736 1737
      break;
    }
    
H
Haojun Liao 已提交
1738
  } while(moveToNextRow(pCheckInfo));
1739

1740 1741 1742
  assert(numOfRows <= maxRowsToRead);
  
  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1743
  if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
1744 1745 1746 1747 1748 1749 1750 1751
    int32_t emptySize = maxRowsToRead - numOfRows;
    
    for(int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
      memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }
  
H
Haojun Liao 已提交
1752
  int64_t elapsedTime = taosGetTimestampUs() - st;
S
Shengliang Guan 已提交
1753
  tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
H
Haojun Liao 已提交
1754
            elapsedTime, numOfRows, numOfCols);
1755

1756
  return numOfRows;
H
hjxilinx 已提交
1757 1758
}

H
Haojun Liao 已提交
1759
/*
 * Fill pDataBlockInfo with the description of the handle's current block:
 * owning table uid/tid, row count, time window and number of columns.
 *
 * The owning table comes from the current file block when cur.fid >= 0,
 * otherwise from the table at activeIndex in the check list.
 */
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* pDataBlockInfo) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
  SQueryFilePos*    cur = &pHandle->cur;
  STable*           pOwner = NULL;

  if (pHandle->cur.fid < 0) {
    // no file position: the block belongs to the currently active table
    STableCheckInfo* pActive = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
    pOwner = pActive->pTableObj;
  } else {
    // there are data in file
    STableBlockInfo* pFileBlock = &pHandle->pDataBlockInfo[cur->slot];
    pOwner = pFileBlock->pTableCheckInfo->pTableObj;
  }

  pDataBlockInfo->uid       = pOwner->tableId.uid;
  pDataBlockInfo->tid       = pOwner->tableId.tid;
  pDataBlockInfo->rows      = cur->rows;
  pDataBlockInfo->window    = cur->win;
  pDataBlockInfo->numOfCols = QH_GET_NUM_OF_COLS(pHandle);
}
H
hjxilinx 已提交
1779

H
Haojun Liao 已提交
1780 1781 1782
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
H
hzcheng 已提交
1783
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
H
Haojun Liao 已提交
1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
  
  SQueryFilePos* cur = &pHandle->cur;
  if (cur->mixBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
  
  assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) ||
      ((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0)));
  
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
1796 1797 1798 1799 1800 1801 1802
  
  // file block with subblocks has no statistics data
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
  
H
Haojun Liao 已提交
1803
  tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
H
Haojun Liao 已提交
1804 1805

  // todo opt perf
H
Haojun Liao 已提交
1806
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
1807 1808 1809 1810 1811 1812 1813 1814
  for(int32_t i = 0; i < numOfCols; ++i) {
    SDataStatis* st = &pHandle->statis[i];
    int32_t colId = st->colId;
    
    memset(st, 0, sizeof(SDataStatis));
    st->colId = colId;
  }
  
H
Haojun Liao 已提交
1815 1816
  tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
  
H
Haojun Liao 已提交
1817 1818
  *pBlockStatis = pHandle->statis;
  
H
Haojun Liao 已提交
1819 1820
  //update the number of NULL data rows
  for(int32_t i = 0; i < numOfCols; ++i) {
1821
    if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
H
Haojun Liao 已提交
1822 1823
      pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
    }
H
Haojun Liao 已提交
1824 1825 1826 1827 1828 1829 1830

    // todo opt perf
    SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
    if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
      pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
      pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
    }
H
Haojun Liao 已提交
1831 1832
  }
  
1833
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1834 1835
}

H
hzcheng 已提交
1836
/*
 * Return the column data (SArray of SColumnInfoData) of the current block.
 *
 * In the following cases the data has already been placed in pColumns and is
 * returned as is:
 *   1. the data is from the cache (cur.fid < 0);
 *   2. the block is a mixed block;
 *   3. the recorded load info matches the current slot, file id and table id.
 * Otherwise the file block is loaded and copied into pColumns first.
 * pIdList is not used by this implementation.
 */
SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;

  if (pHandle->cur.fid < 0) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pFileBlock = &pHandle->pDataBlockInfo[pHandle->cur.slot];
  STableCheckInfo* pOwner = pFileBlock->pTableCheckInfo;

  if (pHandle->cur.mixBlock) {
    return pHandle->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pOwner, pFileBlock->compBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);

  // data block has been loaded, todo extract method
  SDataBlockLoadInfo* pLoaded = &pHandle->dataBlockLoadInfo;
  bool sameBlock = pLoaded->slot == pHandle->cur.slot && pLoaded->fileGroup->fileId == pHandle->cur.fid &&
                   pLoaded->tid == pOwner->pTableObj->tableId.tid;
  if (sameBlock) {
    return pHandle->pColumns;
  }

  // only load the file block
  SCompBlock* pBlock = pFileBlock->compBlock;
  doLoadFileDataBlock(pHandle, pBlock, pOwner);

  // todo refactor
  int32_t copied = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  if (!ASCENDING_TRAVERSE(pHandle->order) && copied < pHandle->outputCapacity) {
    int32_t gap = pHandle->outputCapacity - copied;
    int32_t nCols = taosArrayGetSize(pHandle->pColumns);

    for(int32_t k = 0; k < nCols; ++k) {
      SColumnInfoData* pCol = taosArrayGet(pHandle->pColumns, k);
      memmove(pCol->pData, pCol->pData + gap * pCol->info.bytes, copied * pCol->info.bytes);
    }
  }

  return pHandle->pColumns;
}

H
hzcheng 已提交
1885
/*
 * Placeholder: row-level retrieval is not implemented, so all arguments are
 * ignored and NULL is always returned.
 */
SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) {
  return NULL;
}
1886

1887
/*
 * Append every table stored in the super table's index to `list`.
 * Each element pushed is the STable* held in a skip-list node.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
  SSkipListIterator* it = tSkipListCreateIter(pSuperTable->pIndex);

  for (;;) {
    if (!tSkipListIterNext(it)) {
      break;
    }

    SSkipListNode* pEntry = tSkipListIterGet(it);

    // the node payload is the STable pointer itself
    STable** ppTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pEntry);
    taosArrayPush(list, ppTable);
  }

  tSkipListDestroyIter(it);
  return TSDB_CODE_SUCCESS;
}

1900
static void destroyHelper(void* param) {
1901 1902 1903
  if (param == NULL) {
    return;
  }
1904

H
hjxilinx 已提交
1905
  
1906
  tQueryInfo* pInfo = (tQueryInfo*)param;
H
hjxilinx 已提交
1907 1908 1909 1910 1911
  if (pInfo->optr != TSDB_RELATION_IN) {
    tfree(pInfo->q);
  }
  
//  tVariantDestroy(&(pInfo->q));
1912 1913 1914
  free(param);
}

H
hjxilinx 已提交
1915
/*
 * Map a filter column to its position in the tag schema.
 *
 * Returns TSDB_TBNAME_COLUMN_INDEX when the column is the table name
 * pseudo-column, the index of the schema column whose bytes, type and colId
 * all match pSchema, or -2 when nothing matches.
 */
static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
  // filter on table name(TBNAME)
  if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
    return TSDB_TBNAME_COLUMN_INDEX;
  }

  int32_t found = -2;
  for(int32_t k = 0; k < schemaNCols(pTSchema); ++k) {
    STColumn* pCandidate = &pTSchema->columns[k];
    if (pCandidate->bytes != pSchema->bytes || pCandidate->type != pSchema->type || pCandidate->colId != pSchema->colId) {
      continue;
    }

    found = k;
    break;
  }

  return found;
}

void filterPrepare(void* expr, void* param) {
1932
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
1933
  if (pExpr->_node.info != NULL) {
1934 1935
    return;
  }
1936

H
hjxilinx 已提交
1937
  int32_t i = 0;
H
[td-32]  
hjxilinx 已提交
1938
  pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
H
hjxilinx 已提交
1939 1940
  
  STSchema* pTSSchema = (STSchema*) param;
1941

H
hjxilinx 已提交
1942 1943 1944
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
1945

1946
  // todo : if current super table does not change schema yet, this function may fail to get correct schema, test case
H
hjxilinx 已提交
1947
  int32_t index = getTagColumnIndex(pTSSchema, pSchema);
H
hjxilinx 已提交
1948
  assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX));
1949

1950
  pInfo->sch      = *pSchema;
H
hjxilinx 已提交
1951
  pInfo->colIndex = index;
1952
  pInfo->optr     = pExpr->_node.optr;
H
hjxilinx 已提交
1953
  pInfo->compare  = getComparFunc(pSchema->type, pInfo->optr);
H
hjxilinx 已提交
1954
  pInfo->param    = pTSSchema;
H
hjxilinx 已提交
1955 1956 1957 1958 1959
  
  if (pInfo->optr == TSDB_RELATION_IN) {
    pInfo->q = (char*) pCond->arr;
  } else {
    pInfo->q = calloc(1, pSchema->bytes);
1960
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
1961
  }
1962 1963
}

1964 1965 1966 1967
// Context handed to tableGroupComparFn when tables are sorted and split into groups.
typedef struct STableGroupSupporter {
  int32_t    numOfCols;   // number of entries in pCols
  SColIndex* pCols;       // columns compared in order; each colIndex is a tag position or TSDB_TBNAME_COLUMN_INDEX
  STSchema*  pTagSchema;  // tag schema that the colIndex values refer to
} STableGroupSupporter;

int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
1973 1974
  STable* pTable1 = *(STable**) p1;
  STable* pTable2 = *(STable**) p2;
1975 1976 1977 1978 1979
  
  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;
    
H
Haojun Liao 已提交
1980
    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
1981
    
1982 1983 1984 1985 1986
    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;
    
H
Haojun Liao 已提交
1987 1988 1989
    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
      f1 = (char*) TABLE_NAME(pTable1);
      f2 = (char*) TABLE_NAME(pTable2);
1990
      type = TSDB_DATA_TYPE_BINARY;
H
Haojun Liao 已提交
1991
      bytes = tGetTableNameColumnSchema().bytes;
1992
    } else {
H
hjxilinx 已提交
1993 1994
      STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
      bytes = pCol->bytes;
1995
      type = pCol->type;
H
Hongze Cheng 已提交
1996 1997
      f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
      f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
1998
    }
H
Haojun Liao 已提交
1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023
    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
  
  return 0;
}

2024
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
H
hjxilinx 已提交
2025
    __ext_compar_fn_t compareFn) {
2026
  STable* pTable = taosArrayGetP(pTableList, 0);
2027
  
2028 2029 2030 2031
  SArray* g = taosArrayInit(16, POINTER_BYTES);
  taosArrayPush(g, &pTable);
  tsdbRefTable(pTable);

2032
  for (int32_t i = 1; i < numOfTables; ++i) {
2033 2034
    STable** prev = taosArrayGet(pTableList, i - 1);
    STable** p = taosArrayGet(pTableList, i);
H
hjxilinx 已提交
2035 2036
    
    int32_t ret = compareFn(prev, p, pSupp);
2037 2038
    assert(ret == 0 || ret == -1);
    
2039 2040 2041
    tsdbRefTable(*p);
    assert((*p)->type == TSDB_CHILD_TABLE);

2042
    if (ret == 0) {
H
hjxilinx 已提交
2043
      taosArrayPush(g, p);
2044 2045
    } else {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
2046
      g = taosArrayInit(16, POINTER_BYTES);
H
hjxilinx 已提交
2047
      taosArrayPush(g, p);
2048 2049
    }
  }
2050 2051
  
  taosArrayPush(pGroups, &g);
2052 2053
}

2054
/*
 * Partition a list of tables into groups.
 *
 * pTableList      SArray of STable*; it is sorted in place when grouping is needed
 * pTagSchema      tag schema the group-by columns refer to
 * pCols           group-by columns
 * numOfOrderCols  number of entries in pCols
 *
 * Returns a new SArray whose elements are SArray* groups of STable*. The
 * result is empty when the list is empty, holds a single group when there is
 * no group-by column or only one table, and otherwise holds one group per
 * distinct combination of the group-by columns. Every table placed in a
 * group is referenced through tsdbRefTable().
 */
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
  assert(pTableList != NULL);
  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
    SArray* sa = taosArrayInit(size, POINTER_BYTES);
    for(int32_t i = 0; i < size; ++i) {
      STable** pTable = taosArrayGet(pTableList, i);
      assert((*pTable)->type == TSDB_CHILD_TABLE);

      tsdbRefTable(*pTable);
      taosArrayPush(sa, pTable);
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %zu tables belong to one group", size);
  } else {
    // the supporter only lives for the two calls below, so it is kept on the
    // stack: no allocation that could fail and nothing to free
    STableGroupSupporter supp = {
        .numOfCols  = numOfOrderCols,
        .pCols      = pCols,
        .pTagSchema = pTagSchema,
    };

    taosqsort(pTableList->pData, size, POINTER_BYTES, &supp, tableGroupComparFn);
    createTableGroupImpl(pTableGroup, pTableList, size, &supp, tableGroupComparFn);
  }

  return pTableGroup;
}

2090
bool indexedNodeFilterFp(const void* pNode, void* param) {
H
hjxilinx 已提交
2091
  tQueryInfo* pInfo = (tQueryInfo*) param;
H
hjxilinx 已提交
2092
  
H
Haojun Liao 已提交
2093
  STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
2094

2095
  char*  val = NULL;
H
hjxilinx 已提交
2096

2097
  if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
H
Haojun Liao 已提交
2098
    val = (char*) TABLE_NAME(pTable);
2099
  } else {
H
Haojun Liao 已提交
2100
    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
2101
  }
2102
  
H
Haojun Liao 已提交
2103 2104 2105 2106 2107 2108
  int32_t ret = 0;
  if (val == NULL) { //the val is possible to be null, so check it out carefully
    ret = -1; // val is missing in table tags value pairs
  } else {
    ret = pInfo->compare(val, pInfo->q);
  }
2109

2110 2111 2112 2113 2114 2115 2116
  switch (pInfo->optr) {
    case TSDB_RELATION_EQUAL: {
      return ret == 0;
    }
    case TSDB_RELATION_NOT_EQUAL: {
      return ret != 0;
    }
2117
    case TSDB_RELATION_GREATER_EQUAL: {
2118 2119
      return ret >= 0;
    }
2120
    case TSDB_RELATION_GREATER: {
2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131
      return ret > 0;
    }
    case TSDB_RELATION_LESS_EQUAL: {
      return ret <= 0;
    }
    case TSDB_RELATION_LESS: {
      return ret < 0;
    }
    case TSDB_RELATION_LIKE: {
      return ret == 0;
    }
weixin_48148422's avatar
weixin_48148422 已提交
2132
    case TSDB_RELATION_IN: {
2133
      return ret == 1;
weixin_48148422's avatar
weixin_48148422 已提交
2134
    }
2135

2136 2137 2138
    default:
      assert(false);
  }
H
hjxilinx 已提交
2139
  
2140 2141 2142
  return true;
}

2143
/*
 * Evaluate a filter expression tree against the index of a super table and
 * collect the matching tables in pRes.
 *
 * The expression tree is consumed: it is destroyed (with destroyHelper
 * releasing the per-node query info) before returning.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
  // callbacks and context used while walking the expression tree
  SExprTraverseSupp traverse = {0};
  traverse.nodeFilterFn = (__result_filter_fn_t) indexedNodeFilterFp;
  traverse.setupInfoFn  = filterPrepare;
  traverse.pExtInfo     = pSTable->tagSchema;

  tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &traverse);
  tExprTreeDestroy(&pExpr, destroyHelper);

  return TSDB_CODE_SUCCESS;
}

H
TD-353  
Hongze Cheng 已提交
2156
/*
 * Resolve the child tables of a super table that satisfy a tag condition
 * and/or a table name condition, and arrange them into groups.
 *
 * tsdb            repository handle
 * uid             uid of the super table
 * pTagCond, len   serialized tag condition, may be NULL/empty
 * tagNameRelType  operator joining the tag condition and the tbname condition
 * tbnameCond      table name condition, may be NULL
 * pGroupInfo      out: numOfTables and pGroupList are filled on success
 * pColIndex       group-by columns
 * numOfCols       number of entries in pColIndex
 *
 * Returns TSDB_CODE_SUCCESS, or terrno on failure. The repository meta read
 * lock is held while tables are resolved and released on every path after it
 * was acquired.
 */
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
                                 SColIndex* pColIndex, int32_t numOfCols) {
  if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;

  STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTable == NULL) {
    tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);

    goto _error;
  }

  if (pTable->type != TSDB_SUPER_TABLE) {
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
        pTable->name->data);
    terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client

    tsdbUnlockRepoMeta(tsdb);
    goto _error;
  }

  //NOTE: not add ref count for super table
  SArray* res = taosArrayInit(8, POINTER_BYTES);
  STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);

  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
    int32_t ret = getAllTableList(pTable, res);
    if (ret != TSDB_CODE_SUCCESS) {
      // release the temporary result list and report the actual failure code
      taosArrayDestroy(res);
      tsdbUnlockRepoMeta(tsdb);
      terrno = ret;
      goto _error;
    }

    pGroupInfo->numOfTables = taosArrayGetSize(res);
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);

    tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
    taosArrayDestroy(res);

    if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
    return ret;
  }

  int32_t ret = TSDB_CODE_SUCCESS;
  tExprNode* expr = NULL;

  TRY(32) {
    expr = exprTreeFromTableName(tbnameCond);
    if (expr == NULL) {
      expr = exprTreeFromBinary(pTagCond, len);
    } else {
      CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL);
      tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
      if (tagExpr != NULL) {
        // both conditions exist: join them under a new root node
        CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, tagExpr, NULL);
        tExprNode* tbnameExpr = expr;
        expr = calloc(1, sizeof(tExprNode));
        if (expr == NULL) {
          THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
        }
        expr->nodeType = TSQL_NODE_EXPR;
        expr->_node.optr = tagNameRelType;
        expr->_node.pLeft = tagExpr;
        expr->_node.pRight = tbnameExpr;
      }
    }
    CLEANUP_EXECUTE();

  } CATCH( code ) {
    CLEANUP_EXECUTE();

    // the read lock and the result list were acquired before the TRY block;
    // release both here, otherwise this path leaves the meta locked
    taosArrayDestroy(res);
    tsdbUnlockRepoMeta(tsdb);

    // set last so that the unlock above cannot overwrite the reported error
    terrno = code;
    goto _error;
    // TODO: more error handling
  } END_TRY

  doQueryTableList(pTable, res, expr);
  pGroupInfo->numOfTables = taosArrayGetSize(res);
  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);

  tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
      pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));

  taosArrayDestroy(res);

  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
  return ret;

  _error:
  return terrno;
}
2248

H
TD-353  
Hongze Cheng 已提交
2249
/*
 * Build a group list that contains exactly one table.
 *
 * The table is looked up by uid under the repository meta read lock and
 * referenced through tsdbRefTable() before the lock is released. On success
 * pGroupInfo holds one group with that single table and TSDB_CODE_SUCCESS is
 * returned; otherwise terrno is returned.
 */
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  STable* pTarget = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTarget == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);
    return terrno;
  }

  assert(pTarget->type == TSDB_CHILD_TABLE || pTarget->type == TSDB_NORMAL_TABLE || pTarget->type == TSDB_STREAM_TABLE);
  tsdbRefTable(pTarget);

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  // one group holding the single table
  SArray* single = taosArrayInit(1, POINTER_BYTES);
  taosArrayPush(single, &pTarget);

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList  = taosArrayInit(1, POINTER_BYTES);
  taosArrayPush(pGroupInfo->pGroupList, &single);

  return TSDB_CODE_SUCCESS;
}
2276

2277
/*
 * Build a single group from a list of table ids.
 *
 * tsdb          repository handle
 * pTableIdList  SArray of STableIdInfo, must not be NULL
 * pGroupInfo    out: pGroupList receives one group with the resolved tables
 *               and numOfTables the number of tables actually in that group
 *
 * Ids whose table no longer exists are skipped with a warning. A super table
 * in the list is an error: nothing is referenced and terrno
 * (TSDB_CODE_QRY_INVALID_MSG) is returned. Tables are referenced through
 * tsdbRefTable() only once the whole list has been validated, all under the
 * repository meta read lock.
 *
 * On every return after the lock was taken pGroupInfo->pGroupList is a valid
 * array (empty on failure).
 */
int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  assert(pTableIdList != NULL);
  size_t size = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* group = taosArrayInit(1, POINTER_BYTES);

  // pass 1: resolve and validate every id without taking references yet,
  // so that a rejected list leaves no reference behind
  for(int32_t i = 0; i < size; ++i) {
    STableIdInfo *id = taosArrayGet(pTableIdList, i);

    STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);

      taosArrayDestroy(group);
      tsdbUnlockRepoMeta(tsdb);

      // set last so that the unlock above cannot overwrite the reported error
      terrno = TSDB_CODE_QRY_INVALID_MSG;
      return terrno;
    }

    taosArrayPush(group, &pTable);
  }

  // pass 2: the list is valid, reference every table that made it into the group
  size_t numOfTables = taosArrayGetSize(group);
  for(int32_t j = 0; j < numOfTables; ++j) {
    STable* pTable = taosArrayGetP(group, j);
    tsdbRefTable(pTable);
  }

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    taosArrayDestroy(group);
    return terrno;
  }

  // count the tables that were found, not the ids that were requested
  pGroupInfo->numOfTables = numOfTables;
  taosArrayPush(pGroupInfo->pGroupList, &group);

  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
2317
/*
 * Destroy a query handle and release the resources it holds.
 *
 * For every entry of pTableCheckInfo the skip-list iterator is destroyed and
 * the data-column buffer, the data-column object and the comp info are
 * freed. After that the per-column data buffers, the column array, the
 * default-load-column array, the data block info and the statis buffer are
 * freed, the mem/imem tables are unreferenced, the read helper is destroyed
 * and the handle itself is freed. Passing NULL is a no-op.
 *
 * @param queryHandle  handle to destroy, may be NULL
 */
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)queryHandle;
  if (pHandle == NULL) {
    return;
  }

  // per-table check state
  size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
  for (size_t t = 0; t < numOfTables; ++t) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, t);

    tSkipListDestroyIter(pCheckInfo->iter);

    if (pCheckInfo->pDataCols != NULL) {
      // the inner buffer has to go before the object that owns it
      tfree(pCheckInfo->pDataCols->buf);
      tfree(pCheckInfo->pDataCols);
    }

    tfree(pCheckInfo->pCompInfo);
  }
  taosArrayDestroy(pHandle->pTableCheckInfo);

  // per-column result buffers
  size_t numOfCols = taosArrayGetSize(pHandle->pColumns);
  for (size_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pColumn = taosArrayGet(pHandle->pColumns, c);
    tfree(pColumn->pData);
  }
  taosArrayDestroy(pHandle->pColumns);

  taosArrayDestroy(pHandle->defaultLoadColumn);
  tfree(pHandle->pDataBlockInfo);
  tfree(pHandle->statis);

  // todo check error
  tsdbUnRefMemTable(pHandle->pTsdb, pHandle->mem);
  tsdbUnRefMemTable(pHandle->pTsdb, pHandle->imem);

  tsdbDestroyHelper(&pHandle->rhelper);
  tfree(pHandle);
}
2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378

/*
 * Release a table group list.
 *
 * Drops one reference on every table stored in every group, destroys each
 * group array and finally destroys the outer group list array. The
 * STableGroupInfo structure itself is not freed.
 *
 * @param pGroupList  group info to tear down, must not be NULL
 */
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  SArray* pGroups = pGroupList->pGroupList;
  size_t  groupCount = taosArrayGetSize(pGroups);

  for (size_t g = 0; g < groupCount; ++g) {
    SArray* pTables = taosArrayGetP(pGroups, g);
    size_t  tableCount = taosArrayGetSize(pTables);

    for (size_t t = 0; t < tableCount; ++t) {
      STable* pTable = taosArrayGetP(pTables, t);
      assert(pTable != NULL);

      tsdbUnRefTable(pTable);
    }

    taosArrayDestroy(pTables);
  }

  taosArrayDestroy(pGroups);
}