tsdbRead.c 79.7 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
slguan 已提交
17
#include "tulog.h"
18
#include "talgo.h"
19
#include "tutil.h"
H
Haojun Liao 已提交
20
#include "ttime.h"
21
#include "tcompare.h"
22
#include "exception.h"
23

24 25
#include "../../../query/inc/qast.h"  // todo move to common module
#include "../../../query/inc/tlosertree.h"  // todo move to util module
26
#include "tsdb.h"
H
TD-34  
hzcheng 已提交
27
#include "tsdbMain.h"
28

29
// Extra bytes allocated past the end of every column output buffer.
#define EXTRA_BYTES 2

// True when the scan order `o` is ascending. The argument is parenthesised so that
// expressions such as `ASCENDING_TRAVERSE(a & b)` are compared as a whole.
#define ASCENDING_TRAVERSE(o)   ((o) == TSDB_ORDER_ASC)

// Number of queried columns held by a query handle.
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
32

33 34 35 36 37
// Range-comparison selectors. They are not referenced in the visible part of this file;
// the names suggest "<=" and ">=" boundary comparisons — confirm at the call sites.
enum {
  QUERY_RANGE_LESS_EQUAL = 0,
  QUERY_RANGE_GREATER_EQUAL = 1,
};

H
hjxilinx 已提交
38
/* Kind of scan performed by a query handle; stored in STsdbQueryHandle.type. */
enum {
  TSDB_QUERY_TYPE_ALL      = 1,  /* every qualified block; default set by tsdbQueryTables */
  TSDB_QUERY_TYPE_LAST     = 2,  /* last row only; set by tsdbQueryLastRow */
  TSDB_QUERY_TYPE_EXTERNAL = 3,  /* rows outside the window; set by tsdbQueryRowsInExternalWindow */
};

44 45
typedef struct SQueryFilePos {
  int32_t fid;
46 47
  int32_t slot;
  int32_t pos;
48
  int64_t lastKey;
49 50
  int32_t rows;
  bool    mixBlock;
51
  bool    blockCompleted;
52
  STimeWindow win;
53
} SQueryFilePos;
H
hjxilinx 已提交
54

55
typedef struct SDataBlockLoadInfo {
H
hjxilinx 已提交
56
  SFileGroup* fileGroup;
57
  int32_t     slot;
H
hjLiao 已提交
58
  int32_t     tid;
59
  SArray*     pLoadedCols;
60
} SDataBlockLoadInfo;
H
hjxilinx 已提交
61

62
/* Records which table/file pair the comp-block information currently belongs to. */
typedef struct SLoadCompBlockInfo {
  int32_t tid;     /* table tid, -1 when nothing is recorded */
  int32_t fileId;  /* file id, -1 when nothing is recorded */
} SLoadCompBlockInfo;
H
hjxilinx 已提交
66

67
typedef struct STableCheckInfo {
H
Haojun Liao 已提交
68 69 70 71 72 73 74
  STableId      tableId;
  TSKEY         lastKey;
  STable*       pTableObj;
  SCompInfo*    pCompInfo;
  int32_t       compSize;
  int32_t       numOfBlocks;    // number of qualified data blocks not the original blocks
  SDataCols*    pDataCols;
H
Haojun Liao 已提交
75
  int32_t       chosen;         // indicate which iterator should move forward
H
Haojun Liao 已提交
76 77 78
  bool          initBuf;        // whether to initialize the in-memory skip list iterator or not
  SSkipListIterator* iter;      // mem buffer skip list iterator
  SSkipListIterator* iiter;     // imem buffer skip list iterator
79
} STableCheckInfo;
80

81
typedef struct STableBlockInfo {
H
Haojun Liao 已提交
82 83
  SCompBlock*        compBlock;
  STableCheckInfo*   pTableCheckInfo;
84
} STableBlockInfo;
85

86 87
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
H
Haojun Liao 已提交
88
  STableBlockInfo**   pDataBlockInfo;
89
  int32_t*            blockIndexArray;
90
  int32_t*            numOfBlocksPerTable;
91 92
} SBlockOrderSupporter;

93
typedef struct STsdbQueryHandle {
H
Haojun Liao 已提交
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
  STsdbRepo*     pTsdb;
  SQueryFilePos  cur;              // current position
  int16_t        order;
  STimeWindow    window;           // the primary query time window that applies to all queries
  SDataStatis*   statis;           // query level statistics, only one table block statistics info exists at any time
  int32_t        numOfBlocks;
  SArray*        pColumns;         // column list, SColumnInfoData array list
  bool           locateStart;
  int32_t        outputCapacity;
  int32_t        realNumOfRows;
  SArray*        pTableCheckInfo;  //SArray<STableCheckInfo>
  int32_t        activeIndex;
  bool           checkFiles;       // check file stage
  void*          qinfo;            // query info handle, for debug purpose
  int32_t        type;             // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
109 110
  SFileGroup*    pFileGroup;
  SFileGroupIter fileIter;
H
hjxilinx 已提交
111
  SRWHelper      rhelper;
H
Haojun Liao 已提交
112
  STableBlockInfo* pDataBlockInfo;
113 114
  SMemTable*     mem;              // mem-table
  SMemTable*     imem;             // imem-table, acquired from snapshot
H
Haojun Liao 已提交
115 116 117
  
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
118 119
} STsdbQueryHandle;

120
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
121
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
122 123 124
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
                                   SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
H
Haojun Liao 已提交
125
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
126
                                 STsdbQueryHandle* pQueryHandle);
H
hjxilinx 已提交
127

128
/* Reset the "last loaded block" record: no file group, slot -1, tid -1. */
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->tid       = -1;
  pBlockLoadInfo->slot      = -1;
}

134
/* Mark the comp-block record as empty: both the file id and the table tid become -1. */
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid    = -1;
}
H
hjxilinx 已提交
138

H
TD-353  
Hongze Cheng 已提交
139
/*
 * Create a query handle that scans every table listed in `groupList` over `pCond->twindow`.
 *
 * The handle:
 *  - allocates one output buffer of `maxRowsPerFileBlock` rows per queried column;
 *  - takes a snapshot of the mem/imem tables;
 *  - creates one STableCheckInfo per table, starting at the window's skey.
 *
 * Returns NULL if the handle itself cannot be allocated.
 * NOTE(review): the per-column allocations (statis, pData) are still unchecked.
 */
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
  // Validate the arguments before they are dereferenced, so a bad caller trips the
  // assertion instead of crashing on a NULL pointer.
  assert(pCond != NULL && groupList != NULL && pCond->numOfCols > 0);

  size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
  assert(sizeOfGroup >= 1);

  STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
  if (pQueryHandle == NULL) {
    return NULL;
  }

  pQueryHandle->order       = pCond->order;
  pQueryHandle->window      = pCond->twindow;
  pQueryHandle->pTsdb       = tsdb;
  pQueryHandle->type        = TSDB_QUERY_TYPE_ALL;
  pQueryHandle->cur.fid     = -1;
  pQueryHandle->cur.win     = TSWINDOW_INITIALIZER;
  pQueryHandle->checkFiles  = true;
  pQueryHandle->activeIndex = 0;   // current active table index
  pQueryHandle->qinfo       = qinfo;
  pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;

  tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
  tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);

  // allocate buffer in order to load data blocks from file
  int32_t numOfCols = pCond->numOfCols;

  pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
  pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));  // todo: use list instead of array?

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};

    colInfo.info = pCond->colList[i];
    // EXTRA_BYTES of slack past the last row of every column buffer
    colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
    taosArrayPush(pQueryHandle->pColumns, &colInfo);
    pQueryHandle->statis[i].colId = colInfo.info.colId;
  }

  pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));

  STsdbMeta* pMeta = tsdbGetMeta(tsdb);
  assert(pMeta != NULL);

  for (size_t i = 0; i < sizeOfGroup; ++i) {
    SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);

    size_t gsize = taosArrayGetSize(group);
    assert(gsize > 0);

    for (size_t j = 0; j < gsize; ++j) {
      STable* pTable = (STable*) taosArrayGetP(group, j);

      // every table starts scanning at the beginning of the query window
      STableCheckInfo info = {
          .lastKey = pQueryHandle->window.skey,
          .tableId = pTable->tableId,
          .pTableObj = pTable,
      };

      assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
             info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));

      taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
    }
  }

  tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));

  tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);

  return (TsdbQueryHandleT) pQueryHandle;
}

H
TD-353  
Hongze Cheng 已提交
207
/*
 * Create a query handle that retrieves only the last row of each table.
 *
 * The handle is built by tsdbQueryTables, then switched to TSDB_QUERY_TYPE_LAST with a
 * descending scan order. Returns NULL when the base handle could not be created.
 */
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
  STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
  if (pQueryHandle == NULL) {
    return NULL;
  }

  pQueryHandle->type  = TSDB_QUERY_TYPE_LAST;
  pQueryHandle->order = TSDB_ORDER_DESC;

  changeQueryHandleForLastrowQuery(pQueryHandle);
  return pQueryHandle;
}

217
/*
 * Return a newly allocated SArray of STable* with one entry per table in the query,
 * in pTableCheckInfo order. The caller owns the returned array.
 */
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
  assert(pHandle != NULL);

  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  SArray* pTableList = taosArrayInit(numOfTables, POINTER_BYTES);
  for (size_t k = 0; k < numOfTables; ++k) {
    STableCheckInfo* pInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, k);
    taosArrayPush(pTableList, &pInfo->pTableObj);
  }

  return pTableList;
}

H
TD-353  
Hongze Cheng 已提交
233
/*
 * Create a query handle of type TSDB_QUERY_TYPE_EXTERNAL.
 *
 * The handle is built by tsdbQueryTables and then adjusted by
 * changeQueryHandleForInterpQuery. Returns NULL when the base handle could not be created.
 */
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
  STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
  if (pQueryHandle == NULL) {
    return NULL;
  }

  pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
  changeQueryHandleForInterpQuery(pQueryHandle);
  return pQueryHandle;
}

241
/*
 * Create the mem/imem skip-list iterators of one table, positioned at pCheckInfo->lastKey.
 *
 * This runs at most once per table (guarded by initBuf); later calls return true
 * immediately. Each created iterator is advanced once so it points at its first row.
 *
 * Returns false when no snapshot exists, when neither table has data for this tid, or when
 * both iterators are already exhausted after the first advance.
 */
static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
  assert(pCheckInfo->pTableObj != NULL);

  if (pCheckInfo->initBuf) {
    return true;
  }
  pCheckInfo->initBuf = true;

  int32_t order = pHandle->order;

  // no snapshot of either memory table: nothing buffered at all
  if (pHandle->mem == NULL && pHandle->imem == NULL) {
    return false;
  }

  assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);

  if (pHandle->mem != NULL && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
    pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
                                                  (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  if (pHandle->imem != NULL && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
    pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
                                                   (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // Step each existing iterator onto its first row (mem first, then imem);
  // an iterator that cannot step is treated as empty.
  bool memEmpty = true;
  if (pCheckInfo->iter != NULL) {
    memEmpty = !tSkipListIterNext(pCheckInfo->iter);
  }

  bool imemEmpty = true;
  if (pCheckInfo->iiter != NULL) {
    imemEmpty = !tSkipListIterNext(pCheckInfo->iiter);
  }

  if (memEmpty && imemEmpty) {
    return false;
  }

  if (memEmpty) {
    tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iter);
    assert(pNode != NULL);

    SDataRow firstRow = SL_GET_NODE_DATA(pNode);
    TSKEY    firstKey = dataRowKey(firstRow);
    tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
              pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, firstKey, order, pHandle->qinfo);
  }

  if (imemEmpty) {
    tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(pNode != NULL);

    SDataRow firstRow = SL_GET_NODE_DATA(pNode);
    TSKEY    firstKey = dataRowKey(firstRow);
    tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
              pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, firstKey, order, pHandle->qinfo);
  }

  return true;
}

H
Haojun Liao 已提交
309 310 311 312 313 314 315 316
/*
 * Return the row at the current position of the mem/imem iterators, and record in
 * pCheckInfo->chosen which source it came from (0 = mem, 1 = imem).
 *
 * Rules:
 *  - When both sources have a row, the one with the smaller key wins.
 *  - On equal keys the mem row is skipped (its iterator is advanced) and the imem row is
 *    returned.
 *  - Returns NULL, leaving `chosen` untouched, when neither source has a row.
 *
 * NOTE(review): the comparison does not depend on the scan order; confirm this is correct
 * for descending scans.
 */
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
  SDataRow rmem  = NULL;
  SDataRow rimem = NULL;

  SSkipListNode* pNode = (pCheckInfo->iter != NULL) ? tSkipListIterGet(pCheckInfo->iter) : NULL;
  if (pNode != NULL) {
    rmem = SL_GET_NODE_DATA(pNode);
  }

  pNode = (pCheckInfo->iiter != NULL) ? tSkipListIterGet(pCheckInfo->iiter) : NULL;
  if (pNode != NULL) {
    rimem = SL_GET_NODE_DATA(pNode);
  }

  if (rmem == NULL && rimem == NULL) {
    return NULL;
  }

  if (rimem == NULL) {
    pCheckInfo->chosen = 0;
    return rmem;
  }

  if (rmem == NULL) {
    pCheckInfo->chosen = 1;
    return rimem;
  }

  TSKEY memKey  = dataRowKey(rmem);
  TSKEY imemKey = dataRowKey(rimem);

  if (memKey < imemKey) {
    pCheckInfo->chosen = 0;
    return rmem;
  }

  if (memKey == imemKey) {
    // duplicated timestamp: drop the mem row and keep the imem one
    tSkipListIterNext(pCheckInfo->iter);
  }

  pCheckInfo->chosen = 1;
  return rimem;
}

bool moveToNextRow(STableCheckInfo* pCheckInfo) {
  bool hasNext = false;
  if (pCheckInfo->chosen == 0) {
    if (pCheckInfo->iter != NULL) {
      hasNext = tSkipListIterNext(pCheckInfo->iter);
    }
359

H
Haojun Liao 已提交
360 361 362
    if (hasNext) {
      return hasNext;
    }
363

H
Haojun Liao 已提交
364 365 366 367 368 369 370 371
    if (pCheckInfo->iiter != NULL) {
      return tSkipListIterGet(pCheckInfo->iiter) != NULL;
    }
  } else {
    if (pCheckInfo->chosen == 1) {
      if (pCheckInfo->iiter != NULL) {
        hasNext = tSkipListIterNext(pCheckInfo->iiter);
      }
372

H
Haojun Liao 已提交
373 374 375
      if (hasNext) {
        return hasNext;
      }
376

H
Haojun Liao 已提交
377 378 379 380 381
      if (pCheckInfo->iter != NULL) {
        return tSkipListIterGet(pCheckInfo->iter) != NULL;
      }
    }
  }
382

H
Haojun Liao 已提交
383 384 385
  return hasNext;
}

386
/*
 * Read the next batch of cached (mem/imem) rows of the active table into the output buffers.
 *
 * On success:
 *  - cur.rows and cur.win describe the batch, with win always stored as skey <= ekey;
 *  - lastKey is moved one step past the batch;
 *  - the function returns true.
 *
 * Returns false when the table has no cached row, or when the next cached key is already
 * beyond the query window in scan order.
 */
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
  size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < numOfTables && pHandle->activeIndex >= 0 && numOfTables >= 1);

  pHandle->cur.fid = -1;  // the result does not come from a data file

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  assert(pCheckInfo->pTableObj != NULL);

  initTableMemIterator(pHandle, pCheckInfo);

  SDataRow row = getSDataRowInTableMem(pCheckInfo);
  if (row == NULL) {
    return false;
  }

  pCheckInfo->lastKey = dataRowKey(row);  // first timestamp in buffer
  tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
            pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);

  bool ascending = ASCENDING_TRAVERSE(pHandle->order);

  // every cached row inside the window has been consumed already
  bool pastWindowEnd = ascending ? (pCheckInfo->lastKey > pHandle->window.ekey)
                                 : (pCheckInfo->lastKey < pHandle->window.ekey);
  if (pastWindowEnd) {
    return false;
  }

  int32_t      step = ascending ? 1 : -1;
  STimeWindow* win  = &pHandle->cur.win;

  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity,
                                            &win->skey, &win->ekey, pHandle);  // todo refactor API

  // win->ekey is the last key read in scan order; the next read starts one step beyond it
  TSKEY nextKey = win->ekey + step;
  pCheckInfo->lastKey   = nextKey;
  pHandle->cur.lastKey  = nextKey;
  pHandle->cur.mixBlock = true;

  if (!ASCENDING_TRAVERSE(pHandle->order)) {
    SWAP(win->skey, win->ekey, TSKEY);
  }

  return true;
}
H
hjxilinx 已提交
428

429 430
/*
 * Map a timestamp to the id of the data file that covers it.
 *
 * @param key          timestamp, in the unit given by `precision`
 * @param daysPerFile  number of days stored in one data file; must be positive
 * @param precision    time precision, used to index tsMsPerDay
 * @return key / (daysPerFile * tsMsPerDay[precision]), clamped to the int32_t range;
 *         INT32_MIN for TSKEY_INITIAL_VAL.
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // `precision` indexes tsMsPerDay, so it must be a valid precision value. The previous
  // `>= MICRO || <= NANO` test was always true and therefore never checked anything.
  assert(precision >= TSDB_TIME_PRECISION_MILLI && precision <= TSDB_TIME_PRECISION_NANO);
  assert(daysPerFile > 0);  // divisor below

  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision]));  // set the starting fileId

  // clamp values that do not fit into an int32_t file id
  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t) fid;
}

H
Haojun Liao 已提交
447
/*
 * Bisect a key-sorted array of comp blocks for the slot relevant to `skey`.
 *
 * Returns:
 *  - the block whose [keyFirst, keyLast] contains skey; or
 *  - when skey falls in a gap between two blocks, the block after the gap (ASC) or the
 *    block before it (DESC).
 *
 * Callers must pass numOfBlocks >= 1; with 0 the function reads pBlock[0].
 */
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo  = 0;
  int32_t hi  = numOfBlocks - 1;
  int32_t mid = lo;

  for (;;) {
    int32_t count = hi - lo + 1;
    mid = lo + (count >> 1);

    if (count == 1) {
      break;
    }

    if (skey > pBlock[mid].keyLast) {
      if (count == 2) {
        break;
      }
      if (order == TSDB_ORDER_DESC && skey < pBlock[mid + 1].keyFirst) {
        break;  // in the gap after mid: DESC keeps the earlier block
      }
      lo = mid + 1;
    } else if (skey < pBlock[mid].keyFirst) {
      if (order == TSDB_ORDER_ASC && skey > pBlock[mid - 1].keyLast) {
        break;  // in the gap before mid: ASC keeps the later block
      }
      hi = mid - 1;
    } else {
      break;  // mid contains skey
    }
  }

  return mid;
}
473 474 475

/*
 * Open the current file group and, for every queried table, load its comp-block list.
 *
 * Only the blocks that overlap [MIN(lastKey, window.ekey), MAX(lastKey, window.ekey)] are
 * kept; they are moved to the front of pCompInfo->blocks. Each table's count is stored in
 * pCheckInfo->numOfBlocks, and *numOfBlocks receives the sum over all tables.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by tsdbSetAndOpenHelperFile.
 */
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
  SFileGroup* fileGroup = pQueryHandle->pFileGroup;
  // the head file name is a character buffer: require it to be non-empty
  assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname[0] != 0);

  int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);

  //open file failed, return error code to client
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // load all the comp offset value for all tables in this file
  *numOfBlocks = 0;
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  for (size_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);

    // Reset first. Every path that skips this table must report zero qualified blocks;
    // otherwise a count left over from a previously scanned file group is reused.
    pCheckInfo->numOfBlocks = 0;

    SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
    if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
      continue;  // no data blocks in the file belongs to pCheckInfo->pTable
    }

    if (pCheckInfo->compSize < compIndex->len) {
      assert(compIndex->len > 0);

      char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
      assert(t != NULL);  // NOTE(review): allocation failure is not reported to the caller

      pCheckInfo->pCompInfo = (SCompInfo*) t;
      pCheckInfo->compSize = compIndex->len;
    }

    tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);

    tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
    SCompInfo* pCompInfo = pCheckInfo->pCompInfo;

    TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
    TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);

    // discard the unqualified data block based on the query time window
    int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
    if (s > pCompInfo->blocks[start].keyLast) {
      continue;  // every block of this table ends before the range starts
    }

    // todo speedup the procedure of located end block
    int32_t end = start;
    while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
      end += 1;
    }

    pCheckInfo->numOfBlocks = (end - start);

    if (start > 0) {
      memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
    }

    (*numOfBlocks) += pCheckInfo->numOfBlocks;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
542 543 544 545 546 547
// Build an SDataBlockInfo compound literal from a file comp block (_block) and the check
// info of its table (_checkInfo). The block's key range becomes the window, and its row
// and column counts are copied over.
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                   \
  ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
                    .numOfCols = (_block)->numOfCols,                                  \
                    .rows = (_block)->numOfRows,                                       \
                    .tid = (_checkInfo)->tableId.tid,                                  \
                    .uid = (_checkInfo)->tableId.uid})
548

549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
/*
 * Return a new SArray<int16_t> holding the column id of every queried column, in
 * pColumns order. The caller owns the array.
 */
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pColIds = taosArrayInit(numOfCols, sizeof(int16_t));
  for (size_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, col);
    taosArrayPush(pColIds, &pColInfo->info.colId);
  }

  return pColIds;
}

/*
 * Return the ids of the columns to load (caller owns the array).
 *
 * When `loadTS` is set and the first queried column is not the primary timestamp column
 * (id 0), id 0 is prepended.
 */
static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
  SArray* pColIds = getColumnIdList(pQueryHandle);

  int16_t firstColId = *(int16_t*) taosArrayGet(pColIds, 0);
  if (!loadTS || firstColId == 0) {
    return pColIds;
  }

  // primary timestamp column was not requested explicitly: load it as well
  int16_t tsColId = 0;
  taosArrayInsert(pColIds, 0, &tsColId);
  return pColIds;
}

577
/*
 * Load the file block `pBlock` of one table into the read helper.
 *
 * The table's SDataCols buffer is allocated on first use. On success, the handle's
 * dataBlockLoadInfo records which file group, slot and table were loaded.
 *
 * @return true if the block was loaded; false if loading failed or the column buffer could
 *         not be allocated.
 */
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  STsdbRepo* pRepo       = pQueryHandle->pTsdb;
  bool       blockLoaded = false;

  int64_t st = taosGetTimestampUs();

  if (pCheckInfo->pDataCols == NULL) {
    STsdbMeta* pMeta = tsdbGetMeta(pRepo);
    // TODO: sized with the repository-wide maxima (carried-over TODO)
    pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
    if (pCheckInfo->pDataCols == NULL) {
      return false;  // column buffer allocation failed; do not hand NULL to tdInitDataCols
    }
  }

  tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));

  if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock) == 0) {
    SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;

    pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
    pBlockLoadInfo->slot = pQueryHandle->cur.slot;
    pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;

    // a successfully loaded block must contain rows; this is only checked after a successful load
    assert(pQueryHandle->rhelper.pDataCols[0]->numOfRows != 0);
    blockLoaded = true;
  }

  int64_t et = taosGetTimestampUs() - st;
  tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);

  return blockLoaded;
}

619 620
/*
 * Decide how file block `pBlock` is combined with the table's cached (mem/imem) rows.
 *
 * Let `key` be the first cached row and [skey, ekey] the block window. There are three cases:
 *  1. There is no cached row, or it comes after the block in scan order. The block is
 *     reported whole without being loaded here (mixBlock = false, blockCompleted = true).
 *  2. The cached row comes before the block in scan order. Only the cached rows that
 *     precede the block are returned; the block is left for a later call
 *     (mixBlock = true, blockCompleted = false).
 *  3. Otherwise the cached rows overlap the block, so the block is loaded and merged.
 */
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
  SQueryFilePos* cur = &pQueryHandle->cur;
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);

  /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
  SDataRow row = getSDataRowInTableMem(pCheckInfo);

  TSKEY   key         = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
  bool    ascTraverse = ASCENDING_TRAVERSE(pQueryHandle->order);
  int32_t step        = ascTraverse ? 1 : -1;

  cur->pos = ascTraverse ? 0 : (binfo.rows - 1);

  bool hasCachedRow = (key != TSKEY_INITIAL_VAL);

  // the first cached row is reached no later than the end of the block, in scan order
  bool cacheBeforeBlockEnd = hasCachedRow && (ascTraverse ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (!cacheBeforeBlockEnd) {
    /*
     * Case 1: no cached data interferes with this block, so only data from the file is
     * used and the cache is not checked for it.
     */
    assert(pQueryHandle->outputCapacity >= binfo.rows);
    pQueryHandle->realNumOfRows = binfo.rows;

    cur->rows = binfo.rows;
    cur->win  = binfo.window;
    cur->mixBlock = false;
    cur->blockCompleted = true;

    // The last key consumed in scan order is ekey for ASC and skey for DESC; resume one step
    // beyond it. (Using ekey for DESC would revisit the block.)
    cur->lastKey = (ascTraverse ? binfo.window.ekey : binfo.window.skey) + step;
    return;
  }

  if (ascTraverse ? (key < binfo.window.skey) : (key > binfo.window.ekey)) {
    // Case 2: do not load the file block; return only the cached rows that precede it.
    // Stop one step before the block's first key in scan order: skey for ASC, ekey for DESC.
    // (Using skey for DESC would let cached rows overlapping the block through.)
    TSKEY maxKey = ascTraverse ? (binfo.window.skey - step) : (binfo.window.ekey - step);

    cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pQueryHandle->outputCapacity,
                                      &cur->win.skey, &cur->win.ekey, pQueryHandle);
    pQueryHandle->realNumOfRows = cur->rows;

    // update the last key value
    pCheckInfo->lastKey = cur->win.ekey + step;
    if (!ascTraverse) {
      SWAP(cur->win.skey, cur->win.ekey, TSKEY);
    }

    cur->mixBlock = true;
    cur->blockCompleted = false;
    return;
  }

  // Case 3: cached rows overlap the block, so load it and merge both sources.
  SArray* sa = getDefaultLoadColumns(pQueryHandle, true);

  doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
  doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);

  taosArrayDestroy(sa);
}

677 678 679 680
/*
 * Produce the result for file block `pBlock` of one table.
 *
 * If the query window ends inside the block, or the scan resumes in the middle of it, the
 * block is loaded, the cursor is positioned at lastKey, and the block is merged with the
 * cached rows. Otherwise handleDataMergeIfNeeded decides how to combine block and cache.
 *
 * @return false if the block could not be loaded; otherwise whether any rows were produced.
 */
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SQueryFilePos* cur = &pQueryHandle->cur;
  bool ascTraverse = ASCENDING_TRAVERSE(pQueryHandle->order);

  // does the query end inside this block, or does the scan resume within it?
  bool partialBlock = ascTraverse
      ? (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst)
      : (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast);

  if (!partialBlock) {  // the whole block may be used
    handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
    return pQueryHandle->realNumOfRows > 0;
  }

  if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
    return false;
  }

  SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
  assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);

  // Resume at lastKey when it lies inside the block; otherwise start at the block's first
  // row in scan order.
  bool resumeInside = ascTraverse ? (pCheckInfo->lastKey > pBlock->keyFirst)
                                  : (pCheckInfo->lastKey < pBlock->keyLast);
  if (resumeInside) {
    cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
  } else {
    cur->pos = ascTraverse ? 0 : (pBlock->numOfRows - 1);
  }

  // Allocated only here, where it is used, so no early-return path can leak it.
  SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
  doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
  taosArrayDestroy(sa);

  return pQueryHandle->realNumOfRows > 0;
}

726
/*
 * Binary-search `key` in `pValue`, an ascending array of `num` TSKEY values.
 *
 * - TSDB_ORDER_ASC:  index of the first element >= key, or -1 if every element < key.
 * - TSDB_ORDER_DESC: index of the last element <= key, or -1 if every element > key.
 * - When `key` is present, its index is returned.
 * - Returns -1 for num <= 0.
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (const TSKEY*) pValue;
  int lo = 0;
  int hi = num - 1;

  for (;;) {
    // boundary checks against the current [lo, hi] range
    if (order == TSDB_ORDER_DESC) {
      if (key >= keys[hi]) return hi;
      if (key == keys[lo]) return lo;
      if (key < keys[lo])  return lo - 1;
    } else {
      if (key <= keys[lo]) return lo;
      if (key == keys[hi]) return hi;
      if (key > keys[hi])  return (hi + 1 >= num) ? -1 : (hi + 1);
    }

    // bisection step, identical for both orders
    int mid = lo + ((hi - lo + 1) >> 1);
    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}

H
Haojun Liao 已提交
788
/*
 * Copy rows [start, end] (inclusive, ascending row indexes) of the file block held in
 * pQueryHandle->rhelper.pDataCols[0] into the result columns pQueryHandle->pColumns.
 *
 * Result columns are paired with block columns by colId, walking both lists in increasing colId order.
 * A requested column that the block does not contain is filled with NULL values.
 *
 * Ascending order: the rows are placed right after the numOfRows rows already in the output.
 * Descending order: the output (capacity rows) is filled from its tail, so the rows are placed just in front of
 * the numOfRows rows already written there.
 *
 * Side effect: cur.win.ekey and cur.lastKey are derived from the timestamp of row `end`.
 * Returns the number of rows in the output after the copy.
 */
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
  SDataCols* pCols   = pQueryHandle->rhelper.pDataCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  bool    ascending       = ASCENDING_TRAVERSE(pQueryHandle->order);
  int32_t num             = end - start + 1;
  int32_t firstSlot       = ascending ? numOfRows : (capacity - numOfRows - num);
  int32_t numOfOutputCols = taosArrayGetSize(pQueryHandle->pColumns);

  int32_t srcIdx = 0;
  for (int32_t dstIdx = 0; dstIdx < numOfOutputCols; ++dstIdx) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, dstIdx);
    int32_t          bytes    = pColInfo->info.bytes;
    char*            pData    = pColInfo->pData + firstSlot * bytes;

    // skip block columns that were not requested
    while (srcIdx < pCols->numOfCols && pCols->cols[srcIdx].colId < pColInfo->info.colId) {
      ++srcIdx;
    }

    bool isVarType = (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR);
    bool found     = (srcIdx < pCols->numOfCols && pCols->cols[srcIdx].colId == pColInfo->info.colId);

    if (found) {
      SDataCol* src = &pCols->cols[srcIdx];
      ++srcIdx;

      if (!isVarType) {
        memmove(pData, src->pData + bytes * start, bytes * num);
      } else {
        // var-length values occupy one fixed-size slot each in the output: copy them one row at a time
        for (int32_t k = 0; k < num; ++k) {
          char* p = tdGetColDataOfRow(src, start + k);
          memcpy(pData + k * bytes, p, varDataTLen(p));
        }
      }
    } else if (isVarType) {
      for (int32_t k = 0; k < num; ++k) {
        setVardataNull(pData + k * bytes, pColInfo->info.type);
      }
    } else {
      setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
    }
  }

  int32_t step = ascending ? 1 : -1;
  pQueryHandle->cur.win.ekey = tsArray[end];
  pQueryHandle->cur.lastKey  = tsArray[end] + step;

  return numOfRows + num;
}

877 878
/*
 * Write one in-memory data row into the result columns.
 *
 * The row goes into output slot numOfRows in ascending order, or capacity - numOfRows - 1 in descending order
 * (the output is filled from its tail). The row is decoded with the schema version stored in the row itself.
 * Requested columns that are missing from that schema are set to NULL.
 * numOfCols is the number of result columns to fill. pMeta is not used here.
 * NOTE(review): the schema returned by tsdbGetTableSchemaByVersion is not checked for NULL.
 */
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
                              STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
  // the schema version info is embedded in SDataRow
  STSchema* pSchema      = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
  int32_t   numOfRowCols = schemaNCols(pSchema);
  int32_t   slot         = ASCENDING_TRAVERSE(pQueryHandle->order) ? numOfRows : (capacity - numOfRows - 1);

  int32_t schemaIdx = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
    char*            pData    = pColInfo->pData + slot * pColInfo->info.bytes;

    // skip schema columns that were not requested
    while (schemaIdx < numOfRowCols && pSchema->columns[schemaIdx].colId < pColInfo->info.colId) {
      ++schemaIdx;
    }

    bool isVarType = (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR);

    if (schemaIdx < numOfRowCols && pSchema->columns[schemaIdx].colId == pColInfo->info.colId) {
      void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[schemaIdx].offset);
      if (isVarType) {
        memcpy(pData, value, varDataTLen(value));
      } else {
        memcpy(pData, value, pColInfo->info.bytes);
      }
      ++schemaIdx;
    } else if (isVarType) {
      setVardataNull(pData, pColInfo->info.type);
    } else {
      setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
    }
  }
}

H
[td-32]  
hjxilinx 已提交
937 938
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
939
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
940 941
                                  SArray* sa) {
  SQueryFilePos* cur = &pQueryHandle->cur;
H
Haojun Liao 已提交
942
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
943
  
944
  initTableMemIterator(pQueryHandle, pCheckInfo);
945
  SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
946 947 948 949 950 951 952 953 954 955

  // for search the endPos, so the order needs to reverse
  int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;

  int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);

  STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
  STable* pTable = pCheckInfo->pTableObj;

956
  int32_t endPos = cur->pos;
957
  if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
958
    endPos = blockInfo.rows - 1;
959
    cur->mixBlock = (cur->pos != 0);
960
  } else if (!ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
961
    endPos = 0;
962
    cur->mixBlock = (cur->pos != blockInfo.rows - 1);
963
  } else {
964 965
    assert(pCols->numOfRows > 0);
    endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
966
    cur->mixBlock = true;
967
  }
H
hjxilinx 已提交
968
  
969
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
970
  int32_t pos = cur->pos;
H
hjxilinx 已提交
971
  
972 973
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
  TSKEY* tsArray = pCols->cols[0].pData;
H
hjxilinx 已提交
974
  
975 976
  int32_t numOfRows = 0;
  pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
977

978 979
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
980 981
    int32_t start = cur->pos;
    int32_t end = endPos;
982
    if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
983 984 985 986 987 988 989 990
      end = cur->pos;
      start = endPos;
    }
    
    cur->win.skey = tsArray[start];
    cur->win.ekey = tsArray[end];
    
    // todo opt in case of no data in buffer
H
Haojun Liao 已提交
991
    numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
992
    
993
    // if the buffer is not full in case of descending order query, move the data in the front of the buffer
994
    if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
995
      int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
996 997

      for(int32_t i = 0; i < numOfCols; ++i) {
998 999 1000 1001 1002
        SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
      }
    }
  
H
Haojun Liao 已提交
1003
    pos += (end - start + 1) * step;
1004 1005
    cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
        ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
H
Haojun Liao 已提交
1006
    
1007 1008 1009
    pCheckInfo->lastKey = cur->lastKey;
    pQueryHandle->realNumOfRows = numOfRows;
    cur->rows = numOfRows;
1010
    return;
1011
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1012 1013
    SSkipListNode* node = NULL;
    do {
1014 1015
      SDataRow row = getSDataRowInTableMem(pCheckInfo);
      if (row == NULL) {
H
[td-32]  
hjxilinx 已提交
1016
        break;
1017
      }
1018

1019
      TSKEY key = dataRowKey(row);
1020 1021
      if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1022 1023 1024
        break;
      }

1025 1026
      if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          ((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1027 1028 1029
        break;
      }

1030 1031
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1032
        copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
1033 1034 1035 1036
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1037

1038
        cur->win.ekey = key;
1039 1040 1041
        cur->lastKey  = key + step;
        cur->mixBlock = true;

1042
        moveToNextRow(pCheckInfo);
1043
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
1044
        moveToNextRow(pCheckInfo);
1045 1046
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
                  (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1047 1048 1049
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1050

1051
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
H
Haojun Liao 已提交
1052
        if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
H
Haojun Liao 已提交
1053
          moveToNextRow(pCheckInfo);
H
Haojun Liao 已提交
1054 1055
        }
        
1056
        int32_t start = -1;
1057
        if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073
          int32_t remain = end - pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
          }

          start = pos;
        } else {
          int32_t remain = (pos - end) + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
          }

          start = end;
          end = pos;
        }

H
Haojun Liao 已提交
1074
        numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
1075 1076 1077 1078 1079
        pos += (end - start + 1) * step;
      }
    } while (numOfRows < pQueryHandle->outputCapacity);
    
    if (numOfRows < pQueryHandle->outputCapacity) {
H
Haojun Liao 已提交
1080 1081 1082 1083
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1084
      if (node == NULL ||
1085 1086
          ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1087 1088 1089 1090 1091 1092 1093 1094 1095
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

        int32_t start = -1;
        int32_t end = -1;

        // all remain data are qualified, but check the remain capacity in the first place.
1096
        if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
          int32_t remain = endPos - pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
          }

          start = pos;
          end = endPos;
        } else {
          int32_t remain = pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
          }

          start = endPos;
          end = pos;
        }

H
Haojun Liao 已提交
1114
        numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
1115
        pos += (end - start + 1) * step;
1116
      }
1117 1118
    }
  }
1119
  
1120 1121
  cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
      ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
1122

1123
  if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
1124 1125 1126 1127 1128
    SWAP(cur->win.skey, cur->win.ekey, TSKEY);
  
    // if the buffer is not full in case of descending order query, move the data in the front of the buffer
    if (numOfRows < pQueryHandle->outputCapacity) {
      int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
1129
      for(int32_t i = 0; i < numOfCols; ++i) {
1130 1131 1132 1133 1134 1135
        SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
      }
    }
  }
  
1136 1137 1138 1139
  pCheckInfo->lastKey = cur->lastKey;
  pQueryHandle->realNumOfRows = numOfRows;
  cur->rows = numOfRows;
  cur->pos = pos;
1140

S
Shengliang Guan 已提交
1141
  tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
1142
      cur->win.ekey, cur->rows, pQueryHandle->qinfo);
1143 1144
}

1145
/*
 * Binary search over a timestamp column (num TSKEY values in pValue; expected to be in ascending order).
 *
 * order == TSDB_ORDER_DESC: returns the largest index whose value is <= key, or -1 if every value is greater.
 * any other order:          returns the smallest index whose value is >= key, or -1 if every value is smaller.
 * Returns -1 when num <= 0.
 *
 * Exported twin of doBinarySearchKey. It delegates so the two implementations cannot drift apart.
 * The order is normalized first because doBinarySearchKey asserts on anything other than ASC/DESC,
 * whereas this function treats every non-DESC order as ascending.
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  int searchOrder = (order == TSDB_ORDER_DESC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
  return doBinarySearchKey(pValue, num, key, searchOrder);
}

1205
/*
 * Release the memory owned by an SBlockOrderSupporter:
 * - the first numOfTables per-table STableBlockInfo arrays;
 * - the array that holds them;
 * - the per-table block counters and block cursors.
 * numOfTables must not exceed the number of slots in pSupporter->pDataBlockInfo. Pass 0 when that array may
 * itself be NULL.
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  for (int32_t t = 0; t < numOfTables; ++t) {
    STableBlockInfo* pTableBlocks = pSupporter->pDataBlockInfo[t];
    tfree(pTableBlocks);
  }

  tfree(pSupporter->pDataBlockInfo);
  tfree(pSupporter->blockIndexArray);
  tfree(pSupporter->numOfBlocksPerTable);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

1226
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
1227 1228
    /* left block is empty */
    return 1;
1229
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
1230 1231 1232 1233 1234 1235 1236
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
1237 1238 1239
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
1240
    // todo add more information
B
Bomin Zhang 已提交
1241
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
1242 1243
  }

H
Haojun Liao 已提交
1244
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
1245 1246
}

1247
/*
 * Build pQueryHandle->pDataBlockInfo, the list of the data blocks of every table with at least one block.
 *
 * With a single qualified table the blocks keep their original order. Otherwise the per-table lists are merged
 * with a loser tree using dataBlockOrderCompar, so blocks are listed in ascending file-offset order.
 *
 * numOfBlocks:      total number of blocks over all tables; asserted to equal the sum of pTableCheck->numOfBlocks.
 * numOfAllocBlocks: out, set to numOfBlocks once the output array has been (re)allocated.
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TDB_OUT_OF_MEMORY.
 */
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  char* tmp = realloc(pQueryHandle->pDataBlockInfo, sizeof(STableBlockInfo) * numOfBlocks);
  if (tmp == NULL) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
  memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks);
  *numOfAllocBlocks = numOfBlocks;

  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = calloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = calloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  // gather, per table that has blocks, an STableBlockInfo for each of its blocks
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }

    SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];

      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
        pQueryHandle->qinfo);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
      numOfQualTables, pQueryHandle->qinfo);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  // Compare the full return code instead of storing it in a narrower integer type: a truncated failure code
  // could read as TSDB_CODE_SUCCESS and leave pTree unusable.
  SLoserTreeInfo* pTree = NULL;
  if (tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar) != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfQualTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
    int32_t pos = pTree->pNode[0].index;
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tLoserTreeAdjust(pTree, pos + sup.numOfTables);
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
   * }
   */

  tsdbDebug("%p %d data blocks sort completed", pQueryHandle, cnt);
  cleanBlockOrderSupporter(&sup, numOfQualTables);
  free(pTree);

  return TSDB_CODE_SUCCESS;
}

1357
// todo opt for only one table case
H
Haojun Liao 已提交
1358
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
1359 1360
  pQueryHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pQueryHandle->cur;
H
Haojun Liao 已提交
1361 1362 1363

  int32_t code = TSDB_CODE_SUCCESS;

1364 1365 1366 1367
  int32_t numOfBlocks = 0;
  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  
  while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
1368
    int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
H
Haojun Liao 已提交
1369
    if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
1370 1371 1372
      break;
    }
    
S
Shengliang Guan 已提交
1373
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
H
Haojun Liao 已提交
1374
           numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
1375
    
1376 1377 1378 1379 1380
    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;
    }
    
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393
    // todo return error code to query engine
    if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
      break;
    }
    
    assert(numOfBlocks >= pQueryHandle->numOfBlocks);
    if (pQueryHandle->numOfBlocks > 0) {
      break;
    }
  }
  
  // no data in file anymore
  if (pQueryHandle->numOfBlocks <= 0) {
H
Haojun Liao 已提交
1394 1395 1396 1397
    if (code == TSDB_CODE_SUCCESS) {
      assert(pQueryHandle->pFileGroup == NULL);
    }

1398
    cur->fid = -1;  // denote that there are no data in file anymore
H
Haojun Liao 已提交
1399 1400
    *exists = false;
    return code;
1401 1402
  }
  
1403
  cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
1404 1405 1406
  cur->fid = pQueryHandle->pFileGroup->fileId;
  
  STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
1407 1408 1409
  *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);

  return TSDB_CODE_SUCCESS;
1410 1411
}

H
Haojun Liao 已提交
1412
/*
 * Move to the next piece of file data for the query.
 *
 * First call: position the file-group iterator at the file that holds window.skey and load its first block.
 * Later calls:
 * - if the current block is a mixed block whose merge has not completed, keep merging it;
 * - otherwise step to the next block of the current file;
 * - or, after the last block of the file, move on to the next file group.
 * *exists reports whether data is available. Returns a TSDB error code.
 */
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
  STsdbFileH*    pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
  SQueryFilePos* cur = &pQueryHandle->cur;

  if (!pQueryHandle->locateStart) {
    // find the start data block in file
    pQueryHandle->locateStart = true;

    STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
    int32_t   startFid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

    tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
    tsdbSeekFileGroupIter(&pQueryHandle->fileIter, startFid);

    return getDataBlocksInFilesImpl(pQueryHandle, exists);
  }

  STableBlockInfo* pCurrent   = &pQueryHandle->pDataBlockInfo[cur->slot];
  STableCheckInfo* pCheckInfo = pCurrent->pTableCheckInfo;

  if (cur->mixBlock && !cur->blockCompleted) {
    // the current block is still being merged with in-memory data
    handleDataMergeIfNeeded(pQueryHandle, pCurrent->compBlock, pCheckInfo);
    *exists = pQueryHandle->realNumOfRows > 0;
    return TSDB_CODE_SUCCESS;
  }

  bool ascending = ASCENDING_TRAVERSE(pQueryHandle->order);
  bool atLastSlot = ascending ? (cur->slot == pQueryHandle->numOfBlocks - 1) : (cur->slot == 0);

  if (atLastSlot) {
    // all data blocks in current file has been checked already, try next file if exists
    return getDataBlocksInFilesImpl(pQueryHandle, exists);
  }

  // next block of the same file
  cur->slot += ascending ? 1 : -1;
  cur->mixBlock = false;
  cur->blockCompleted = false;

  STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
  *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);

  return TSDB_CODE_SUCCESS;
}

1459 1460
/*
 * Starting at pQueryHandle->activeIndex, advance through the tables until hasMoreDataInCache() reports data.
 * presumably hasMoreDataInCache inspects the table at activeIndex — verify against its definition.
 * Returns true with activeIndex left on that table. Returns false once activeIndex is no longer below the
 * number of tables.
 */
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);

  for (; pQueryHandle->activeIndex < numOfTables; pQueryHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pQueryHandle)) {
      return true;
    }
  }

  return false;
}

// handle data in cache situation
1475 1476
/*
 * Advance the query handle to the next data block.
 *
 * While checkFiles is set, blocks come from the data files; once those are exhausted the
 * in-memory tables are scanned through doHasDataInBuffer.
 *
 * A TSDB_QUERY_TYPE_EXTERNAL handle builds a synthetic block instead (presumably for
 * interpolation):
 *  - It holds the last row of the first descending block, which is the row at window.skey when one exists.
 *  - Otherwise that row is followed by the first row returned by a temporary ascending handle
 *    starting at window.skey.
 *
 * Returns true when a block is available for tsdbRetrieveDataBlockInfo / tsdbRetrieveDataBlock.
 */
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;

  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
    pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
    pQueryHandle->order = TSDB_ORDER_DESC;

    if (!tsdbNextDataBlock(pHandle)) {
      return false;
    }

    // sa is owned here: tsdbRetrieveDataBlock does not keep it, so it is destroyed on every exit below
    SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
    tsdbRetrieveDataBlockInfo(pHandle);
    tsdbRetrieveDataBlock(pHandle, sa);

    int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);

    if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
      // data already retrieved: keep only the last row and discard the others.
      // memmove, because source and destination are the same slot when the block has a single row.
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
      }

      pQueryHandle->cur.win  = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
      pQueryHandle->window   = pQueryHandle->cur.win;
      pQueryHandle->cur.rows = 1;
      pQueryHandle->type     = TSDB_QUERY_TYPE_EXTERNAL;

      taosArrayDestroy(sa);
      return true;
    }

    // no row exactly at skey: fetch the first row at/after skey with a temporary ascending handle
    STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
    if (pSecQueryHandle == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      taosArrayDestroy(sa);
      return false;
    }

    pSecQueryHandle->order       = TSDB_ORDER_ASC;
    pSecQueryHandle->window      = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
    pSecQueryHandle->pTsdb       = pQueryHandle->pTsdb;
    pSecQueryHandle->type        = TSDB_QUERY_TYPE_ALL;
    pSecQueryHandle->cur.fid     = -1;
    pSecQueryHandle->cur.win     = TSWINDOW_INITIALIZER;
    pSecQueryHandle->checkFiles  = true;
    pSecQueryHandle->activeIndex = 0;
    pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;

    tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);
    tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);

    // allocate buffer in order to load data blocks from file
    pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
    pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);

      colInfo.info = pCol->info;
      // sized by the capacity of the handle that will fill this buffer
      colInfo.pData = calloc(1, EXTRA_BYTES + pSecQueryHandle->outputCapacity * pCol->info.bytes);
      taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
      pSecQueryHandle->statis[i].colId = colInfo.info.colId;
    }

    pSecQueryHandle->pTableCheckInfo = taosArrayInit(numOfTables, sizeof(STableCheckInfo));
    for (int32_t j = 0; j < numOfTables; ++j) {
      STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);

      STableCheckInfo info = {
          .lastKey = pSecQueryHandle->window.skey,
          .tableId = pCheckInfo->tableId,
          .pTableObj = pCheckInfo->pTableObj,
      };

      taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
    }

    tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
    tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);

    bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
    assert(ret);
    (void) ret;

    tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle);
    tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa);

    // row 0: last row of the descending block, row 1: first row of the ascending block
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
      memmove(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);

      SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
      assert(pCol->info.colId == pCol1->info.colId);

      memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
    }

    SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);

    pQueryHandle->cur.win  = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
    pQueryHandle->window   = pQueryHandle->cur.win;
    pQueryHandle->cur.rows = 2;

    tsdbCleanupQueryHandle(pSecQueryHandle);
    taosArrayDestroy(sa);

    pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
    return true;
  }

  if (pQueryHandle->checkFiles) {
    bool exists = true;
    int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (exists) {
      return exists;
    }

    pQueryHandle->activeIndex = 0;
    pQueryHandle->checkFiles  = false;
  }

  // TODO: opt by consider the scan order
  return doHasDataInBuffer(pQueryHandle);
}

1606
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
  assert(!ASCENDING_TRAVERSE(pQueryHandle->order));

  // Narrow the handle down to the single table holding the greatest last key; the last_row query is then
  // answered from that table only. The query time window is not taken into account yet (todo).
  size_t  total  = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  TSKEY   latest = TSKEY_INITIAL_VAL;
  int32_t chosen = -1;

  for (int32_t k = 0; k < total; ++k) {
    STableCheckInfo* pInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, k);
    TSKEY lastKey = pInfo->pTableObj->lastKey;
    if (lastKey > latest) {
      latest = lastKey;
      chosen = k;
    }
  }

  if (chosen < 0) {
    // todo add failure test cases
    return;
  }

  // release the per-table resources of every table that is dropped from the list
  for (int32_t k = 0; k < total; ++k) {
    if (k != chosen) {
      STableCheckInfo* pDiscard = taosArrayGet(pQueryHandle->pTableCheckInfo, k);
      tSkipListDestroyIter(pDiscard->iter);

      if (pDiscard->pDataCols != NULL) {
        tfree(pDiscard->pDataCols->buf);
      }

      tfree(pDiscard->pDataCols);
      tfree(pDiscard->pCompInfo);
    }
  }

  STableCheckInfo kept = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, chosen);
  taosArrayClear(pQueryHandle->pTableCheckInfo);

  kept.lastKey = latest;
  taosArrayPush(pQueryHandle->pTableCheckInfo, &kept);

  // the query window collapses onto the chosen last timestamp
  pQueryHandle->window = (STimeWindow) {latest, latest};
}

1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695
/*
 * Prepare the handle for an interpolation query at a single timestamp (window.skey == window.ekey).
 *
 * The scan order is switched to descending and only the first table is kept whose last key is at or
 * after that timestamp. The window becomes [skey, TSKEY_INITIAL_VAL]. If no table qualifies, the
 * table list and window are left as they are.
 */
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
  // filter the queried time stamp in the first place
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
  pQueryHandle->order = TSDB_ORDER_DESC;

  assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);

  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  int32_t i = 0;
  while (i < numOfTables) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
    if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey &&
        pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
      break;
    }

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

  // taosArrayClear below only drops the elements, so release what every discarded table holds first
  // (the same per-table cleanup tsdbCleanupQueryHandle performs)
  for (int32_t j = 0; j < numOfTables; ++j) {
    if (j == i) {
      continue;
    }

    STableCheckInfo* pDiscard = taosArrayGet(pQueryHandle->pTableCheckInfo, j);
    tSkipListDestroyIter(pDiscard->iter);

    if (pDiscard->pDataCols != NULL) {
      tfree(pDiscard->pDataCols->buf);
    }

    tfree(pDiscard->pDataCols);
    tfree(pDiscard->pCompInfo);
  }

  STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i);
  taosArrayClear(pQueryHandle->pTableCheckInfo);

  info.lastKey = pQueryHandle->window.skey;
  taosArrayPush(pQueryHandle->pTableCheckInfo, &info);

  // scan backwards from the requested timestamp without a lower bound
  pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
}

H
Haojun Liao 已提交
1696
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
1697
                                 STsdbQueryHandle* pQueryHandle) {
1698
  int     numOfRows = 0;
1699
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
1700
  *skey = TSKEY_INITIAL_VAL;
1701

H
Haojun Liao 已提交
1702
  int64_t st = taosGetTimestampUs();
1703 1704 1705
  STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
  STable* pTable = pCheckInfo->pTableObj;

1706
  do {
H
Haojun Liao 已提交
1707 1708
    SDataRow row = getSDataRowInTableMem(pCheckInfo);
    if (row == NULL) {
1709 1710
      break;
    }
1711

1712
    TSKEY key = dataRowKey(row);
1713
    if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
S
Shengliang Guan 已提交
1714
      tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
1715 1716 1717 1718
          pQueryHandle->window.ekey);
      
      break;
    }
1719

1720
    if (*skey == INT64_MIN) {
H
Haojun Liao 已提交
1721
      *skey = key;
1722
    }
1723

H
Haojun Liao 已提交
1724
    *ekey = key;
1725 1726
    copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);

1727
    if (++numOfRows >= maxRowsToRead) {
H
Haojun Liao 已提交
1728
      moveToNextRow(pCheckInfo);
1729 1730 1731
      break;
    }
    
H
Haojun Liao 已提交
1732
  } while(moveToNextRow(pCheckInfo));
1733

1734 1735 1736
  assert(numOfRows <= maxRowsToRead);
  
  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1737
  if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
1738 1739 1740 1741 1742 1743 1744 1745
    int32_t emptySize = maxRowsToRead - numOfRows;
    
    for(int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
      memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }
  
H
Haojun Liao 已提交
1746
  int64_t elapsedTime = taosGetTimestampUs() - st;
S
Shengliang Guan 已提交
1747
  tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
H
Haojun Liao 已提交
1748
            elapsedTime, numOfRows, numOfCols);
1749

1750
  return numOfRows;
H
hjxilinx 已提交
1751 1752
}

H
hzcheng 已提交
1753
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
  SQueryFilePos*    cur = &pHandle->cur;

  // a non-negative file id means the current block was read from a data file; otherwise it came
  // from memory and belongs to the table at activeIndex
  STable* pTable = (cur->fid >= 0)
      ? pHandle->pDataBlockInfo[cur->slot].pTableCheckInfo->pTableObj
      : ((STableCheckInfo*) taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex))->pTableObj;

  SDataBlockInfo info = {
      .uid       = pTable->tableId.uid,
      .tid       = pTable->tableId.tid,
      .rows      = cur->rows,
      .window    = cur->win,
      .numOfCols = QH_GET_NUM_OF_COLS(pHandle),
  };

  return info;
}
H
hjxilinx 已提交
1777

H
Haojun Liao 已提交
1778 1779 1780
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
H
hzcheng 已提交
1781
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
H
Haojun Liao 已提交
1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
  
  SQueryFilePos* cur = &pHandle->cur;
  if (cur->mixBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
  
  assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) ||
      ((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0)));
  
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
1794 1795 1796 1797 1798 1799 1800
  
  // file block with subblocks has no statistics data
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
  
H
Haojun Liao 已提交
1801 1802
  tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
  
H
Haojun Liao 已提交
1803
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
1804 1805 1806 1807 1808 1809 1810 1811
  for(int32_t i = 0; i < numOfCols; ++i) {
    SDataStatis* st = &pHandle->statis[i];
    int32_t colId = st->colId;
    
    memset(st, 0, sizeof(SDataStatis));
    st->colId = colId;
  }
  
H
Haojun Liao 已提交
1812 1813
  tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
  
H
Haojun Liao 已提交
1814 1815
  *pBlockStatis = pHandle->statis;
  
H
Haojun Liao 已提交
1816 1817
  //update the number of NULL data rows
  for(int32_t i = 0; i < numOfCols; ++i) {
1818
    if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
H
Haojun Liao 已提交
1819 1820 1821 1822
      pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
    }
  }
  
1823
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1824 1825
}

H
hzcheng 已提交
1826
SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
  /*
   * Return the column buffers holding the current block. They are already filled when
   *  1. the block comes from the cache (fid < 0),
   *  2. the block is a mixed block, i.e. not completely qualified to the query time range, or
   *  3. the same file block was the last one loaded for this table.
   * Only otherwise is the file block loaded and copied here. pIdList is not used.
   */
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
  SQueryFilePos*    cur = &pHandle->cur;

  if (cur->fid < 0) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;

  if (cur->mixBlock) {
    return pHandle->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);

  // slot is compared first so fileGroup is only dereferenced once a block has actually been loaded
  SDataBlockLoadInfo* pLoaded = &pHandle->dataBlockLoadInfo;
  bool alreadyLoaded = pLoaded->slot == cur->slot && pLoaded->fileGroup->fileId == cur->fid &&
                       pLoaded->tid == pCheckInfo->pTableObj->tableId.tid;
  if (alreadyLoaded) {
    return pHandle->pColumns;
  }

  SCompBlock* pBlock = pBlockInfo->compBlock;
  doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);

  int32_t copied = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);

  // a descending scan fills the buffer from its tail; shift a partially filled buffer to the front
  if (!ASCENDING_TRAVERSE(pHandle->order) && copied < pHandle->outputCapacity) {
    int32_t gap  = pHandle->outputCapacity - copied;
    int32_t cols = taosArrayGetSize(pHandle->pColumns);

    for (int32_t c = 0; c < cols; ++c) {
      SColumnInfoData* pCol = taosArrayGet(pHandle->pColumns, c);
      memmove(pCol->pData, pCol->pData + gap * pCol->info.bytes, copied * pCol->info.bytes);
    }
  }

  return pHandle->pColumns;
}

H
hzcheng 已提交
1875
SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) {
  // row-level retrieval is not implemented: every call yields NULL
  (void) pQueryHandle;
  (void) pIdList;
  (void) pCond;
  return NULL;
}
1876

1877
/*
 * Append every table indexed under pSuperTable to list, as STable* elements.
 *
 * Returns TSDB_CODE_SUCCESS. If the skip-list iterator cannot be created (tSkipListCreateIter
 * returned NULL), terrno is set and returned, and list is left unchanged.
 */
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
  SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
  if (iter == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return terrno;
  }

  while (tSkipListIterNext(iter)) {
    SSkipListNode* pNode = tSkipListIterGet(iter);

    // each index node stores a pointer to the table
    STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
    taosArrayPush(list, pTable);
  }

  tSkipListDestroyIter(iter);
  return TSDB_CODE_SUCCESS;
}

1890
static void destroyHelper(void* param) {
1891 1892 1893
  if (param == NULL) {
    return;
  }
1894

H
hjxilinx 已提交
1895
  
1896
  tQueryInfo* pInfo = (tQueryInfo*)param;
H
hjxilinx 已提交
1897 1898 1899 1900 1901
  if (pInfo->optr != TSDB_RELATION_IN) {
    tfree(pInfo->q);
  }
  
//  tVariantDestroy(&(pInfo->q));
1902 1903 1904
  free(param);
}

H
hjxilinx 已提交
1905
static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
  // the table-name pseudo column (TBNAME) has a dedicated index
  if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
    return TSDB_TBNAME_COLUMN_INDEX;
  }

  // otherwise look for a tag column with the same id, type and width; -2 means "not found"
  int32_t n = schemaNCols(pTSchema);
  for (int32_t idx = 0; idx < n; ++idx) {
    STColumn* c = &pTSchema->columns[idx];
    bool same = (c->colId == pSchema->colId) && (c->type == pSchema->type) && (c->bytes == pSchema->bytes);
    if (same) {
      return idx;
    }
  }

  return -2;
}

void filterPrepare(void* expr, void* param) {
1922
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
1923
  if (pExpr->_node.info != NULL) {
1924 1925
    return;
  }
1926

H
hjxilinx 已提交
1927
  int32_t i = 0;
H
[td-32]  
hjxilinx 已提交
1928
  pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
H
hjxilinx 已提交
1929 1930
  
  STSchema* pTSSchema = (STSchema*) param;
1931

H
hjxilinx 已提交
1932 1933 1934
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
1935

1936
  // todo : if current super table does not change schema yet, this function may fail to get correct schema, test case
H
hjxilinx 已提交
1937
  int32_t index = getTagColumnIndex(pTSSchema, pSchema);
H
hjxilinx 已提交
1938
  assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX));
1939

1940
  pInfo->sch      = *pSchema;
H
hjxilinx 已提交
1941
  pInfo->colIndex = index;
1942
  pInfo->optr     = pExpr->_node.optr;
H
hjxilinx 已提交
1943
  pInfo->compare  = getComparFunc(pSchema->type, pInfo->optr);
H
hjxilinx 已提交
1944
  pInfo->param    = pTSSchema;
H
hjxilinx 已提交
1945 1946 1947 1948 1949
  
  if (pInfo->optr == TSDB_RELATION_IN) {
    pInfo->q = (char*) pCond->arr;
  } else {
    pInfo->q = calloc(1, pSchema->bytes);
1950
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
1951
  }
1952 1953
}

1954 1955 1956 1957
/*
 * Context passed as the "param" argument to tableGroupComparFn while child tables are sorted
 * and split into groups by their group-by columns.
 */
typedef struct STableGroupSupporter {
  int32_t    numOfCols;   // number of entries in pCols
  SColIndex* pCols;       // group-by columns: a tag index, or TSDB_TBNAME_COLUMN_INDEX for the table name
  STSchema*  pTagSchema;  // tag schema used to resolve a tag index to its column definition
} STableGroupSupporter;

int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
1963 1964
  STable* pTable1 = *(STable**) p1;
  STable* pTable2 = *(STable**) p2;
1965 1966 1967 1968 1969
  
  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;
    
H
Haojun Liao 已提交
1970
    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
1971
    
1972 1973 1974 1975 1976
    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;
    
1977
    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {  // todo refactor extract method , to queryExecutor to generate tags values
H
hjxilinx 已提交
1978 1979
      f1 = (char*) pTable1->name;
      f2 = (char*) pTable2->name;
1980
      type = TSDB_DATA_TYPE_BINARY;
H
Haojun Liao 已提交
1981
      bytes = tGetTableNameColumnSchema().bytes;
1982
    } else {
H
hjxilinx 已提交
1983 1984
      STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
      bytes = pCol->bytes;
1985
      type = pCol->type;
H
Hongze Cheng 已提交
1986 1987
      f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
      f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
1988
    }
H
Haojun Liao 已提交
1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013
    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
  
  return 0;
}

2014
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
H
hjxilinx 已提交
2015
    __ext_compar_fn_t compareFn) {
2016
  STable* pTable = taosArrayGetP(pTableList, 0);
2017
  
2018 2019 2020 2021
  SArray* g = taosArrayInit(16, POINTER_BYTES);
  taosArrayPush(g, &pTable);
  tsdbRefTable(pTable);

2022
  for (int32_t i = 1; i < numOfTables; ++i) {
2023 2024
    STable** prev = taosArrayGet(pTableList, i - 1);
    STable** p = taosArrayGet(pTableList, i);
H
hjxilinx 已提交
2025 2026
    
    int32_t ret = compareFn(prev, p, pSupp);
2027 2028
    assert(ret == 0 || ret == -1);
    
2029 2030 2031
    tsdbRefTable(*p);
    assert((*p)->type == TSDB_CHILD_TABLE);

2032
    if (ret == 0) {
H
hjxilinx 已提交
2033
      taosArrayPush(g, p);
2034 2035
    } else {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
2036
      g = taosArrayInit(16, POINTER_BYTES);
H
hjxilinx 已提交
2037
      taosArrayPush(g, p);
2038 2039
    }
  }
2040 2041
  
  taosArrayPush(pGroups, &g);
2042 2043
}

2044
/*
 * Split the child tables in pTableList into groups.
 *
 * Without group-by columns (numOfOrderCols == 0), or with a single table, everything goes into one
 * group. Otherwise pTableList is sorted in place by the pCols tag/table-name values and split into
 * runs of equal values.
 *
 * Returns an SArray of groups (each an SArray of STable*), empty when pTableList is empty. Every
 * grouped table is referenced.
 */
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
  assert(pTableList != NULL);
  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
    SArray* sa = taosArrayInit(size, POINTER_BYTES);
    for (int32_t i = 0; i < size; ++i) {
      STable** pTable = taosArrayGet(pTableList, i);
      assert((*pTable)->type == TSDB_CHILD_TABLE);

      tsdbRefTable(*pTable);
      taosArrayPush(sa, pTable);
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %zu tables belong to one group", size);
  } else {
    // the comparator context is only needed for the sort and the grouping pass below, so it lives on
    // the stack instead of in an unchecked heap allocation
    STableGroupSupporter supp = {
        .numOfCols  = numOfOrderCols,
        .pCols      = pCols,
        .pTagSchema = pTagSchema,
    };

    taosqsort(pTableList->pData, size, POINTER_BYTES, &supp, tableGroupComparFn);
    createTableGroupImpl(pTableGroup, pTableList, size, &supp, tableGroupComparFn);
  }

  return pTableGroup;
}

2080
bool indexedNodeFilterFp(const void* pNode, void* param) {
H
hjxilinx 已提交
2081
  tQueryInfo* pInfo = (tQueryInfo*) param;
H
hjxilinx 已提交
2082
  
H
Haojun Liao 已提交
2083
  STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
2084

2085
  char*  val = NULL;
H
hjxilinx 已提交
2086

2087
  if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
H
Haojun Liao 已提交
2088
    val = (char*) pTable->name;
2089
  } else {
H
Haojun Liao 已提交
2090
    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
2091
  }
2092
  
T
Tao Liu 已提交
2093
  //todo :the val is possible to be null, so check it out carefully
B
Bomin Zhang 已提交
2094
  int32_t ret = pInfo->compare(val, pInfo->q);
2095

2096 2097 2098 2099 2100 2101 2102
  switch (pInfo->optr) {
    case TSDB_RELATION_EQUAL: {
      return ret == 0;
    }
    case TSDB_RELATION_NOT_EQUAL: {
      return ret != 0;
    }
2103
    case TSDB_RELATION_GREATER_EQUAL: {
2104 2105
      return ret >= 0;
    }
2106
    case TSDB_RELATION_GREATER: {
2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117
      return ret > 0;
    }
    case TSDB_RELATION_LESS_EQUAL: {
      return ret <= 0;
    }
    case TSDB_RELATION_LESS: {
      return ret < 0;
    }
    case TSDB_RELATION_LIKE: {
      return ret == 0;
    }
weixin_48148422's avatar
weixin_48148422 已提交
2118
    case TSDB_RELATION_IN: {
2119
      return ret == 1;
weixin_48148422's avatar
weixin_48148422 已提交
2120
    }
2121

2122 2123 2124
    default:
      assert(false);
  }
H
hjxilinx 已提交
2125
  
2126 2127 2128
  return true;
}

2129
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
  // Evaluate the filter expression tree against the super table's tag index and collect the matching
  // tables into pRes. The tree is consumed: it is destroyed here, with destroyHelper releasing each
  // node's filter info.
  SExprTraverseSupp traverseSupp = {0};
  traverseSupp.nodeFilterFn = (__result_filter_fn_t) indexedNodeFilterFp;
  traverseSupp.setupInfoFn  = filterPrepare;
  traverseSupp.pExtInfo     = pSTable->tagSchema;

  tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &traverseSupp);
  tExprTreeDestroy(&pExpr, destroyHelper);

  return TSDB_CODE_SUCCESS;
}

H
TD-353  
Hongze Cheng 已提交
2142
/*
 * Collect the child tables of super table uid that satisfy:
 *  - the tag condition pTagCond (binary-encoded expression of len bytes), and/or
 *  - the table-name condition tbnameCond.
 * When both conditions exist they are combined with tagNameRelType.
 *
 * The matching tables are grouped by the pColIndex columns into pGroupInfo. Returns
 * TSDB_CODE_SUCCESS, or the error code (also left in terrno).
 */
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
                                 SColIndex* pColIndex, int32_t numOfCols) {
  if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;

  STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTable == NULL) {
    tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);

    goto _error;
  }

  if (pTable->type != TSDB_SUPER_TABLE) {
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
        pTable->name->data);
    terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client

    tsdbUnlockRepoMeta(tsdb);
    goto _error;
  }

  //NOTE: not add ref count for super table
  SArray* res = taosArrayInit(8, POINTER_BYTES);
  STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);

  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
    int32_t ret = getAllTableList(pTable, res);
    if (ret != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(res);
      tsdbUnlockRepoMeta(tsdb);
      goto _error;
    }

    pGroupInfo->numOfTables = taosArrayGetSize(res);
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);

    tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
    taosArrayDestroy(res);

    if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
    return ret;
  }

  int32_t ret = TSDB_CODE_SUCCESS;
  tExprNode* expr = NULL;

  TRY(32) {
    expr = exprTreeFromTableName(tbnameCond);
    if (expr == NULL) {
      expr = exprTreeFromBinary(pTagCond, len);
    } else {
      CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL);
      tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
      if (tagExpr != NULL) {
        CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, tagExpr, NULL);
        tExprNode* tbnameExpr = expr;
        expr = calloc(1, sizeof(tExprNode));
        if (expr == NULL) {
          THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
        }
        expr->nodeType = TSQL_NODE_EXPR;
        expr->_node.optr = tagNameRelType;
        expr->_node.pLeft = tagExpr;
        expr->_node.pRight = tbnameExpr;
      }
    }
    CLEANUP_EXECUTE();

  } CATCH( code ) {
    CLEANUP_EXECUTE();
    ret = code;
  } END_TRY

  if (ret != TSDB_CODE_SUCCESS) {
    // expr is NULL here or may refer to a sub-tree the cleanup stack has just released: do not use it
    tsdbError("%p failed to build query condition for stable, uid:%" PRIu64 ", code:%d", tsdb, uid, ret);
    taosArrayDestroy(res);
    tsdbUnlockRepoMeta(tsdb);
    terrno = ret;
    goto _error;
  }

  doQueryTableList(pTable, res, expr);
  pGroupInfo->numOfTables = taosArrayGetSize(res);
  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);

  tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
      pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));

  taosArrayDestroy(res);

  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
  return ret;

  _error:
  return terrno;
}
2233

H
TD-353  
Hongze Cheng 已提交
2234
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
  // Build a group list with exactly one group containing the single (non-super) table identified by uid.
  // The table is referenced while the meta read lock is held. Errors are returned through terrno.
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTable == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);
    return terrno;
  }

  assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE);
  tsdbRefTable(pTable);

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList  = taosArrayInit(1, POINTER_BYTES);

  SArray* onlyGroup = taosArrayInit(1, POINTER_BYTES);
  taosArrayPush(onlyGroup, &pTable);
  taosArrayPush(pGroupInfo->pGroupList, &onlyGroup);

  return TSDB_CODE_SUCCESS;
}
2261

2262
/*
 * Build a single group from an explicit list of table ids (STableIdInfo).
 *
 * Ids whose table no longer exists are skipped with a warning. A super table id is rejected with
 * TSDB_CODE_QRY_INVALID_MSG and no table is referenced in that case. On success every collected
 * table is referenced and pGroupInfo->numOfTables is the number of tables actually in the group.
 */
int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  assert(pTableIdList != NULL);
  size_t size = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
  pGroupInfo->numOfTables = 0;
  SArray* group = taosArrayInit(1, POINTER_BYTES);

  for (int32_t i = 0; i < size; ++i) {
    STableIdInfo *id = taosArrayGet(pTableIdList, i);

    STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);
      // no reference has been taken yet, so only the scratch array has to be released
      taosArrayDestroy(group);
      tsdbUnlockRepoMeta(tsdb);
      terrno = TSDB_CODE_QRY_INVALID_MSG;
      return terrno;
    }

    taosArrayPush(group, &pTable);
  }

  // all ids are acceptable: reference the collected tables while the meta read lock is still held
  size_t numOfTables = taosArrayGetSize(group);
  for (size_t j = 0; j < numOfTables; ++j) {
    tsdbRefTable((STable*) taosArrayGetP(group, j));
  }

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    taosArrayDestroy(group);
    return terrno;
  }

  // count only the tables actually placed in the group (dropped ids were skipped above)
  pGroupInfo->numOfTables = numOfTables;
  taosArrayPush(pGroupInfo->pGroupList, &group);

  return TSDB_CODE_SUCCESS;
}

H
hzcheng 已提交
2302
/*
 * Release a query handle and every resource it owns:
 *   - per-table check info: skip-list iterator, data-column buffer, compression info;
 *   - per-column output buffers and the column array;
 *   - the data-block info and statistics buffers;
 *   - the references held on the mem/imem tables;
 *   - the read helper;
 *   - finally the handle itself.
 *
 * Passing NULL is a no-op. The per-table check-info array and the column array are each
 * checked for NULL before being walked. This lets a handle whose construction stopped
 * before those arrays were allocated be released without dereferencing a NULL array.
 */
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
  if (pQueryHandle == NULL) {
    return;
  }

  if (pQueryHandle->pTableCheckInfo != NULL) {
    size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
    for (size_t i = 0; i < numOfTables; ++i) {
      STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
      tSkipListDestroyIter(pTableCheckInfo->iter);

      // The column buffer is owned by pDataCols, so free it before the container.
      if (pTableCheckInfo->pDataCols != NULL) {
        tfree(pTableCheckInfo->pDataCols->buf);
      }

      tfree(pTableCheckInfo->pDataCols);
      tfree(pTableCheckInfo->pCompInfo);
    }

    taosArrayDestroy(pQueryHandle->pTableCheckInfo);
  }

  if (pQueryHandle->pColumns != NULL) {
    size_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
    for (size_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
      tfree(pColInfo->pData);
    }

    taosArrayDestroy(pQueryHandle->pColumns);
  }

  tfree(pQueryHandle->pDataBlockInfo);
  tfree(pQueryHandle->statis);

  // todo check error
  tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem);
  tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);

  tsdbDestroyHelper(&pQueryHandle->rhelper);
  tfree(pQueryHandle);
}
2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362

/*
 * Tear down a table group list, for example the one filled in by
 * tsdbGetTableGroupFromIdList().
 *
 * pGroupList->pGroupList is an array of groups, and each group is an array of STable*.
 * Every table's reference is dropped with tsdbUnRefTable(). Then each group array and
 * finally the outer group array are destroyed. The STableGroupInfo struct itself is not
 * freed; it belongs to the caller.
 */
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  SArray* pGroups = pGroupList->pGroupList;
  size_t  groupCount = taosArrayGetSize(pGroups);

  for (size_t g = 0; g < groupCount; ++g) {
    SArray* pTables = taosArrayGetP(pGroups, g);
    size_t  tableCount = taosArrayGetSize(pTables);

    for (size_t t = 0; t < tableCount; ++t) {
      STable* pTable = taosArrayGetP(pTables, t);
      assert(pTable != NULL);
      tsdbUnRefTable(pTable);
    }

    taosArrayDestroy(pTables);
  }

  taosArrayDestroy(pGroups);
}