tsdbRead.c 79.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
S
slguan 已提交
17
#include "tulog.h"
18
#include "talgo.h"
19
#include "tutil.h"
H
Haojun Liao 已提交
20
#include "ttime.h"
21
#include "tcompare.h"
22
#include "exception.h"
23

24 25
#include "../../../query/inc/qast.h"  // todo move to common module
#include "../../../query/inc/tlosertree.h"  // todo move to util module
26
#include "tsdb.h"
H
TD-34  
hzcheng 已提交
27
#include "tsdbMain.h"
28

29
// extra bytes appended to each per-column output buffer allocated in tsdbQueryTables
#define EXTRA_BYTES 2

// true if the traverse order is ascending; the argument is parenthesized so that
// compound expressions (e.g. a conditional expression) expand as one operand
#define ASCENDING_TRAVERSE(o)   ((o) == TSDB_ORDER_ASC)

// number of columns requested through the query handle
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
32

33 34 35 36 37
// range-comparison flags (not referenced in the visible part of this file)
enum {
  QUERY_RANGE_LESS_EQUAL    = 0,
  QUERY_RANGE_GREATER_EQUAL = 1,
};

H
hjxilinx 已提交
38
// values stored in STsdbQueryHandle.type
enum {
  TSDB_QUERY_TYPE_ALL      = 1,  // retrieve all data blocks
  TSDB_QUERY_TYPE_LAST     = 2,  // retrieve only the last row
  TSDB_QUERY_TYPE_EXTERNAL = 3,  // retrieve the direct prev|next rows
};

44 45
typedef struct SQueryFilePos {
  int32_t fid;
46 47
  int32_t slot;
  int32_t pos;
48
  int64_t lastKey;
49 50
  int32_t rows;
  bool    mixBlock;
51
  bool    blockCompleted;
52
  STimeWindow win;
53
} SQueryFilePos;
H
hjxilinx 已提交
54

55
typedef struct SDataBlockLoadInfo {
H
hjxilinx 已提交
56
  SFileGroup* fileGroup;
57
  int32_t     slot;
H
hjLiao 已提交
58
  int32_t     tid;
59
  SArray*     pLoadedCols;
60
} SDataBlockLoadInfo;
H
hjxilinx 已提交
61

62
// records which comp block information is currently loaded; both fields are -1 after initialization
typedef struct SLoadCompBlockInfo {
  int32_t tid;     /* table tid */
  int32_t fileId;  /* file id */
} SLoadCompBlockInfo;
H
hjxilinx 已提交
66

67
typedef struct STableCheckInfo {
H
Haojun Liao 已提交
68 69 70 71 72 73 74
  STableId      tableId;
  TSKEY         lastKey;
  STable*       pTableObj;
  SCompInfo*    pCompInfo;
  int32_t       compSize;
  int32_t       numOfBlocks;    // number of qualified data blocks not the original blocks
  SDataCols*    pDataCols;
H
Haojun Liao 已提交
75
  int32_t       chosen;         // indicate which iterator should move forward
H
Haojun Liao 已提交
76
  bool          initBuf;        // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
77 78 79
  SMemTable*    mem;            // in-mem buffer, hold the ref count
  SMemTable*    imem;           // imem buffer, hold the ref count to avoid release

H
Haojun Liao 已提交
80 81
  SSkipListIterator* iter;      // mem buffer skip list iterator
  SSkipListIterator* iiter;     // imem buffer skip list iterator
82
} STableCheckInfo;
83

84
typedef struct STableBlockInfo {
H
Haojun Liao 已提交
85 86
  SCompBlock*        compBlock;
  STableCheckInfo*   pTableCheckInfo;
87
} STableBlockInfo;
88

89 90
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
H
Haojun Liao 已提交
91
  STableBlockInfo**   pDataBlockInfo;
92
  int32_t*            blockIndexArray;
93
  int32_t*            numOfBlocksPerTable;
94 95
} SBlockOrderSupporter;

96
typedef struct STsdbQueryHandle {
H
Haojun Liao 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
  STsdbRepo*     pTsdb;
  SQueryFilePos  cur;              // current position
  int16_t        order;
  STimeWindow    window;           // the primary query time window that applies to all queries
  SDataStatis*   statis;           // query level statistics, only one table block statistics info exists at any time
  int32_t        numOfBlocks;
  SArray*        pColumns;         // column list, SColumnInfoData array list
  bool           locateStart;
  int32_t        outputCapacity;
  int32_t        realNumOfRows;
  SArray*        pTableCheckInfo;  //SArray<STableCheckInfo>
  int32_t        activeIndex;
  bool           checkFiles;       // check file stage
  void*          qinfo;            // query info handle, for debug purpose
  int32_t        type;             // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
112 113
  SFileGroup*    pFileGroup;
  SFileGroupIter fileIter;
H
hjxilinx 已提交
114
  SRWHelper      rhelper;
H
Haojun Liao 已提交
115 116 117 118
  STableBlockInfo* pDataBlockInfo;
  
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
119 120
} STsdbQueryHandle;

121
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
122
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
123 124 125
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
                                   SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
H
Haojun Liao 已提交
126
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
127
                                 STsdbQueryHandle* pQueryHandle);
H
hjxilinx 已提交
128

129
// mark the block load info as "nothing loaded yet"; pLoadedCols is left untouched
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->tid  = -1;
  pBlockLoadInfo->slot = -1;
}

135
// mark the comp block load info as "nothing loaded yet"
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid    = -1;
}
H
hjxilinx 已提交
139

H
TD-353  
Hongze Cheng 已提交
140
/*
 * Create a query handle that reads the requested columns of every table in groupList.
 *
 * tsdb       repository to read from
 * pCond      query condition (traverse order, time window, column list); must not be NULL and
 *            must request at least one column
 * groupList  tables to query, organized in groups; at least one group is required and no group
 *            may be empty
 * qinfo      opaque pointer kept in the handle, used in debug logs
 *
 * Returns the new handle, or NULL if the handle itself cannot be allocated.
 */
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
  // todo 1. filter not exist table
  // todo 2. add the reference count for each table that is involved in query

  // check the arguments before they are dereferenced for the first time
  assert(pCond != NULL && pCond->numOfCols > 0);

  size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
  assert(sizeOfGroup >= 1);

  STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
  if (pQueryHandle == NULL) {  // out of memory, nothing to clean up yet
    return NULL;
  }

  pQueryHandle->order       = pCond->order;
  pQueryHandle->window      = pCond->twindow;
  pQueryHandle->pTsdb       = tsdb;
  pQueryHandle->type        = TSDB_QUERY_TYPE_ALL;
  pQueryHandle->cur.fid     = -1;
  pQueryHandle->cur.win     = TSWINDOW_INITIALIZER;
  pQueryHandle->checkFiles  = true;
  pQueryHandle->activeIndex = 0;   // current active table index
  pQueryHandle->qinfo       = qinfo;
  pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;

  tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);

  // allocate buffer in order to load data blocks from file
  int32_t numOfCols = pCond->numOfCols;

  pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
  pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));  // todo: use list instead of array?

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData  colInfo = {{0}, 0};

    // one output buffer per column, large enough for outputCapacity rows
    colInfo.info = pCond->colList[i];
    colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
    taosArrayPush(pQueryHandle->pColumns, &colInfo);
    pQueryHandle->statis[i].colId = colInfo.info.colId;
  }

  pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
  STsdbMeta* pMeta = tsdbGetMeta(tsdb);
  assert(pMeta != NULL);

  // flatten all groups into one list of per-table check info
  for (size_t i = 0; i < sizeOfGroup; ++i) {
    SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);

    size_t gsize = taosArrayGetSize(group);
    assert(gsize > 0);

    for (size_t j = 0; j < gsize; ++j) {
      STable* pTable = (STable*) taosArrayGetP(group, j);

      // the scan of every table starts at the beginning of the query window
      STableCheckInfo info = {
          .lastKey = pQueryHandle->window.skey,
          .tableId = pTable->tableId,
          .pTableObj = pTable,
      };

      assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
      info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));

      taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
    }
  }

  tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));

  tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);

  return (TsdbQueryHandleT) pQueryHandle;
}

H
TD-353  
Hongze Cheng 已提交
210
/*
 * Create a query handle for a last-row query: a regular handle is built first, then it is
 * switched to descending order and adjusted by changeQueryHandleForLastrowQuery.
 */
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
  STsdbQueryHandle *pHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);

  pHandle->order = TSDB_ORDER_DESC;
  pHandle->type  = TSDB_QUERY_TYPE_LAST;

  changeQueryHandleForLastrowQuery(pHandle);
  return pHandle;
}

220
/*
 * Build a new array holding the STable pointer of every table covered by the query handle.
 * The array is freshly created on every call.
 */
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
  assert(pHandle != NULL);

  STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;
  SArray* pCheckInfoList = pQueryHandle->pTableCheckInfo;

  size_t  numOfTables = taosArrayGetSize(pCheckInfoList);
  SArray* pTableList = taosArrayInit(numOfTables, POINTER_BYTES);

  for(int32_t idx = 0; idx < numOfTables; ++idx) {
    STableCheckInfo* pInfo = taosArrayGet(pCheckInfoList, idx);
    taosArrayPush(pTableList, &pInfo->pTableObj);
  }

  return pTableList;
}

H
TD-353  
Hongze Cheng 已提交
236
/*
 * Create a query handle of type TSDB_QUERY_TYPE_EXTERNAL: a regular handle is built first and
 * then adjusted by changeQueryHandleForInterpQuery.
 */
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
  STsdbQueryHandle *pHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
  pHandle->type = TSDB_QUERY_TYPE_EXTERNAL;

  changeQueryHandleForInterpQuery(pHandle);
  return pHandle;
}

244
/*
 * Take a snapshot of the mem/imem buffers and create the skip list iterators of one table,
 * positioned by pCheckInfo->lastKey in the traverse order of the handle.
 *
 * The work is done only once per table: later calls return true immediately.
 * Returns false if neither buffer holds any row for this table.
 */
static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
  assert(pCheckInfo->pTableObj != NULL);

  if (pCheckInfo->initBuf) {  // already initialized
    return true;
  }

  pCheckInfo->initBuf = true;
  int32_t order = pHandle->order;

  tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);

  // no data in buffer, abort
  if (pCheckInfo->mem == NULL && pCheckInfo->imem == NULL) {
    return false;
  }

  assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);

  if (pCheckInfo->mem != NULL && pCheckInfo->mem->tData[pCheckInfo->tableId.tid] != NULL) {
    pCheckInfo->iter = tSkipListCreateIterFromVal(pCheckInfo->mem->tData[pCheckInfo->tableId.tid]->pData,
        (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  if (pCheckInfo->imem != NULL && pCheckInfo->imem->tData[pCheckInfo->tableId.tid] != NULL) {
    pCheckInfo->iiter = tSkipListCreateIterFromVal(pCheckInfo->imem->tData[pCheckInfo->tableId.tid]->pData,
        (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  // both iterators are NULL, no data in buffer right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // move each existing iterator onto its first row; both are advanced before the check below
  bool memEmpty  = (pCheckInfo->iter == NULL) || !tSkipListIterNext(pCheckInfo->iter);
  bool imemEmpty = (pCheckInfo->iiter == NULL) || !tSkipListIterNext(pCheckInfo->iiter);
  if (memEmpty && imemEmpty) { // buffer is empty
    return false;
  }

  if (memEmpty) {
    tsdbDebug("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  } else {
    SSkipListNode* pMemNode = tSkipListIterGet(pCheckInfo->iter);
    assert(pMemNode != NULL);

    SDataRow memRow = SL_GET_NODE_DATA(pMemNode);
    TSKEY    memKey = dataRowKey(memRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
           pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, memKey, order, pHandle->qinfo);
  }

  if (imemEmpty) {
    tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
  } else {
    SSkipListNode* pImemNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(pImemNode != NULL);

    SDataRow imemRow = SL_GET_NODE_DATA(pImemNode);
    TSKEY    imemKey = dataRowKey(imemRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
           pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, imemKey, order, pHandle->qinfo);
  }

  return true;
}

H
Haojun Liao 已提交
312 313 314 315 316 317 318 319
/*
 * Return the row the mem/imem iterators of a table currently point at, or NULL if both are
 * exhausted. If both iterators have a row, the one with the smaller key is returned.
 *
 * pCheckInfo->chosen records the source of the returned row (0: mem, 1: imem).
 * Side effect: if both rows carry the same key, the mem iterator is advanced so that the
 * duplicated mem row is skipped and the imem row is returned.
 */
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
  SDataRow memRow  = NULL;
  SDataRow imemRow = NULL;

  if (pCheckInfo->iter != NULL) {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iter);
    if (pNode != NULL) {
      memRow = SL_GET_NODE_DATA(pNode);
    }
  }

  if (pCheckInfo->iiter != NULL) {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iiter);
    if (pNode != NULL) {
      imemRow = SL_GET_NODE_DATA(pNode);
    }
  }

  if (memRow == NULL && imemRow == NULL) {
    return NULL;
  }

  if (memRow != NULL && imemRow != NULL) {
    if (dataRowKey(memRow) < dataRowKey(imemRow)) {
      pCheckInfo->chosen = 0;
      return memRow;
    }

    if (dataRowKey(memRow) == dataRowKey(imemRow)) {
      // data ts are duplicated, ignore the data in mem
      tSkipListIterNext(pCheckInfo->iter);
    }

    pCheckInfo->chosen = 1;
    return imemRow;
  }

  // exactly one of the two buffers still has a row
  if (memRow != NULL) {
    pCheckInfo->chosen = 0;
    return memRow;
  }

  pCheckInfo->chosen = 1;
  return imemRow;
}

/*
 * Advance the iterator that delivered the last row (selected by pCheckInfo->chosen).
 *
 * Returns true if that iterator has another row. Otherwise the result tells whether the
 * other iterator currently points at a row. A chosen value other than 0 or 1 yields false.
 */
bool moveToNextRow(STableCheckInfo* pCheckInfo) {
  SSkipListIterator* pAdvance = NULL;  // iterator that has to move forward
  SSkipListIterator* pOther   = NULL;  // iterator that stays where it is

  if (pCheckInfo->chosen == 0) {
    pAdvance = pCheckInfo->iter;
    pOther   = pCheckInfo->iiter;
  } else if (pCheckInfo->chosen == 1) {
    pAdvance = pCheckInfo->iiter;
    pOther   = pCheckInfo->iter;
  } else {
    return false;
  }

  if (pAdvance != NULL && tSkipListIterNext(pAdvance)) {
    return true;
  }

  if (pOther != NULL) {
    return tSkipListIterGet(pOther) != NULL;
  }

  return false;
}

389
/*
 * Read the next batch of rows of the active table from the in-memory buffer.
 *
 * Returns false if the buffer has no row left for the table or if the next row lies beyond
 * the end of the query window. Otherwise the rows are read through tsdbReadRowsFromCache,
 * the position in pHandle->cur and the last key of the table are updated, and true is returned.
 */
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
  size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);

  SQueryFilePos* cur = &pHandle->cur;
  cur->fid = -1;  // the data does not come from a file

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  assert(pCheckInfo->pTableObj != NULL);

  initTableMemIterator(pHandle, pCheckInfo);
  SDataRow row = getSDataRowInTableMem(pCheckInfo);
  if (row == NULL) {
    return false;
  }

  pCheckInfo->lastKey = dataRowKey(row);  // first timestamp in buffer
  tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
      pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);

  // all data in mem are checked already.
  bool outOfWindow = ASCENDING_TRAVERSE(pHandle->order) ? (pCheckInfo->lastKey > pHandle->window.ekey)
                                                        : (pCheckInfo->lastKey < pHandle->window.ekey);
  if (outOfWindow) {
    return false;
  }

  int32_t      step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
  STimeWindow* win = &cur->win;

  cur->rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey,
                                    pHandle->outputCapacity, &win->skey, &win->ekey, pHandle);  // todo refactor API

  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  cur->lastKey = win->ekey + step;
  cur->mixBlock = true;

  // for a descending scan the window is swapped so that skey <= ekey
  if (!ASCENDING_TRAVERSE(pHandle->order)) {
    SWAP(win->skey, win->ekey, TSKEY);
  }

  return true;
}
H
hjxilinx 已提交
431

432 433
/*
 * Map a timestamp to the id of the data file that covers it.
 *
 * key          timestamp; TSKEY_INITIAL_VAL maps to INT32_MIN
 * daysPerFile  number of days stored in one file
 * precision    time precision, used as the index into tsMsPerDay[]
 *
 * Returns key / (daysPerFile * tsMsPerDay[precision]), clamped to [INT32_MIN, INT32_MAX].
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // the previous form of this check joined both bounds with "||" and therefore always passed
  // NOTE(review): the lower bound 0 assumes the precision values index tsMsPerDay from 0 - confirm
  assert(precision >= 0 && precision <= TSDB_TIME_PRECISION_NANO);

  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision]));  // set the starting fileId

  // data value overflow for INT32
  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t) fid;
}

H
Haojun Liao 已提交
450
/*
 * Binary search over the block array for the block whose [keyFirst, keyLast] range contains skey.
 *
 * If skey falls into a gap between two blocks, the result depends on order: for
 * TSDB_ORDER_ASC the block after the gap is returned, for TSDB_ORDER_DESC the block before it.
 * The caller has to verify that the returned block really qualifies.
 * NOTE(review): the array is read at index 0 even for numOfBlocks == 0; callers appear to
 * guarantee at least one block - confirm.
 */
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo  = 0;
  int32_t hi  = numOfBlocks - 1;
  int32_t mid = lo;

  for (;;) {
    int32_t span = hi - lo + 1;
    mid = lo + (span >> 1);

    if (span == 1) {
      break;
    }

    if (skey > pBlock[mid].keyLast) {
      if (span == 2) {
        break;
      }

      // skey is located in the gap right after the middle block
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[mid + 1].keyFirst)) {
        break;
      }

      lo = mid + 1;
    } else if (skey < pBlock[mid].keyFirst) {
      // skey is located in the gap right before the middle block
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[mid - 1].keyLast)) {
        break;
      }

      hi = mid - 1;
    } else {
      break;  // got the slot
    }
  }

  return mid;
}
476 477 478

/*
 * Load the comp info of every queried table from the current file group and keep only the
 * blocks that overlap the remaining query range of the table.
 *
 * pQueryHandle  query handle; pFileGroup must be set
 * numOfBlocks   output, total number of qualified blocks of all tables in this file group
 * type          not used by this function
 *
 * For each table, pCheckInfo->numOfBlocks is set to the number of qualified blocks (0 if the
 * table has no qualified block in this file) and the qualified blocks are moved to the front
 * of pCheckInfo->pCompInfo->blocks.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code reported when opening the files fails.
 */
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
  SFileGroup* fileGroup = pQueryHandle->pFileGroup;
  assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);

  int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);

  //open file failed, return error code to client
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // load all the comp offset value for all tables in this file
  *numOfBlocks = 0;
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);

    // reset first: every "continue" below must leave 0 behind, never the count of the previous file
    pCheckInfo->numOfBlocks = 0;

    SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
    if (compIndex->len == 0 || compIndex->numOfBlocks == 0 ||
        compIndex->uid != pCheckInfo->tableId.uid) {  // no data block in this file, try next file
      continue;  // no data blocks in the file belongs to pCheckInfo->pTable
    }

    // grow the comp info buffer of the table if it is too small for this file
    if (pCheckInfo->compSize < compIndex->len) {
      assert(compIndex->len > 0);

      char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
      assert(t != NULL);

      pCheckInfo->pCompInfo = (SCompInfo*) t;
      pCheckInfo->compSize = compIndex->len;
    }

    tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);

    tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
    SCompInfo* pCompInfo = pCheckInfo->pCompInfo;

    // [s, e] is the remaining key range of this table, independent of the traverse order
    TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
    TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);

    // discard the unqualified data block based on the query time window
    int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
    int32_t end = start;

    if (s > pCompInfo->blocks[start].keyLast) {  // the range starts after the last candidate block
      continue;
    }

    // todo speedup the procedure of located end block
    while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
      end += 1;
    }

    pCheckInfo->numOfBlocks = (end - start);

    // move the qualified blocks to the head of the array
    if (start > 0) {
      memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
    }

    (*numOfBlocks) += pCheckInfo->numOfBlocks;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
545 546 547 548 549 550
// Build an SDataBlockInfo value (compound literal) describing a file data block: the key range,
// column count and row count are taken from _block, the table tid/uid from _checkInfo.
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                   \
  ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
                    .numOfCols = (_block)->numOfCols,                                  \
                    .rows = (_block)->numOfRows,                                       \
                    .tid = (_checkInfo)->tableId.tid,                                  \
                    .uid = (_checkInfo)->tableId.uid})
551

552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579
/*
 * Create a new array with the column ids (int16_t) of all columns requested by the query,
 * in the order of pQueryHandle->pColumns. The caller destroys the array.
 */
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
  size_t total = QH_GET_NUM_OF_COLS(pQueryHandle);
  assert(total <= TSDB_MAX_COLUMNS);

  SArray* pColIds = taosArrayInit(total, sizeof(int16_t));

  for (int32_t idx = 0; idx < total; ++idx) {
    SColumnInfoData* pColData = taosArrayGet(pQueryHandle->pColumns, idx);
    taosArrayPush(pColIds, &pColData->info.colId);
  }

  return pColIds;
}

/*
 * Create the list of column ids to load for the query. If loadTS is true and the first
 * requested column is not the primary timestamp column (id 0), id 0 is inserted at the
 * front. The caller destroys the array.
 */
static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
  SArray* pColIds = getColumnIdList(pQueryHandle);

  // check if the primary time stamp column needs to load
  int16_t firstColId = *(int16_t*)taosArrayGet(pColIds, 0);
  bool    tsMissing  = (firstColId != 0);

  // the primary timestamp column is not included in the specified load column list, add it
  if (loadTS && tsMissing) {
    int16_t tsColId = 0;
    taosArrayInsert(pColIds, 0, &tsColId);
  }

  return pColIds;
}

580
/*
 * Load one file data block of a table through the read helper of the query handle.
 *
 * pCheckInfo->pDataCols is created on first use and re-initialized with the table schema.
 * On success the loaded block is recorded in pQueryHandle->dataBlockLoadInfo.
 *
 * Returns true if the block has been loaded, false if tsdbLoadBlockData reported a failure.
 *
 * Compared with the previous version, the scratch SCompData buffer and the column id list
 * are gone: both were allocated and released without being used, and the buffer was written
 * to without checking the allocation result. The row count assertion now only runs after a
 * successful load.
 */
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  STsdbRepo *pRepo = pQueryHandle->pTsdb;
  bool       blockLoaded = false;

  int64_t st = taosGetTimestampUs();

  if (pCheckInfo->pDataCols == NULL) {
    STsdbMeta* pMeta = tsdbGetMeta(pRepo);
    // TODO
    pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
  }

  tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));

  if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock) == 0) {
    // remember which block is held by the read helper
    SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;

    pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
    pBlockLoadInfo->slot = pQueryHandle->cur.slot;
    pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;

    // a successfully loaded block must contain rows
    assert(pQueryHandle->rhelper.pDataCols[0]->numOfRows != 0);

    blockLoaded = true;
  }

  int64_t et = taosGetTimestampUs() - st;
  tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);

  return blockLoaded;
}

622 623
/*
 * Produce the next rows for a file block that lies completely inside the remaining query
 * range, taking the rows in the in-memory buffer of the table into account.
 *
 * Three cases, decided by the key of the current buffer row (in traverse order):
 *  1. the buffer row comes before the whole block: only rows from the buffer are returned,
 *     up to (not including) the key range of the block; the block is not loaded;
 *  2. the buffer row falls into the range that the block covers or has passed: the block is
 *     loaded and merged with the buffer through doMergeTwoLevelData;
 *  3. no buffer row, or it comes after the block: the block is reported as a whole without
 *     loading its data here.
 *
 * Fix in case 1: for a descending scan the buffer is now read down to binfo.window.ekey + 1.
 * The previous limit was derived from binfo.window.skey for both orders, which allowed a
 * descending read to run into the key range of the file block.
 *
 * NOTE(review): the result of doLoadFileDataBlock is ignored in case 2, so a failed load is
 * followed by a merge on whatever the read helper holds - confirm whether this needs handling.
 */
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
  SQueryFilePos* cur = &pQueryHandle->cur;
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);

  /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
  SDataRow row = getSDataRowInTableMem(pCheckInfo);

  TSKEY   key  = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
  bool    asc  = ASCENDING_TRAVERSE(pQueryHandle->order);
  int32_t step = asc ? 1 : -1;

  cur->pos = asc ? 0 : (binfo.rows - 1);

  bool hasKey = (key != TSKEY_INITIAL_VAL);

  // the buffer row is not located after the block (in traverse order)
  bool notAfterBlock = hasKey && (asc ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  // the buffer row is located before the whole block (in traverse order)
  bool beforeBlock = hasKey && (asc ? (key < binfo.window.skey) : (key > binfo.window.ekey));

  if (!notAfterBlock) {
    /*
     * no data in cache, only load data from file
     * during the query processing, data in cache will not be checked anymore.
     *
     * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
     */
    assert(pQueryHandle->outputCapacity >= binfo.rows);
    pQueryHandle->realNumOfRows = binfo.rows;

    cur->rows = binfo.rows;
    cur->win  = binfo.window;
    cur->mixBlock = false;
    cur->blockCompleted = true;
    cur->lastKey = binfo.window.ekey + step;
    return;
  }

  if (beforeBlock) {
    // do not load file block into buffer, stop right before the first key of the block in traverse order
    TSKEY maxKey = asc ? (binfo.window.skey - step) : (binfo.window.ekey - step);

    cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey,
                                      pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle);
    pQueryHandle->realNumOfRows = cur->rows;

    // update the last key value
    pCheckInfo->lastKey = cur->win.ekey + step;
    if (!asc) {
      SWAP(cur->win.skey, cur->win.ekey, TSKEY);
    }

    cur->mixBlock = true;
    cur->blockCompleted = false;
    return;
  }

  // buffer rows and block rows are interleaved, load the block and merge both sources
  SArray* sa = getDefaultLoadColumns(pQueryHandle, true);

  doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
  doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
  taosArrayDestroy(sa);
}

680 681 682 683
/*
 * Load the rows of one file block that qualify for the query and merge them with the rows in
 * the in-memory buffer of the table.
 *
 * If the remaining query range starts or ends inside the block, the block is loaded, the
 * start position is located by binary search on the timestamp column and the data is merged
 * through doMergeTwoLevelData. Otherwise the whole block qualifies and handleDataMergeIfNeeded
 * decides whether it has to be loaded at all.
 *
 * Returns false if the block cannot be loaded or no row has been produced, true otherwise.
 *
 * The column id list is now released on the early failure returns as well; it used to leak there.
 */
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SArray*        sa = getDefaultLoadColumns(pQueryHandle, true);
  SQueryFilePos* cur = &pQueryHandle->cur;

  if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
    // query ended in current block
    if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
      if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
        taosArrayDestroy(sa);
        return false;
      }

      SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
      assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);

      // skip the rows in front of the last key that have been consumed already
      if (pCheckInfo->lastKey > pBlock->keyFirst) {
        cur->pos =
            binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
      } else {
        cur->pos = 0;
      }

      doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
    } else {  // the whole block is loaded in to buffer
      handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
    }
  } else {  //desc order, query ended in current block
    if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
      if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
        taosArrayDestroy(sa);
        return false;
      }

      SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];

      // skip the rows behind the last key that have been consumed already
      if (pCheckInfo->lastKey < pBlock->keyLast) {
        cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
      } else {
        cur->pos = pBlock->numOfRows - 1;
      }

      doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
    } else {
      handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
    }
  }

  taosArrayDestroy(sa);
  return pQueryHandle->realNumOfRows > 0;
}

728
/*
 * Binary search in an array of num timestamps.
 * NOTE(review): the search only makes sense for keys sorted in ascending order - confirm with the callers.
 *
 * pValue  the timestamp array, read as TSKEY[]
 * num     number of timestamps; -1 is returned for num <= 0
 * key     timestamp to look for
 * order   TSDB_ORDER_ASC:  index of the first timestamp that is >= key, -1 if there is none
 *         TSDB_ORDER_DESC: index of the last timestamp that is <= key, -1 if there is none
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) {
    return -1;
  }

  TSKEY* keys = (TSKEY*)pValue;

  int lo  = 0;
  int hi  = num - 1;
  int mid = -1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    for (;;) {
      if (key >= keys[hi]) return hi;
      if (key == keys[lo]) return lo;
      if (key < keys[lo])  return lo - 1;

      int span = hi - lo + 1;
      mid = lo + (span >> 1);

      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        break;  // exact match
      }
    }
  } else {
    // find the first position which is bigger than the key
    for (;;) {
      if (key <= keys[lo]) return lo;
      if (key == keys[hi]) return hi;

      if (key > keys[hi]) {
        // the answer is the element right behind the current range, if it exists
        hi = hi + 1;
        return (hi >= num) ? -1 : hi;
      }

      int span = hi - lo + 1;
      mid = lo + (span >> 1);

      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        break;  // exact match
      }
    }
  }

  return mid;
}

H
Haojun Liao 已提交
790
/*
 * Copy the rows [start, end] of the loaded file block (pQueryHandle->rhelper.pDataCols[0]) into the output columns
 * of the query handle (pQueryHandle->pColumns).
 *
 * Both column lists are walked in parallel by column id. An output column that has no counterpart in the file block
 * is filled with NULL values for the copied rows.
 *
 * For an ascending traverse the rows are appended after the `numOfRows` rows already in the output buffer; otherwise
 * they are placed in front of the rows already stored at the tail of the buffer, which holds `capacity` rows.
 *
 * On return cur.win.ekey is set to the timestamp of row `end` and cur.lastKey to that timestamp plus the traverse
 * step (+1 ascending, -1 descending).
 *
 * @return the number of rows held by the output buffer after the copy, i.e. numOfRows + (end - start + 1)
 */
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
  bool    ascending = ASCENDING_TRAVERSE(pQueryHandle->order);
  int32_t step = ascending ? 1 : -1;

  SDataCols* pFileCols = pQueryHandle->rhelper.pDataCols[0];
  TSKEY*     tsList = pFileCols->cols[0].pData;

  int32_t num = end - start + 1;
  int32_t numOfOutputCols = taosArrayGetSize(pQueryHandle->pColumns);

  // index of the first output row that is written by this call
  int32_t firstRow = ascending ? numOfRows : (capacity - numOfRows - num);

  int32_t j = 0;  // cursor in the columns of the file block
  for (int32_t i = 0; i < numOfOutputCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);

    // skip the file columns that are not required by the query
    while (j < pFileCols->numOfCols && pFileCols->cols[j].colId < pColInfo->info.colId) {
      j++;
    }

    int32_t bytes = pColInfo->info.bytes;
    char*   pData = pColInfo->pData + firstRow * bytes;

    bool isVarType = (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR);
    bool found = (j < pFileCols->numOfCols && pFileCols->cols[j].colId == pColInfo->info.colId);

    if (found) {
      SDataCol* src = &pFileCols->cols[j];

      if (!isVarType) {
        memmove(pData, src->pData + bytes * start, bytes * num);
      } else {  // var-length values are copied one by one
        char* dst = pData;
        for (int32_t k = start; k <= end; ++k) {
          char* p = tdGetColDataOfRow(src, k);
          memcpy(dst, p, varDataTLen(p));
          dst += bytes;
        }
      }

      j++;
    } else if (isVarType) {  // the column does not exist in the file block, it is NULL data
      char* dst = pData;
      for (int32_t k = start; k <= end; ++k) {
        setVardataNull(dst, pColInfo->info.type);
        dst += bytes;
      }
    } else {
      setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
    }
  }

  pQueryHandle->cur.win.ekey = tsList[end];
  pQueryHandle->cur.lastKey = tsList[end] + step;

  return numOfRows + num;
}

879 880
/*
 * Copy one row of the in-memory buffer into the output columns of the query handle.
 *
 * The schema used to decode the row is looked up by the schema version carried by the row itself. Output columns
 * and schema columns are matched by column id; an output column that is missing in the row schema is set to NULL.
 *
 * For an ascending traverse the row is written at index `numOfRows`; otherwise it is written at index
 * (capacity - numOfRows - 1), i.e. the buffer is filled from its tail.
 *
 * pMeta is not used by this function.
 */
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
                              STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
  // the schema version info is embedded in SDataRow
  STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
  int32_t   numOfRowCols = schemaNCols(pSchema);

  // index of the output row that receives the data
  int32_t rowIndex = ASCENDING_TRAVERSE(pQueryHandle->order) ? numOfRows : (capacity - numOfRows - 1);

  int32_t j = 0;  // cursor in the columns of the row schema
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);

    // skip the schema columns that are not required by the query
    while (j < numOfRowCols && pSchema->columns[j].colId < pColInfo->info.colId) {
      j++;
    }

    char* pData = pColInfo->pData + rowIndex * pColInfo->info.bytes;
    bool  isVarType = (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR);

    if (j < numOfRowCols && pSchema->columns[j].colId == pColInfo->info.colId) {
      void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
      if (isVarType) {
        memcpy(pData, value, varDataTLen(value));
      } else {
        memcpy(pData, value, pColInfo->info.bytes);
      }

      j++;
    } else if (isVarType) {  // the column does not exist in the row, it is NULL data
      setVardataNull(pData, pColInfo->info.type);
    } else {
      setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
    }
  }
}

H
[td-32]  
hjxilinx 已提交
939 940
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
941
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
942 943
                                  SArray* sa) {
  SQueryFilePos* cur = &pQueryHandle->cur;
H
Haojun Liao 已提交
944
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
945
  
946
  initTableMemIterator(pQueryHandle, pCheckInfo);
947
  SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
948 949 950 951 952 953 954 955 956 957

  // for search the endPos, so the order needs to reverse
  int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;

  int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);

  STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
  STable* pTable = pCheckInfo->pTableObj;

958
  int32_t endPos = cur->pos;
959
  if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
960
    endPos = blockInfo.rows - 1;
961
    cur->mixBlock = (cur->pos != 0);
962
  } else if (!ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
963
    endPos = 0;
964
    cur->mixBlock = (cur->pos != blockInfo.rows - 1);
965
  } else {
966 967
    assert(pCols->numOfRows > 0);
    endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
968
    cur->mixBlock = true;
969
  }
H
hjxilinx 已提交
970
  
971
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
972
  int32_t pos = cur->pos;
H
hjxilinx 已提交
973
  
974 975
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
  TSKEY* tsArray = pCols->cols[0].pData;
H
hjxilinx 已提交
976
  
977 978
  int32_t numOfRows = 0;
  pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
979

980 981
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
982 983
    int32_t start = cur->pos;
    int32_t end = endPos;
984
    if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
985 986 987 988 989 990 991 992
      end = cur->pos;
      start = endPos;
    }
    
    cur->win.skey = tsArray[start];
    cur->win.ekey = tsArray[end];
    
    // todo opt in case of no data in buffer
H
Haojun Liao 已提交
993
    numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
994
    
995
    // if the buffer is not full in case of descending order query, move the data in the front of the buffer
996
    if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
997
      int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
998 999

      for(int32_t i = 0; i < numOfCols; ++i) {
1000 1001 1002 1003 1004
        SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
      }
    }
  
H
Haojun Liao 已提交
1005
    pos += (end - start + 1) * step;
1006 1007
    cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
        ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
H
Haojun Liao 已提交
1008
    
1009 1010 1011
    pCheckInfo->lastKey = cur->lastKey;
    pQueryHandle->realNumOfRows = numOfRows;
    cur->rows = numOfRows;
1012
    return;
1013
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1014 1015
    SSkipListNode* node = NULL;
    do {
1016 1017
      SDataRow row = getSDataRowInTableMem(pCheckInfo);
      if (row == NULL) {
H
[td-32]  
hjxilinx 已提交
1018
        break;
1019
      }
1020

1021
      TSKEY key = dataRowKey(row);
1022 1023
      if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1024 1025 1026
        break;
      }

1027 1028
      if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          ((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1029 1030 1031
        break;
      }

1032 1033
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1034
        copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
1035 1036 1037 1038
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1039

1040
        cur->win.ekey = key;
1041 1042 1043
        cur->lastKey  = key + step;
        cur->mixBlock = true;

1044
        moveToNextRow(pCheckInfo);
1045
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
1046
        moveToNextRow(pCheckInfo);
1047 1048
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
                  (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1049 1050 1051
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1052

1053
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
H
Haojun Liao 已提交
1054
        if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
H
Haojun Liao 已提交
1055
          moveToNextRow(pCheckInfo);
H
Haojun Liao 已提交
1056 1057
        }
        
1058
        int32_t start = -1;
1059
        if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075
          int32_t remain = end - pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
          }

          start = pos;
        } else {
          int32_t remain = (pos - end) + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
          }

          start = end;
          end = pos;
        }

H
Haojun Liao 已提交
1076
        numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
1077 1078 1079 1080 1081
        pos += (end - start + 1) * step;
      }
    } while (numOfRows < pQueryHandle->outputCapacity);
    
    if (numOfRows < pQueryHandle->outputCapacity) {
H
Haojun Liao 已提交
1082 1083 1084 1085
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1086
      if (node == NULL ||
1087 1088
          ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
          ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
1089 1090 1091 1092 1093 1094 1095 1096 1097
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

        int32_t start = -1;
        int32_t end = -1;

        // all remain data are qualified, but check the remain capacity in the first place.
1098
        if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
          int32_t remain = endPos - pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
          }

          start = pos;
          end = endPos;
        } else {
          int32_t remain = pos + 1;
          if (remain + numOfRows > pQueryHandle->outputCapacity) {
            endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
          }

          start = endPos;
          end = pos;
        }

H
Haojun Liao 已提交
1116
        numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
1117
        pos += (end - start + 1) * step;
1118
      }
1119 1120
    }
  }
1121
  
1122 1123
  cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
      ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
1124

1125
  if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
1126 1127 1128 1129 1130
    SWAP(cur->win.skey, cur->win.ekey, TSKEY);
  
    // if the buffer is not full in case of descending order query, move the data in the front of the buffer
    if (numOfRows < pQueryHandle->outputCapacity) {
      int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
1131
      for(int32_t i = 0; i < numOfCols; ++i) {
1132 1133 1134 1135 1136 1137
        SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
        memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
      }
    }
  }
  
1138 1139 1140 1141
  pCheckInfo->lastKey = cur->lastKey;
  pQueryHandle->realNumOfRows = numOfRows;
  cur->rows = numOfRows;
  cur->pos = pos;
1142

S
Shengliang Guan 已提交
1143
  tsdbDebug("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" rows:%d, %p", pQueryHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, cur->win.skey,
1144
      cur->win.ekey, cur->rows, pQueryHandle->qinfo);
1145 1146
}

1147
/*
 * Binary search for a timestamp inside an array of keys; public counterpart of doBinarySearchKey().
 *
 * @param pValue  buffer that is read as an array of TSKEY (expected to be in ascending order)
 * @param num     number of keys available in pValue
 * @param key     timestamp to look for
 * @param order   TSDB_ORDER_DESC: locate the last key that is not greater than `key`
 *                any other value: locate the first key that is not less than `key`
 * @return index of the located key, or -1 when num <= 0 or when no key satisfies the requested relation
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  // The search itself is shared with doBinarySearchKey(). That function asserts that the order is either
  // TSDB_ORDER_ASC or TSDB_ORDER_DESC, whereas this entry point has always handled every value other than
  // TSDB_ORDER_DESC as an ascending search, so the order is normalized before the call.
  int searchOrder = (order == TSDB_ORDER_DESC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

  return doBinarySearchKey(pValue, num, key, searchOrder);
}

1207
/*
 * Release the buffers owned by a block order supporter.
 *
 * @param pSupporter   supporter whose arrays are released
 * @param numOfTables  number of leading entries of pSupporter->pDataBlockInfo that are released before the
 *                     pointer array itself is released
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  tfree(pSupporter->blockIndexArray);
  tfree(pSupporter->numOfBlocksPerTable);

  int32_t idx = 0;
  while (idx < numOfTables) {
    STableBlockInfo* pTableBlocks = pSupporter->pDataBlockInfo[idx];
    tfree(pTableBlocks);
    idx += 1;
  }

  tfree(pSupporter->pDataBlockInfo);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

1228
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
1229 1230
    /* left block is empty */
    return 1;
1231
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
1232 1233 1234 1235 1236 1237 1238
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
1239 1240 1241
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
1242
    // todo add more information
B
Bomin Zhang 已提交
1243
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
1244 1245
  }

H
Haojun Liao 已提交
1246
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
1247 1248
}

1249
/*
 * Build pQueryHandle->pDataBlockInfo: the list of all data blocks, of every table of the query, that were found in
 * the current file group.
 *
 * If only one table owns blocks, its blocks are copied as they are. Otherwise the per-table block lists are merged
 * with a loser tree according to dataBlockOrderCompar(), i.e. by the offset of the blocks.
 *
 * @param pQueryHandle      query handle; pDataBlockInfo is (re)allocated to hold numOfBlocks entries
 * @param numOfBlocks       total number of blocks of all tables, must match the sum of the per-table block counts
 * @param numOfAllocBlocks  set to numOfBlocks when the list has been built; left untouched on failure
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY if any allocation fails
 */
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  char* tmp = realloc(pQueryHandle->pDataBlockInfo, sizeof(STableBlockInfo) * numOfBlocks);
  if (tmp == NULL) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
  memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks);

  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = calloc(numOfTables, sizeof(int32_t));
  sup.blockIndexArray = calloc(numOfTables, sizeof(int32_t));
  sup.pDataBlockInfo = calloc(numOfTables, POINTER_BYTES);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  // collect the blocks of every table that has at least one block in this file group
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }

    SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];

      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
        pQueryHandle->qinfo);

    *numOfAllocBlocks = numOfBlocks;
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
      numOfQualTables, pQueryHandle->qinfo);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  // the result is compared directly, so that the returned code is not narrowed by an intermediate variable
  SLoserTreeInfo* pTree = NULL;
  if (tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar) != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfQualTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  // repeatedly take the block selected by the loser tree, then advance to the next block of the same table
  while (numOfTotal < cnt) {
    int32_t pos = pTree->pNode[0].index;
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tLoserTreeAdjust(pTree, pos + sup.numOfTables);
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
   * }
   */

  tsdbDebug("%p %d data blocks sort completed", pQueryHandle, cnt);
  cleanBlockOrderSupporter(&sup, numOfQualTables);
  free(pTree);

  *numOfAllocBlocks = numOfBlocks;
  return TSDB_CODE_SUCCESS;
}

1359
// todo opt for only one table case
H
Haojun Liao 已提交
1360
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
1361 1362
  pQueryHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pQueryHandle->cur;
H
Haojun Liao 已提交
1363 1364 1365

  int32_t code = TSDB_CODE_SUCCESS;

1366 1367 1368 1369
  int32_t numOfBlocks = 0;
  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  
  while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
1370
    int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
H
Haojun Liao 已提交
1371
    if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
1372 1373 1374
      break;
    }
    
S
Shengliang Guan 已提交
1375
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
H
Haojun Liao 已提交
1376
           numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
1377
    
1378 1379 1380 1381 1382
    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;
    }
    
1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395
    // todo return error code to query engine
    if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
      break;
    }
    
    assert(numOfBlocks >= pQueryHandle->numOfBlocks);
    if (pQueryHandle->numOfBlocks > 0) {
      break;
    }
  }
  
  // no data in file anymore
  if (pQueryHandle->numOfBlocks <= 0) {
H
Haojun Liao 已提交
1396 1397 1398 1399
    if (code == TSDB_CODE_SUCCESS) {
      assert(pQueryHandle->pFileGroup == NULL);
    }

1400
    cur->fid = -1;  // denote that there are no data in file anymore
H
Haojun Liao 已提交
1401 1402
    *exists = false;
    return code;
1403 1404
  }
  
1405
  cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
1406 1407 1408
  cur->fid = pQueryHandle->pFileGroup->fileId;
  
  STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
1409 1410 1411
  *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);

  return TSDB_CODE_SUCCESS;
1412 1413
}

H
Haojun Liao 已提交
1414
/*
 * Advance the file scan of the query by one step.
 *
 * - first call: position the file group iterator on the file that holds window.skey and load the first block
 * - current block is partially consumed (mixed block that is not completed): continue to merge it
 * - current block is done: load the next block of the same file, or move to the next file group if the current
 *   block is the last one in traverse order
 *
 * @param exists  set to true if rows are available after the step
 * @return TSDB_CODE_SUCCESS, or the error code of getDataBlocksInFilesImpl()
 */
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
  STsdbFileH*    pFileH = tsdbGetFile(pQueryHandle->pTsdb);
  SQueryFilePos* pCur = &pQueryHandle->cur;
  bool           ascending = ASCENDING_TRAVERSE(pQueryHandle->order);

  // find the start data block in file
  if (!pQueryHandle->locateStart) {
    pQueryHandle->locateStart = true;

    STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
    int32_t   startFid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

    tsdbInitFileGroupIter(pFileH, &pQueryHandle->fileIter, pQueryHandle->order);
    tsdbSeekFileGroupIter(&pQueryHandle->fileIter, startFid);

    return getDataBlocksInFilesImpl(pQueryHandle, exists);
  }

  STableBlockInfo* pCurrent = &pQueryHandle->pDataBlockInfo[pCur->slot];
  STableCheckInfo* pCheckInfo = pCurrent->pTableCheckInfo;

  // the current block still has rows to be merged with the buffered data
  if (pCur->mixBlock && !pCur->blockCompleted) {
    handleDataMergeIfNeeded(pQueryHandle, pCurrent->compBlock, pCheckInfo);
    *exists = (pQueryHandle->realNumOfRows > 0);

    return TSDB_CODE_SUCCESS;
  }

  // current block is done; if it is the last block of the file in traverse order, try next file if exists
  int32_t lastSlot = ascending ? (pQueryHandle->numOfBlocks - 1) : 0;
  if (pCur->slot == lastSlot) {
    return getDataBlocksInFilesImpl(pQueryHandle, exists);
  }

  // next block of the same file
  pCur->slot += (ascending ? 1 : -1);
  pCur->mixBlock = false;
  pCur->blockCompleted = false;

  STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[pCur->slot];
  *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);

  return TSDB_CODE_SUCCESS;
}

1461 1462
/*
 * Check the tables of the query one after another, starting at pQueryHandle->activeIndex, until one of them reports
 * more data in the cache.
 *
 * activeIndex is left on the table that reported data; it equals the number of tables if none of them did.
 *
 * @return true if hasMoreDataInCache() succeeded for a table, false otherwise
 */
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);

  for (; pQueryHandle->activeIndex < numOfTables; pQueryHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pQueryHandle)) {
      return true;
    }
  }

  return false;
}

// handle data in cache situation
1477 1478
/*
 * Move the query handle to its next data block.
 *
 * Regular handles: file blocks are tried first (while checkFiles is set); once
 * the files are exhausted the handle switches to the in-memory buffer.
 *
 * TSDB_QUERY_TYPE_EXTERNAL handles: the handle is temporarily turned into a
 * descending TSDB_QUERY_TYPE_ALL scan to fetch one block, of which only the last
 * row is kept. If that row's timestamp is not exactly window.skey, a secondary
 * ascending handle starting at window.skey is used to fetch one more row, so the
 * output holds either one or two rows. The handle type is restored to EXTERNAL
 * before returning true.
 *
 * @param pHandle  query handle (an STsdbQueryHandle)
 * @return true when a block is available, false when there is no more data or
 *         loading blocks from file failed
 */
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;

  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
    // switch to a plain descending scan so that the recursive call takes the regular path below
    pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
    pQueryHandle->order = TSDB_ORDER_DESC;

    if (!tsdbNextDataBlock(pHandle)) {
      return false;
    }

    // NOTE(review): assumes getDefaultLoadColumns() returns a list owned by the caller; it is
    // released below on every exit path -- confirm against its definition
    SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
    /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle);
    /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, sa);

    if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
      // data already retrieve, discard other data rows and return
      int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);

        // memmove: source and destination are the same address when the block holds a single row
        memmove(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
      }

      pQueryHandle->cur.win  = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
      pQueryHandle->window   = pQueryHandle->cur.win;
      pQueryHandle->cur.rows = 1;
      pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;

      taosArrayDestroy(sa);
      return true;
    } else {
      // build a secondary ascending handle over [window.skey, INT64_MAX] on the same tables
      STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
      pSecQueryHandle->order       = TSDB_ORDER_ASC;
      pSecQueryHandle->window      = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
      pSecQueryHandle->pTsdb       = pQueryHandle->pTsdb;
      pSecQueryHandle->type        = TSDB_QUERY_TYPE_ALL;
      pSecQueryHandle->cur.fid     = -1;
      pSecQueryHandle->cur.win     = TSWINDOW_INITIALIZER;
      pSecQueryHandle->checkFiles  = true;
      pSecQueryHandle->activeIndex = 0;
      pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;

      tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);

      // allocate buffer in order to load data blocks from file
      int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);

      pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
      pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData colInfo = {{0}, 0};
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);

        colInfo.info = pCol->info;
        colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
        taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
        pSecQueryHandle->statis[i].colId = colInfo.info.colId;
      }

      size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
      pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
      STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
      assert(pMeta != NULL);

      for (int32_t j = 0; j < si; ++j) {
        STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);

        STableCheckInfo info = {
            .lastKey = pSecQueryHandle->window.skey,
            .tableId = pCheckInfo->tableId,
            .pTableObj = pCheckInfo->pTableObj,
        };

        taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
      }

      tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
      tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);

      bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
      assert(ret);

      /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle);
      /*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa);

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);

        // row 0 <- last row of the descending block (memmove: may be the same address)
        memmove(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);

        SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
        assert(pCol->info.colId == pCol1->info.colId);

        // row 1 <- first row of the ascending block
        memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
      }

      // column 0 is read as the timestamp column to derive the resulting window
      SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);

      pQueryHandle->cur.win  = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
      pQueryHandle->window   = pQueryHandle->cur.win;
      pQueryHandle->cur.rows = 2;

      tsdbCleanupQueryHandle(pSecQueryHandle);
    }

    taosArrayDestroy(sa);
    pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
    return true;
  }

  if (pQueryHandle->checkFiles) {
    bool exists = true;
    int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (exists) {
      return exists;
    }

    // files are exhausted: restart from the first table and only look at the buffer from now on
    pQueryHandle->activeIndex = 0;
    pQueryHandle->checkFiles  = false;
  }

  // TODO: opt by consider the scan order
  return doHasDataInBuffer(pQueryHandle);
}

1606
/*
 * Narrow a descending-order query handle down to the single table whose
 * pTableObj->lastKey is the greatest among all checked tables.
 *
 * The per-table resources (iterator, pDataCols and its buffer, pCompInfo) of
 * every other entry are released, the check list is rebuilt with only the
 * chosen entry, and the query window becomes [lastKey, lastKey].
 * If no table has a lastKey greater than TSKEY_INITIAL_VAL the handle is left
 * untouched.
 *
 * todo consider the query time window, current last_row does not apply the query time window
 */
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
  assert(!ASCENDING_TRAVERSE(pQueryHandle->order));

  SArray* pInfoList = pQueryHandle->pTableCheckInfo;
  size_t  numOfTables = taosArrayGetSize(pInfoList);

  // pick the table holding the latest timestamp
  TSKEY   latest = TSKEY_INITIAL_VAL;
  int32_t chosen = -1;

  for (int32_t k = 0; k < numOfTables; ++k) {
    STableCheckInfo* pCandidate = taosArrayGet(pInfoList, k);
    TSKEY tableLast = pCandidate->pTableObj->lastKey;

    if (tableLast > latest) {
      latest = tableLast;
      chosen = k;
    }
  }

  if (chosen == -1) {
    // todo add failure test cases
    return;
  }

  // release whatever the discarded entries own
  for (int32_t k = 0; k < numOfTables; ++k) {
    if (k != chosen) {
      STableCheckInfo* pDiscard = taosArrayGet(pInfoList, k);
      tSkipListDestroyIter(pDiscard->iter);

      if (pDiscard->pDataCols != NULL) {
        tfree(pDiscard->pDataCols->buf);
      }

      tfree(pDiscard->pDataCols);
      tfree(pDiscard->pCompInfo);
    }
  }

  // keep a copy of the chosen entry, then rebuild the list around it
  STableCheckInfo kept = *(STableCheckInfo*) taosArrayGet(pInfoList, chosen);
  taosArrayClear(pInfoList);

  kept.lastKey = latest;
  taosArrayPush(pInfoList, &kept);

  // the query window collapses to the chosen timestamp
  pQueryHandle->window = (STimeWindow) {latest, latest};
}

1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695
/*
 * Prepare a handle for a single-timestamp (window.skey == window.ekey) lookup.
 *
 * The scan order is forced to descending. The first table in the check list
 * that has a valid lastKey not earlier than window.skey is kept as the only
 * entry, its lastKey is reset to window.skey, and the query window becomes
 * {window.skey, TSKEY_INITIAL_VAL}. When no table qualifies only the order
 * change is applied.
 *
 * NOTE(review): the discarded entries are dropped with taosArrayClear() without
 * releasing their iterators/buffers, unlike changeQueryHandleForLastrowQuery --
 * confirm whether they can own resources at this point.
 */
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
  pQueryHandle->order = TSDB_ORDER_DESC;

  assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);

  SArray* pInfoList = pQueryHandle->pTableCheckInfo;
  size_t  numOfTables = taosArrayGetSize(pInfoList);

  // locate the first table whose data reaches the queried timestamp
  int32_t idx;
  for (idx = 0; idx < numOfTables; ++idx) {
    STableCheckInfo* pCandidate = taosArrayGet(pInfoList, idx);
    TSKEY tableLast = pCandidate->pTableObj->lastKey;

    if (pQueryHandle->window.skey <= tableLast && tableLast != TSKEY_INITIAL_VAL) {
      break;
    }
  }

  // there are no data in all the tables
  if (idx == numOfTables) {
    return;
  }

  STableCheckInfo kept = *(STableCheckInfo*) taosArrayGet(pInfoList, idx);
  taosArrayClear(pInfoList);

  kept.lastKey = pQueryHandle->window.skey;
  taosArrayPush(pInfoList, &kept);

  // update the query time window according to the chosen timestamp
  pQueryHandle->window = (STimeWindow) {kept.lastKey, TSKEY_INITIAL_VAL};
}

H
Haojun Liao 已提交
1696
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
1697
                                 STsdbQueryHandle* pQueryHandle) {
1698
  int     numOfRows = 0;
1699
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
1700
  *skey = TSKEY_INITIAL_VAL;
1701

H
Haojun Liao 已提交
1702
  int64_t st = taosGetTimestampUs();
1703 1704 1705
  STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
  STable* pTable = pCheckInfo->pTableObj;

1706
  do {
H
Haojun Liao 已提交
1707 1708
    SDataRow row = getSDataRowInTableMem(pCheckInfo);
    if (row == NULL) {
1709 1710
      break;
    }
1711

1712
    TSKEY key = dataRowKey(row);
1713
    if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
S
Shengliang Guan 已提交
1714
      tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
1715 1716 1717 1718
          pQueryHandle->window.ekey);
      
      break;
    }
1719

1720
    if (*skey == INT64_MIN) {
H
Haojun Liao 已提交
1721
      *skey = key;
1722
    }
1723

H
Haojun Liao 已提交
1724
    *ekey = key;
1725 1726
    copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);

1727
    if (++numOfRows >= maxRowsToRead) {
H
Haojun Liao 已提交
1728
      moveToNextRow(pCheckInfo);
1729 1730 1731
      break;
    }
    
H
Haojun Liao 已提交
1732
  } while(moveToNextRow(pCheckInfo));
1733

1734 1735 1736
  assert(numOfRows <= maxRowsToRead);
  
  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1737
  if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
1738 1739 1740 1741 1742 1743 1744 1745
    int32_t emptySize = maxRowsToRead - numOfRows;
    
    for(int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
      memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }
  
H
Haojun Liao 已提交
1746
  int64_t elapsedTime = taosGetTimestampUs() - st;
S
Shengliang Guan 已提交
1747
  tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
H
Haojun Liao 已提交
1748
            elapsedTime, numOfRows, numOfCols);
1749

1750
  return numOfRows;
H
hjxilinx 已提交
1751 1752
}

H
hzcheng 已提交
1753
/*
 * Describe the block the handle is currently positioned on.
 *
 * The owning table is taken from the current file block slot when cur.fid >= 0,
 * otherwise from the check-list entry at activeIndex. Row count and time window
 * come from the handle's cursor; the column count is the number of output columns.
 */
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
  SQueryFilePos*    pos = &pHandle->cur;

  STable* pOwner = NULL;
  if (pHandle->cur.fid < 0) {
    // no file position: the block was built from the buffer of the active table
    STableCheckInfo* pActive = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
    pOwner = pActive->pTableObj;
  } else {
    // there are data in file
    STableBlockInfo* pFileBlock = &pHandle->pDataBlockInfo[pos->slot];
    pOwner = pFileBlock->pTableCheckInfo->pTableObj;
  }

  SDataBlockInfo result = {
      .uid = pOwner->tableId.uid,
      .tid = pOwner->tableId.tid,
      .rows = pos->rows,
      .window = pos->win,
      .numOfCols = QH_GET_NUM_OF_COLS(pHandle),
  };

  return result;
}
H
hjxilinx 已提交
1777

H
Haojun Liao 已提交
1778 1779 1780
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
H
hzcheng 已提交
1781
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
H
Haojun Liao 已提交
1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
  
  SQueryFilePos* cur = &pHandle->cur;
  if (cur->mixBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
  
  assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) ||
      ((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0)));
  
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
1794 1795 1796 1797 1798 1799 1800
  
  // file block with subblocks has no statistics data
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
  
H
Haojun Liao 已提交
1801 1802
  tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
  
H
Haojun Liao 已提交
1803
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
1804 1805 1806 1807 1808 1809 1810 1811
  for(int32_t i = 0; i < numOfCols; ++i) {
    SDataStatis* st = &pHandle->statis[i];
    int32_t colId = st->colId;
    
    memset(st, 0, sizeof(SDataStatis));
    st->colId = colId;
  }
  
H
Haojun Liao 已提交
1812 1813
  tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
  
H
Haojun Liao 已提交
1814 1815
  *pBlockStatis = pHandle->statis;
  
H
Haojun Liao 已提交
1816 1817
  //update the number of NULL data rows
  for(int32_t i = 0; i < numOfCols; ++i) {
1818
    if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
H
Haojun Liao 已提交
1819 1820 1821 1822
      pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
    }
  }
  
1823
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1824 1825
}

H
hzcheng 已提交
1826
/*
 * Return the column data of the current block (always the handle's pColumns).
 *
 * Nothing has to be loaded when the data came from the buffer (cur.fid < 0),
 * when the block is a mixed block, or when the load info already records this
 * very slot / file / table. Otherwise the whole file block is loaded and copied
 * into the output columns; for descending scans that do not fill the output
 * capacity, the rows are then moved to the front of each column buffer.
 *
 * pIdList is not consulted by this implementation.
 */
SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;

  // data is from cache: already placed in the output columns
  if (pHandle->cur.fid < 0) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;

  // data block is not completely qualified to the query time range: already materialised
  if (pHandle->cur.mixBlock) {
    return pHandle->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);

  // data block has been loaded, todo extract method
  SDataBlockLoadInfo* pLoaded = &pHandle->dataBlockLoadInfo;
  bool sameBlock = pLoaded->slot == pHandle->cur.slot && pLoaded->fileGroup->fileId == pHandle->cur.fid &&
                   pLoaded->tid == pCheckInfo->pTableObj->tableId.tid;
  if (sameBlock) {
    return pHandle->pColumns;
  }

  // only load the file block
  SCompBlock* pBlock = pBlockInfo->compBlock;
  doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);

  // todo refactor
  int32_t copied = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  if (!ASCENDING_TRAVERSE(pHandle->order) && copied < pHandle->outputCapacity) {
    int32_t gap = pHandle->outputCapacity - copied;
    int32_t numOfOutputCols = taosArrayGetSize(pHandle->pColumns);

    for (int32_t c = 0; c < numOfOutputCols; ++c) {
      SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, c);
      memmove(pColInfo->pData, pColInfo->pData + gap * pColInfo->info.bytes, copied * pColInfo->info.bytes);
    }
  }

  return pHandle->pColumns;
}

H
hzcheng 已提交
1875
/* Placeholder: row-level retrieval is not implemented; every call yields NULL. */
SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) {
  return NULL;
}
1876

1877
/*
 * Append the STable pointer stored in every node of the super table's index
 * skip list to `list`. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSuperTable->pIndex);

  for (;;) {
    if (!tSkipListIterNext(pIter)) {
      break;
    }

    SSkipListNode* pCurrent = tSkipListIterGet(pIter);

    // the node payload is an STable*; push that pointer value
    STable** ppChild = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pCurrent);
    taosArrayPush(list, ppChild);
  }

  tSkipListDestroyIter(pIter);
  return TSDB_CODE_SUCCESS;
}

1890
static void destroyHelper(void* param) {
1891 1892 1893
  if (param == NULL) {
    return;
  }
1894

H
hjxilinx 已提交
1895
  
1896
  tQueryInfo* pInfo = (tQueryInfo*)param;
H
hjxilinx 已提交
1897 1898 1899 1900 1901
  if (pInfo->optr != TSDB_RELATION_IN) {
    tfree(pInfo->q);
  }
  
//  tVariantDestroy(&(pInfo->q));
1902 1903 1904
  free(param);
}

H
hjxilinx 已提交
1905
/*
 * Resolve the position of the column described by pSchema inside pTSchema.
 *
 * @return TSDB_TBNAME_COLUMN_INDEX when pSchema names the table-name pseudo column
 *         (case-insensitive match with TSQL_TBNAME_L), the index of the first
 *         column whose bytes, type and colId all match, or -2 when nothing matches.
 */
static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
  // filter on table name(TBNAME)
  if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
    return TSDB_TBNAME_COLUMN_INDEX;
  }

  int32_t found = -2;
  for (int32_t pos = 0; pos < schemaNCols(pTSchema); ++pos) {
    STColumn* pCandidate = &pTSchema->columns[pos];

    if (pCandidate->bytes != pSchema->bytes || pCandidate->type != pSchema->type ||
        pCandidate->colId != pSchema->colId) {
      continue;
    }

    found = pos;
    break;
  }

  return found;
}

void filterPrepare(void* expr, void* param) {
1922
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
1923
  if (pExpr->_node.info != NULL) {
1924 1925
    return;
  }
1926

H
hjxilinx 已提交
1927
  int32_t i = 0;
H
[td-32]  
hjxilinx 已提交
1928
  pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
H
hjxilinx 已提交
1929 1930
  
  STSchema* pTSSchema = (STSchema*) param;
1931

H
hjxilinx 已提交
1932 1933 1934
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
1935

1936
  // todo : if current super table does not change schema yet, this function may fail to get correct schema, test case
H
hjxilinx 已提交
1937
  int32_t index = getTagColumnIndex(pTSSchema, pSchema);
H
hjxilinx 已提交
1938
  assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX));
1939

1940
  pInfo->sch      = *pSchema;
H
hjxilinx 已提交
1941
  pInfo->colIndex = index;
1942
  pInfo->optr     = pExpr->_node.optr;
H
hjxilinx 已提交
1943
  pInfo->compare  = getComparFunc(pSchema->type, pInfo->optr);
H
hjxilinx 已提交
1944
  pInfo->param    = pTSSchema;
H
hjxilinx 已提交
1945 1946 1947 1948 1949
  
  if (pInfo->optr == TSDB_RELATION_IN) {
    pInfo->q = (char*) pCond->arr;
  } else {
    pInfo->q = calloc(1, pSchema->bytes);
1950
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
1951
  }
1952 1953
}

1954 1955 1956 1957
/* Context passed as the extra parameter to tableGroupComparFn when sorting/grouping tables. */
typedef struct STableGroupSupporter {
  int32_t    numOfCols;   // number of entries in pCols
  SColIndex* pCols;       // columns to compare on, in priority order
  STSchema*  pTagSchema;  // tag schema used to look up the type/size of each compared column
} STableGroupSupporter;

int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
1963 1964
  STable* pTable1 = *(STable**) p1;
  STable* pTable2 = *(STable**) p2;
1965 1966 1967 1968 1969
  
  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;
    
H
Haojun Liao 已提交
1970
    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
1971
    
1972 1973 1974 1975 1976
    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;
    
1977
    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {  // todo refactor extract method , to queryExecutor to generate tags values
H
hjxilinx 已提交
1978 1979
      f1 = (char*) pTable1->name;
      f2 = (char*) pTable2->name;
1980
      type = TSDB_DATA_TYPE_BINARY;
H
Haojun Liao 已提交
1981
      bytes = tGetTableNameColumnSchema().bytes;
1982
    } else {
H
hjxilinx 已提交
1983 1984
      STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
      bytes = pCol->bytes;
1985
      type = pCol->type;
H
Hongze Cheng 已提交
1986 1987
      f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
      f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
1988
    }
H
Haojun Liao 已提交
1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013
    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
  
  return 0;
}

2014
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
H
hjxilinx 已提交
2015
    __ext_compar_fn_t compareFn) {
2016
  STable* pTable = taosArrayGetP(pTableList, 0);
2017
  
2018 2019 2020 2021
  SArray* g = taosArrayInit(16, POINTER_BYTES);
  taosArrayPush(g, &pTable);
  tsdbRefTable(pTable);

2022
  for (int32_t i = 1; i < numOfTables; ++i) {
2023 2024
    STable** prev = taosArrayGet(pTableList, i - 1);
    STable** p = taosArrayGet(pTableList, i);
H
hjxilinx 已提交
2025 2026
    
    int32_t ret = compareFn(prev, p, pSupp);
2027 2028
    assert(ret == 0 || ret == -1);
    
2029 2030 2031
    tsdbRefTable(*p);
    assert((*p)->type == TSDB_CHILD_TABLE);

2032
    if (ret == 0) {
H
hjxilinx 已提交
2033
      taosArrayPush(g, p);
2034 2035
    } else {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
2036
      g = taosArrayInit(16, POINTER_BYTES);
H
hjxilinx 已提交
2037
      taosArrayPush(g, p);
2038 2039
    }
  }
2040 2041
  
  taosArrayPush(pGroups, &g);
2042 2043
}

2044
/*
 * Partition a list of child tables into groups.
 *
 * - Empty list: an empty group list is returned.
 * - No ordering columns, or a single table: every table is referenced and put
 *   into one group.
 * - Otherwise pTableList is sorted in place by the given tag columns and split
 *   into groups of equal tables.
 *
 * @param pTableList      array of STable* (sorted in place in the grouping case)
 * @param pTagSchema      tag schema used to interpret pCols
 * @param pCols           columns to group on
 * @param numOfOrderCols  number of entries in pCols
 * @return newly created array of SArray* groups
 */
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
  assert(pTableList != NULL);
  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
    SArray* sa = taosArrayInit(size, POINTER_BYTES);
    for(int32_t i = 0; i < size; ++i) {
      STable** pTable = taosArrayGet(pTableList, i);
      assert((*pTable)->type == TSDB_CHILD_TABLE);

      tsdbRefTable(*pTable);
      taosArrayPush(sa, pTable);
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %zu tables belong to one group", size);
  } else {
    // the context only lives for the duration of the two calls below, so it is kept on the
    // stack; this also removes the unchecked heap allocation
    STableGroupSupporter supp = {
        .numOfCols  = numOfOrderCols,
        .pCols      = pCols,
        .pTagSchema = pTagSchema,
    };

    taosqsort(pTableList->pData, size, POINTER_BYTES, &supp, tableGroupComparFn);
    createTableGroupImpl(pTableGroup, pTableList, size, &supp, tableGroupComparFn);
  }

  return pTableGroup;
}

2080
bool indexedNodeFilterFp(const void* pNode, void* param) {
H
hjxilinx 已提交
2081
  tQueryInfo* pInfo = (tQueryInfo*) param;
H
hjxilinx 已提交
2082
  
H
Haojun Liao 已提交
2083
  STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
2084

2085
  char*  val = NULL;
2086
  int8_t type = pInfo->sch.type;
H
hjxilinx 已提交
2087

2088
  if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
H
Haojun Liao 已提交
2089
    val = (char*) pTable->name;
2090 2091
    type = TSDB_DATA_TYPE_BINARY;
  } else {
H
Haojun Liao 已提交
2092
    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
2093
  }
2094
  
T
Tao Liu 已提交
2095
  //todo :the val is possible to be null, so check it out carefully
2096
  int32_t ret = 0;
2097 2098
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    if (pInfo->optr == TSDB_RELATION_IN) {
H
hjxilinx 已提交
2099
      ret = pInfo->compare(val, pInfo->q);
2100
    } else {
H
hjxilinx 已提交
2101
      ret = pInfo->compare(val, pInfo->q);
2102
    }
2103
  } else {
H
hjxilinx 已提交
2104
    ret = pInfo->compare(val, pInfo->q);
2105
  }
2106

2107 2108 2109 2110 2111 2112 2113
  switch (pInfo->optr) {
    case TSDB_RELATION_EQUAL: {
      return ret == 0;
    }
    case TSDB_RELATION_NOT_EQUAL: {
      return ret != 0;
    }
2114
    case TSDB_RELATION_GREATER_EQUAL: {
2115 2116
      return ret >= 0;
    }
2117
    case TSDB_RELATION_GREATER: {
2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128
      return ret > 0;
    }
    case TSDB_RELATION_LESS_EQUAL: {
      return ret <= 0;
    }
    case TSDB_RELATION_LESS: {
      return ret < 0;
    }
    case TSDB_RELATION_LIKE: {
      return ret == 0;
    }
weixin_48148422's avatar
weixin_48148422 已提交
2129
    case TSDB_RELATION_IN: {
2130
      return ret == 1;
weixin_48148422's avatar
weixin_48148422 已提交
2131
    }
2132

2133 2134 2135
    default:
      assert(false);
  }
H
hjxilinx 已提交
2136
  
2137 2138 2139
  return true;
}

2140
/*
 * Run the filter expression tree over the super table's index and collect the
 * matching entries in pRes. The expression tree is destroyed before returning,
 * so the caller must not use pExpr afterwards. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
  // query according to the expression tree
  SExprTraverseSupp traverseSupp = {
      .nodeFilterFn = (__result_filter_fn_t) indexedNodeFilterFp,
      .setupInfoFn  = filterPrepare,
      .pExtInfo     = pSTable->tagSchema,
  };

  tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &traverseSupp);

  // per-node query info is released through destroyHelper
  tExprTreeDestroy(&pExpr, destroyHelper);

  return TSDB_CODE_SUCCESS;
}

H
TD-353  
Hongze Cheng 已提交
2153
/*
 * Select the child tables of a super table that satisfy the tag / table-name
 * conditions and partition them into groups.
 *
 * The repo meta is read-locked for the duration of the call. Without any
 * condition all child tables are returned; otherwise an expression tree is
 * built from tbnameCond and/or the serialized pTagCond (joined with
 * tagNameRelType when both exist) and evaluated against the super table index.
 *
 * @param tsdb            repository
 * @param uid             uid of the super table
 * @param pTagCond        serialized tag condition, may be NULL
 * @param len             length of pTagCond
 * @param tagNameRelType  relation joining the tag and the table-name condition
 * @param tbnameCond      table-name condition, may be NULL
 * @param pGroupInfo      out: number of tables and the list of groups
 * @param pColIndex       group-by columns
 * @param numOfCols       number of entries in pColIndex
 * @return TSDB_CODE_SUCCESS, or the error code (also stored in terrno) when the table is
 *         missing, is not a super table, locking fails, or the expression cannot be built
 */
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
                                 SColIndex* pColIndex, int32_t numOfCols) {
  if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;

  STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTable == NULL) {
    tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);

    goto _error;
  }

  if (pTable->type != TSDB_SUPER_TABLE) {
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
        pTable->name->data);
    terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client

    tsdbUnlockRepoMeta(tsdb);
    goto _error;
  }

  //NOTE: not add ref count for super table
  SArray* res = taosArrayInit(8, POINTER_BYTES);
  STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);

  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
    int32_t ret = getAllTableList(pTable, res);
    if (ret != TSDB_CODE_SUCCESS) {
      // do not leak the result list, and report the actual failure through terrno
      taosArrayDestroy(res);
      tsdbUnlockRepoMeta(tsdb);
      terrno = ret;
      goto _error;
    }

    pGroupInfo->numOfTables = taosArrayGetSize(res);
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);

    tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
    taosArrayDestroy(res);

    if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
    return ret;
  }

  int32_t ret = TSDB_CODE_SUCCESS;
  tExprNode* expr = NULL;

  TRY(32) {
    expr = exprTreeFromTableName(tbnameCond);
    if (expr == NULL) {
      expr = exprTreeFromBinary(pTagCond, len);
    } else {
      CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL);
      tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
      if (tagExpr != NULL) {
        CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, tagExpr, NULL);
        tExprNode* tbnameExpr = expr;
        expr = calloc(1, sizeof(tExprNode));
        if (expr == NULL) {
          THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
        }
        // join both conditions under a new root node
        expr->nodeType = TSQL_NODE_EXPR;
        expr->_node.optr = tagNameRelType;
        expr->_node.pLeft = tagExpr;
        expr->_node.pRight = tbnameExpr;
      }
    }
    CLEANUP_EXECUTE();

  } CATCH( code ) {
    CLEANUP_EXECUTE();
    ret = code;
    // TODO: more error handling
  } END_TRY

  if (ret != TSDB_CODE_SUCCESS) {
    // building the expression failed: `expr` must not be evaluated, so give up here instead of
    // running the query with it
    taosArrayDestroy(res);
    tsdbUnlockRepoMeta(tsdb);
    terrno = ret;
    goto _error;
  }

  doQueryTableList(pTable, res, expr);
  pGroupInfo->numOfTables = taosArrayGetSize(res);
  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);

  tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
      pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));

  taosArrayDestroy(res);

  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
  return ret;

  _error:
  return terrno;
}
2244

H
TD-353  
Hongze Cheng 已提交
2245
/*
 * Build a group list containing exactly one group with exactly one table.
 *
 * The table is looked up by uid under the repo meta read lock and referenced
 * via tsdbRefTable() before the lock is released. It must be a child, normal
 * or stream table.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when locking/unlocking fails or the uid is unknown
 */
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  STable* pTarget = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTarget == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);
    return terrno;
  }

  assert(pTarget->type == TSDB_CHILD_TABLE || pTarget->type == TSDB_NORMAL_TABLE || pTarget->type == TSDB_STREAM_TABLE);
  tsdbRefTable(pTarget);

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  // one group holding the single table
  SArray* pGroups = taosArrayInit(1, POINTER_BYTES);
  SArray* pOnly   = taosArrayInit(1, POINTER_BYTES);

  taosArrayPush(pOnly, &pTarget);
  taosArrayPush(pGroups, &pOnly);

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList  = pGroups;

  return TSDB_CODE_SUCCESS;
}
2272

2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
/*
 * Build a single group from a list of table ids.
 *
 * Each id is resolved by uid under the repo meta read lock; ids whose table no
 * longer exists are skipped with a warning. Every resolved table is referenced
 * via tsdbRefTable() and appended to the group. numOfTables reports the number
 * of tables actually placed in the group.
 *
 * @param tsdb          repository
 * @param pTableIdList  array of STableIdInfo, must not be NULL
 * @param pGroupInfo    out: table count and a group list holding one group
 * @return TSDB_CODE_SUCCESS, or terrno when locking/unlocking the meta fails
 */
int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  assert(pTableIdList != NULL);

  if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;

  size_t size = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* group = taosArrayInit(1, POINTER_BYTES);

  for(int32_t i = 0; i < size; ++i) {
    STableIdInfo *id = taosArrayGet(pTableIdList, i);

    STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      // NOTE(review): the error is only recorded in terrno; the super table is still added to
      // the group and the call still succeeds -- confirm whether this should abort instead
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);
      terrno = TSDB_CODE_QRY_INVALID_MSG;
    }

    tsdbRefTable(pTable);
    taosArrayPush(group, &pTable);
  }

  // NOTE(review): on unlock failure the group arrays and table references are not released
  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;

  // count only the tables that were found; dropped ones were skipped above
  pGroupInfo->numOfTables = taosArrayGetSize(group);
  taosArrayPush(pGroupInfo->pGroupList, &group);

  return TSDB_CODE_SUCCESS;

  _error:
  return terrno;
}

H
hzcheng 已提交
2311
/**
 * Release a query handle together with the resources it holds.
 *
 * For every entry of pTableCheckInfo the skip list iterator is destroyed, the
 * mem and imem tables are un-referenced, and the data column buffer, the data
 * column descriptor and the comp info are freed. Afterwards the per-column
 * data buffers, the column array, the data block info, the statistics buffer
 * and the read helper are released, and finally the handle itself.
 *
 * @param queryHandle  handle to release; NULL is accepted and ignored
 */
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)queryHandle;
  if (pHandle == NULL) return;

  // per-table state
  size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
  for (size_t t = 0; t < numOfTables; ++t) {
    STableCheckInfo* pInfo = taosArrayGet(pHandle->pTableCheckInfo, t);

    tSkipListDestroyIter(pInfo->iter);

    tsdbUnRefMemTable(pHandle->pTsdb, pInfo->mem);
    tsdbUnRefMemTable(pHandle->pTsdb, pInfo->imem);

    // the buffer hangs off pDataCols, so it has to go first
    if (pInfo->pDataCols != NULL) {
      tfree(pInfo->pDataCols->buf);
      tfree(pInfo->pDataCols);
    }

    tfree(pInfo->pCompInfo);
  }

  taosArrayDestroy(pHandle->pTableCheckInfo);

  // per-column result buffers
  size_t numOfCols = taosArrayGetSize(pHandle->pColumns);
  for (size_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pHandle->pColumns, c);
    tfree(pCol->pData);
  }

  taosArrayDestroy(pHandle->pColumns);

  tfree(pHandle->pDataBlockInfo);
  tfree(pHandle->statis);

  tsdbDestroyHelper(&pHandle->rhelper);

  tfree(pHandle);
}
2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371

/**
 * Tear down the group list held by a table group info.
 *
 * Each table stored in each group is un-referenced with tsdbUnRefTable(),
 * each group array is destroyed, and finally the outer group list array is
 * destroyed. The STableGroupInfo structure itself is not freed.
 *
 * @param pGroupList  group info to clean up, must not be NULL
 */
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  SArray* pGroups = pGroupList->pGroupList;
  size_t  groupCount = taosArrayGetSize(pGroups);

  for (size_t g = 0; g < groupCount; ++g) {
    SArray* pTables = taosArrayGetP(pGroups, g);
    size_t  tableCount = taosArrayGetSize(pTables);

    // drop the reference held on every table of this group
    for (size_t t = 0; t < tableCount; ++t) {
      STable* pTable = taosArrayGetP(pTables, t);
      assert(pTable != NULL);

      tsdbUnRefTable(pTable);
    }

    taosArrayDestroy(pTables);
  }

  taosArrayDestroy(pGroups);
}