tsdbRead.c 48.8 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
17

18
#include "talgo.h"
19
#include "tlog.h"
20
#include "tutil.h"
21
#include "tcompare.h"
22

23 24
#include "../../../query/inc/qast.h"  // todo move to common module
#include "../../../query/inc/tlosertree.h"  // todo move to util module
25
#include "tsdb.h"
H
TD-34  
hzcheng 已提交
26
#include "tsdbMain.h"
27

28
#define EXTRA_BYTES 2
H
hjxilinx 已提交
29
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
30
#define ASCENDING_ORDER_TRAVERSE(o)   (o == TSDB_ORDER_ASC)
31
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
32

33 34 35 36 37
enum {
  QUERY_RANGE_LESS_EQUAL = 0,
  QUERY_RANGE_GREATER_EQUAL = 1,
};

38 39 40
typedef struct SField {
  // todo need the definition
} SField;
H
hjxilinx 已提交
41

42 43
typedef struct SQueryFilePos {
  int32_t fid;
44 45
  int32_t slot;
  int32_t pos;
46 47
  int64_t lastKey;
} SQueryFilePos;
H
hjxilinx 已提交
48

49
typedef struct SDataBlockLoadInfo {
H
hjxilinx 已提交
50
  SFileGroup* fileGroup;
51 52 53
  int32_t     slot;
  int32_t     sid;
  SArray*     pLoadedCols;
54
} SDataBlockLoadInfo;
H
hjxilinx 已提交
55

56 57 58 59 60
typedef struct SLoadCompBlockInfo {
  int32_t sid; /* meter sid */
  int32_t fileId;
  int32_t fileListIndex;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
61

62
typedef struct STableCheckInfo {
63 64 65 66 67 68
  STableId   tableId;
  TSKEY      lastKey;
  STable*    pTableObj;
  int64_t    offsetInHeaderFile;
  int32_t    start;
  bool       checkFirstFileBlock;
H
hjxilinx 已提交
69
  
70
  SCompInfo* pCompInfo;
H
hjxilinx 已提交
71 72
  int32_t    compSize;
  
73 74 75
  int32_t    numOfBlocks;  // number of qualified data blocks not the original blocks

  SDataCols*         pDataCols;
76 77
  SSkipListIterator* iter;
} STableCheckInfo;
78 79

typedef struct {
80 81
  SCompBlock* compBlock;
  SField*     fields;
82 83
} SCompBlockFields;

84
typedef struct STableBlockInfo {
85
  SCompBlockFields pBlock;
86
  STableCheckInfo* pTableCheckInfo;
87
  int32_t          blockIndex;
88 89
  int32_t          groupIdx; /* number of group is less than the total number of tables */
} STableBlockInfo;
90

91 92 93 94 95 96 97
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
  STableBlockInfo** pDataBlockInfo;
  int32_t*            blockIndexArray;
  int32_t*            numOfBlocksPerMeter;
} SBlockOrderSupporter;

98
typedef struct STsdbQueryHandle {
99
  STsdbRepo*    pTsdb;
100
  SQueryFilePos cur;    // current position
101 102 103 104 105 106

  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */

  int16_t     order;
  STimeWindow window;  // the primary query time window that applies to all queries
H
hjxilinx 已提交
107
  SCompBlock* pBlock;
108
  int32_t     numOfBlocks;
109
  SField**    pFields;
H
hjxilinx 已提交
110
  SArray*     pColumns;  // column list, SColumnInfoData array list
111 112
  bool        locateStart;
  int32_t     realNumOfRows;
113
  SArray*     pTableCheckInfo;
H
hjxilinx 已提交
114
  int32_t     activeIndex;
115 116 117
  bool        checkFiles;  // check file stage
  void*       qinfo;  // query info handle, for debug purpose
  
118 119 120 121 122
  STableBlockInfo* pDataBlockInfo;

  SFileGroup*    pFileGroup;
  SFileGroupIter fileIter;
  SCompIdx*      compIndex;
H
TD-100  
hzcheng 已提交
123
  SRWHelper rhelper;
124 125
} STsdbQueryHandle;

126
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
H
hjxilinx 已提交
127
  pBlockLoadInfo->slot = -1;
128
  pBlockLoadInfo->sid = -1;
H
hjxilinx 已提交
129
  pBlockLoadInfo->fileGroup = NULL;
H
hjxilinx 已提交
130 131
}

132
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
133 134 135 136
  pCompBlockLoadInfo->sid = -1;
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->fileListIndex = -1;
}
H
hjxilinx 已提交
137

138
tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) {
139 140 141
  // todo 1. filter not exist table
  // todo 2. add the reference count for each table that is involved in query

142
  STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
143
  pQueryHandle->order  = pCond->order;
144
  pQueryHandle->window = pCond->twindow;
145
  pQueryHandle->pTsdb  = tsdb;
H
TD-100  
hzcheng 已提交
146 147
  pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx));
  tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
148

149
  pQueryHandle->cur.fid = -1;
150

151 152
  size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
  assert(sizeOfGroup >= 1);
H
hjxilinx 已提交
153

154 155 156 157
  pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
  
  for (int32_t i = 0; i < sizeOfGroup; ++i) {
    SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
158

159
    size_t gsize = taosArrayGetSize(group);
160 161
    assert(gsize > 0);
    
162
    for (int32_t j = 0; j < gsize; ++j) {
163 164
      SPair* d = (SPair*) taosArrayGet(group, j);
      assert(d->first !=  NULL);
165 166 167
      
      STableCheckInfo info = {
          .lastKey = pQueryHandle->window.skey,
168 169
          .tableId = ((STable*) d->first)->tableId,
          .pTableObj = d->first,
170 171 172 173
      };

      taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
    }
H
hjxilinx 已提交
174
  }
175

176 177 178 179 180 181
  dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
  
  /*
   * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
   * in case of descending timestamp order query.
   */
182
  pQueryHandle->checkFiles  = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order);
H
hjxilinx 已提交
183
  pQueryHandle->activeIndex = 0;
184

185
  // allocate buffer in order to load data blocks from file
186 187
  int32_t numOfCols = taosArrayGetSize(pColumnInfo);
  size_t  bufferCapacity = 4096;
188

H
hjxilinx 已提交
189
  pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
190
  for (int32_t i = 0; i < numOfCols; ++i) {
H
hjxilinx 已提交
191 192
    SColumnInfoData* pCol = taosArrayGet(pColumnInfo, i);
    SColumnInfoData  pDest = {{0}, 0};
193 194 195 196 197 198

    pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes);
    pDest.info = pCol->info;
    taosArrayPush(pQueryHandle->pColumns, &pDest);
  }

199 200
  tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
201 202

  return (tsdb_query_handle_t)pQueryHandle;
H
hjxilinx 已提交
203 204
}

205
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
206 207
  size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
208 209 210
  pHandle->cur.fid = -1;
  
  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
211

212
  STable* pTable = pCheckInfo->pTableObj;
213
  assert(pTable != NULL);
214

215 216
  // no data in cache, abort
  if (pTable->mem == NULL && pTable->imem == NULL) {
217 218
    return false;
  }
219 220
  
  if (pCheckInfo->iter == NULL) {
221 222 223
    pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
        TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
    
224 225 226 227 228 229 230 231 232 233 234 235 236
    if (pCheckInfo->iter == NULL) {
      return false;
    }
  }
  
  if (!tSkipListIterNext(pCheckInfo->iter)) {  // buffer is empty
    return false;
  }

  SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
  if (node == NULL) {
    return false;
  }
237

238 239
  SDataRow row = SL_GET_NODE_DATA(node);
  pCheckInfo->lastKey = dataRowKey(row);  // first timestamp in buffer
240 241
  dTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d", pHandle,
      pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order);
242
  
243
  // all data in mem are checked already.
244 245
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pHandle->order))) {
246 247
    return false;
  }
248

249
  return true;
250
}
H
hjxilinx 已提交
251

252 253
// todo dynamic get the daysperfile
static int32_t getFileIdFromKey(TSKEY key) {
254
  int64_t fid = (int64_t)(key / (10 * tsMsPerDay[0]));  // set the starting fileId
255 256 257 258 259
  if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }
  
  return fid;
260 261
}

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t firstSlot = 0;
  int32_t lastSlot = numOfBlocks - 1;
  
  int32_t midSlot = firstSlot;
  
  while (1) {
    numOfBlocks = lastSlot - firstSlot + 1;
    midSlot = (firstSlot + (numOfBlocks >> 1));
    
    if (numOfBlocks == 1) break;
    
    if (skey > pBlock[midSlot].keyLast) {
      if (numOfBlocks == 2) break;
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
      firstSlot = midSlot + 1;
    } else if (skey < pBlock[midSlot].keyFirst) {
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
      lastSlot = midSlot - 1;
    } else {
      break;  // got the slot
    }
  }
  
  return midSlot;
}
288 289 290 291

static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
  // todo check open file failed
  SFileGroup* fileGroup = pQueryHandle->pFileGroup;
292 293
  
  assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
H
TD-100  
hzcheng 已提交
294
  tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
295 296

  // load all the comp offset value for all tables in this file
H
TD-100  
hzcheng 已提交
297
  // tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000);  // todo set dynamic max tables
298 299 300 301 302 303 304

  *numOfBlocks = 0;
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);

H
TD-100  
hzcheng 已提交
305
    SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
306
    if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) {  // no data block in this file, try next file
307
      continue;//no data blocks in the file belongs to pCheckInfo->pTable
308
    } else {
H
hjxilinx 已提交
309 310 311 312 313 314 315
      if (pCheckInfo->compSize < compIndex->len) {
        assert(compIndex->len > 0);
        
        char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
        assert(t != NULL);
        
        pCheckInfo->pCompInfo = (SCompInfo*) t;
316
        pCheckInfo->compSize = compIndex->len;
H
hjxilinx 已提交
317 318
      }
      
H
TD-100  
hzcheng 已提交
319
      // tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
H
TD-100  
hzcheng 已提交
320 321 322 323 324 325
      STable* pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->tableId.uid);
      assert(pTable != NULL);
      
      tsdbSetHelperTable(&pQueryHandle->rhelper, pTable, pQueryHandle->pTsdb);

      tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
326 327 328 329 330
      SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
      
      TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
      TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
      
331
      // discard the unqualified data block based on the query time window
332 333 334 335
      int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfSuperBlocks, s, TSDB_ORDER_ASC);
      int32_t end = start;
      
      if (s > pCompInfo->blocks[start].keyLast) {
336 337 338
        continue;
      }

339 340 341
      // todo speedup the procedure of located end block
      while (end < compIndex->numOfSuperBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
        end += 1;
342 343
      }

344 345
      pCheckInfo->numOfBlocks = (end - start);
      
346
      if (start > 0) {
347
        memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
348 349 350 351
      }

      (*numOfBlocks) += pCheckInfo->numOfBlocks;
    }
352
  }
353

354 355 356
  return TSDB_CODE_SUCCESS;
}

357
static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
H
[td-32]  
hjxilinx 已提交
358
  SDataBlockInfo info = {
359 360 361 362 363
      .window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast},
      .numOfCols = pBlock->numOfCols,
      .rows = pBlock->numOfPoints,
      .sid = pCheckInfo->tableId.tid,
      .uid = pCheckInfo->tableId.uid,
H
[td-32]  
hjxilinx 已提交
364
  };
365

366 367 368
  return info;
}

369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
  assert(numOfCols <= TSDB_MAX_COLUMNS);
  
  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
    taosArrayPush(pIdList, &pCol->info.colId);
  }
  
  return pIdList;
}

static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pQueryHandle);
  
  // check if the primary time stamp column needs to load
  int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
  
  // the primary timestamp column does not be included in the the specified load column list, add it
  if (loadTS && colId != 0) {
    int16_t columnId = 0;
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }
  
  return pLocalIdList;
}

397 398 399 400 401 402
static void    filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
                                     SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);

static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
H
[td-32]  
hjxilinx 已提交
403

H
hjxilinx 已提交
404 405
  data->numOfCols = pBlock->numOfCols;
  data->uid = pCheckInfo->pTableObj->tableId.uid;
406 407 408 409 410 411 412 413 414 415

  bool    blockLoaded = false;
  SArray* sa = getDefaultLoadColumns(pQueryHandle, true);

  if (pCheckInfo->pDataCols == NULL) {
    pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
  }

  tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));

H
TD-100  
hzcheng 已提交
416 417 418 419
  // SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA];
  // if (pFile->fd == FD_INITIALIZER) {
  //   pFile->fd = open(pFile->fname, O_RDONLY);
  // }
420

H
TD-100  
hzcheng 已提交
421 422
  if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
    SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
423

H
TD-100  
hzcheng 已提交
424 425 426
    pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
    pBlockLoadInfo->slot = pQueryHandle->cur.slot;
    pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
427

H
TD-100  
hzcheng 已提交
428 429
    blockLoaded = true;
  }
430 431 432 433

  taosArrayDestroy(sa);
  tfree(data);

H
TD-100  
hzcheng 已提交
434 435
  // TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
  // assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
436 437

  return blockLoaded;
H
hjxilinx 已提交
438 439
}

440 441 442 443
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SArray*        sa = getDefaultLoadColumns(pQueryHandle, true);
  SQueryFilePos* cur = &pQueryHandle->cur;

444
  if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
H
hjxilinx 已提交
445
    // query ended in current block
446
    if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
447 448 449 450 451 452 453 454 455 456 457 458 459 460
      if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
        return false;
      }

      SDataCols* pDataCols = pCheckInfo->pDataCols;
      if (pCheckInfo->lastKey > pBlock->keyFirst) {
        cur->pos =
            binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
      } else {
        cur->pos = 0;
      }

      filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
    } else {  // the whole block is loaded in to buffer
H
hjxilinx 已提交
461
      pQueryHandle->realNumOfRows = pBlock->numOfPoints;
H
[td-32]  
hjxilinx 已提交
462
    }
463 464
  } else {
    // query ended in current block
H
[td-32]  
hjxilinx 已提交
465
    if (pQueryHandle->window.ekey > pBlock->keyFirst) {
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
      if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
        return false;
      }
      
      SDataCols* pDataCols = pCheckInfo->pDataCols;
      if (pCheckInfo->lastKey < pBlock->keyLast) {
        cur->pos =
            binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order);
      } else {
        cur->pos = pBlock->numOfPoints - 1;
      }
      
      filterDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
    } else {
      pQueryHandle->realNumOfRows = pBlock->numOfPoints;
H
[td-32]  
hjxilinx 已提交
481 482
    }
  }
483

484
  taosArrayDestroy(sa);
H
[td-32]  
hjxilinx 已提交
485 486 487
  return pQueryHandle->realNumOfRows > 0;
}

488
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
489 490
  int    firstPos, lastPos, midPos = -1;
  int    numOfPoints;
491 492
  TSKEY* keyList;

493 494
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
  
495
  if (num <= 0) return -1;
496 497

  keyList = (TSKEY*)pValue;
498 499
  firstPos = 0;
  lastPos = num - 1;
500

501
  if (order == TSDB_ORDER_DESC) {
502 503 504 505 506
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
507

508 509
      numOfPoints = lastPos - firstPos + 1;
      midPos = (numOfPoints >> 1) + firstPos;
510

511 512 513 514 515 516 517 518
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
519

520 521 522 523 524
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
525

526 527 528 529 530 531 532
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
533

534 535
      numOfPoints = lastPos - firstPos + 1;
      midPos = (numOfPoints >> 1) + firstPos;
536

537 538 539 540 541 542 543 544 545
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
546

547 548 549
  return midPos;
}

H
[td-32]  
hjxilinx 已提交
550 551
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
552 553 554 555 556 557 558
static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
                                  SArray* sa) {
  SQueryFilePos* cur = &pQueryHandle->cur;
  SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock);

  SDataCols* pCols = pCheckInfo->pDataCols;

559
  int32_t endPos = cur->pos;
560
  if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
561
    endPos = blockInfo.rows - 1;
562
    pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
563
    pCheckInfo->lastKey = blockInfo.window.ekey + 1;
564
  } else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
565 566
    endPos = 0;
    pQueryHandle->realNumOfRows = cur->pos + 1;
567
    pCheckInfo->lastKey = blockInfo.window.ekey - 1;
568
  } else {
569 570
    int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
    endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order);
571

572
    if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
573 574 575 576
      if (endPos < cur->pos) {
        pQueryHandle->realNumOfRows = 0;
        return;
      } else {
577
        pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
578
      }
579

580
      pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
581 582 583 584 585
    } else {
      if (endPos > cur->pos) {
        pQueryHandle->realNumOfRows = 0;
        return;
      } else {
586
        pQueryHandle->realNumOfRows = cur->pos - endPos + 1;
587 588 589
      }
    }
  }
590

H
TD-100  
hzcheng 已提交
591
  // int32_t start = MIN(cur->pos, endPos);
592

593
  // move the data block in the front to data block if needed
H
[td-32]  
hjxilinx 已提交
594
  int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
595

H
hjxilinx 已提交
596
  for (int32_t i = 0; i < taosArrayGetSize(sa); ++i) {
597 598
    int16_t colId = *(int16_t*)taosArrayGet(sa, i);

H
[td-32]  
hjxilinx 已提交
599
    for (int32_t j = 0; j < numOfCols; ++j) {
H
hjxilinx 已提交
600
      SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
601

H
[td-32]  
hjxilinx 已提交
602
      if (pCol->info.colId == colId) {
H
TD-100  
hzcheng 已提交
603 604 605 606
        // SDataCol* pDataCol = &pCols->cols[i];
        pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData;
        // memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start,
        //         pQueryHandle->realNumOfRows * pCol->info.bytes);
H
[td-32]  
hjxilinx 已提交
607
        break;
608 609 610
      }
    }
  }
611 612 613

  assert(pQueryHandle->realNumOfRows <= blockInfo.rows);

614 615 616 617
  // forward(backward) the position for cursor
  cur->pos = endPos;
}

618
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
H
[td-32]  
hjxilinx 已提交
619 620
  int    firstPos, lastPos, midPos = -1;
  int    numOfPoints;
621 622
  TSKEY* keyList;

H
[td-32]  
hjxilinx 已提交
623
  if (num <= 0) return -1;
624 625

  keyList = (TSKEY*)pValue;
H
[td-32]  
hjxilinx 已提交
626 627
  firstPos = 0;
  lastPos = num - 1;
628

H
[td-32]  
hjxilinx 已提交
629 630 631 632 633 634
  if (order == 0) {
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
635

H
[td-32]  
hjxilinx 已提交
636 637
      numOfPoints = lastPos - firstPos + 1;
      midPos = (numOfPoints >> 1) + firstPos;
638

H
[td-32]  
hjxilinx 已提交
639 640 641 642 643 644 645 646
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
647

H
[td-32]  
hjxilinx 已提交
648 649 650 651 652
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
653

H
[td-32]  
hjxilinx 已提交
654 655 656 657 658 659 660
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
661

H
[td-32]  
hjxilinx 已提交
662 663
      numOfPoints = lastPos - firstPos + 1;
      midPos = (numOfPoints >> 1) + firstPos;
664

H
[td-32]  
hjxilinx 已提交
665 666 667 668 669 670 671 672 673
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
674

H
[td-32]  
hjxilinx 已提交
675 676 677
  return midPos;
}

678
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
  tfree(pSupporter->numOfBlocksPerMeter);
  tfree(pSupporter->blockIndexArray);

  for (int32_t i = 0; i < numOfTables; ++i) {
    tfree(pSupporter->pDataBlockInfo[i]);
  }

  tfree(pSupporter->pDataBlockInfo);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

  if (leftTableBlockIndex > pSupporter->numOfBlocksPerMeter[leftTableIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerMeter[rightTableIndex]) {
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

  //    assert(pLeftBlockInfoEx->pBlock.compBlock->offset != pRightBlockInfoEx->pBlock.compBlock->offset);
  if (pLeftBlockInfoEx->pBlock.compBlock->offset == pRightBlockInfoEx->pBlock.compBlock->offset &&
      pLeftBlockInfoEx->pBlock.compBlock->last == pRightBlockInfoEx->pBlock.compBlock->last) {
    // todo add more information
    dError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->pBlock.compBlock->offset);
  }

  return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1;
}

719
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742
  char* tmp = realloc(pQueryHandle->pDataBlockInfo, sizeof(STableBlockInfo) * numOfBlocks);
  if (tmp == NULL) {
    return TSDB_CODE_SERV_OUT_OF_MEMORY;
  }

  pQueryHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
  memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks);
  *numOfAllocBlocks = numOfBlocks;

  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerMeter = calloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = calloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = calloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerMeter == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_SERV_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
743
  int32_t numOfQualTables = 0;
744 745
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
746 747 748 749
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }
    
750
    SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
751
    sup.numOfBlocksPerMeter[numOfQualTables] = pTableCheck->numOfBlocks;
752 753 754

    char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
755
      cleanBlockOrderSupporter(&sup, numOfQualTables);
756 757 758
      return TSDB_CODE_SERV_OUT_OF_MEMORY;
    }

759
    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
760 761

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
762
      STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualTables][k];
763 764 765 766 767 768 769 770 771 772

      pBlockInfoEx->pBlock.compBlock = &pBlock[k];
      pBlockInfoEx->pBlock.fields = NULL;

      pBlockInfoEx->pTableCheckInfo = pTableCheck;
      //      pBlockInfoEx->groupIdx = pTableCheckInfo[j]->groupIdx;     // set the group index
      //      pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k;    // set the block index in original meter
      cnt++;
    }

773
    numOfQualTables++;
774 775
  }

776
  dTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
777

778 779
  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pMeterDataInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;
780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
  SLoserTreeInfo* pTree = NULL;

  uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
    return TSDB_CODE_SERV_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
    int32_t pos = pTree->pNode[0].index;
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfoEx = sup.pDataBlockInfo[pos];
    pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfoEx[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerMeter[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerMeter[pos] + 1;
800
    }
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818

    tLoserTreeAdjust(pTree, pos + sup.numOfTables);
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].pBlock.compBlock->offset < (*pDataBlockInfo)[i+1].pBlock.compBlock->offset);
   * }
   */

  dTrace("%p %d data blocks sort completed", pQueryHandle, cnt);
  cleanBlockOrderSupporter(&sup, numOfTables);
  free(pTree);

  return TSDB_CODE_SUCCESS;
}

819 820 821 822 823 824 825 826 827
// todo opt for only one table case
static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
  pQueryHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pQueryHandle->cur;
  
  int32_t numOfBlocks = 0;
  int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  
  while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
828
    int32_t type = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855
    if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
      break;
    }
    
    assert(numOfBlocks >= 0);
    dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
           numOfTables, pQueryHandle->pFileGroup->fileId);
    
    // todo return error code to query engine
    if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
      break;
    }
    
    assert(numOfBlocks >= pQueryHandle->numOfBlocks);
    if (pQueryHandle->numOfBlocks > 0) {
      break;
    }
  }
  
  // no data in file anymore
  if (pQueryHandle->numOfBlocks <= 0) {
    assert(pQueryHandle->pFileGroup == NULL);
    cur->fid = -1;
    
    return false;
  }
  
856
  cur->slot = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
857 858 859 860 861 862 863 864 865
  cur->fid = pQueryHandle->pFileGroup->fileId;
  
  STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
  SCompBlock*      pBlock = pBlockInfo->pBlock.compBlock;
  
  return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
}

866 867 868 869 870 871 872 873 874
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
  STsdbFileH*    pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
  SQueryFilePos* cur = &pQueryHandle->cur;

  // find the start data block in file
  if (!pQueryHandle->locateStart) {
    pQueryHandle->locateStart = true;

    int32_t fid = getFileIdFromKey(pQueryHandle->window.skey);
875 876
    
    tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
877 878
    tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);

879
    return getDataBlocksInFilesImpl(pQueryHandle);
880
  } else {
881 882
    if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { // all blocks
883
      
884
      return getDataBlocksInFilesImpl(pQueryHandle);
885
    } else {  // next block of the same file
886
      int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1;
887 888
      cur->slot += step;
      
889
      STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
890
      if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
891 892 893 894 895
        cur->pos = 0;
      } else {
        cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1;
      }

896 897 898
      return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo);
    }
  }
899 900
}

901 902 903
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  // todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
904
  
905 906
  while (pQueryHandle->activeIndex < numOfTables) {
    if (hasMoreDataInCache(pQueryHandle)) {
907 908 909
      return true;
    }
    
910 911 912 913 914 915 916 917 918 919 920 921 922
    pQueryHandle->activeIndex += 1;
  }
  
  return false;
}

// handle data in cache situation
bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) {
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
  
  size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
  assert(numOfTables > 0);
  
923
  if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
924 925
    if (pQueryHandle->checkFiles) {
      if (getDataBlocksInFiles(pQueryHandle)) {
926 927
        return true;
      }
928 929 930

      pQueryHandle->activeIndex = 0;
      pQueryHandle->checkFiles  = false;
931 932
    }
    
933 934 935 936
    return doHasDataInBuffer(pQueryHandle);
  } else {  // starts from the buffer in case of descending timestamp order check data blocks
    if (!pQueryHandle->checkFiles) {
      if (doHasDataInBuffer(pQueryHandle)) {
937 938 939
        return true;
      }
      
940
      pQueryHandle->checkFiles = true;
941
    }
942 943

    return getDataBlocksInFiles(pQueryHandle);
944
  }
945
  
946 947
}

948
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
949
                                 STsdbQueryHandle* pQueryHandle) {
950
  int     numOfRows = 0;
951
  int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
952
  *skey = INT64_MIN;
953

954
  do {
955
    SSkipListNode* node = tSkipListIterGet(pIter);
956 957 958
    if (node == NULL) {
      break;
    }
959

960
    SDataRow row = SL_GET_NODE_DATA(node);
961 962 963 964 965 966 967 968 969 970
    TSKEY key = dataRowKey(row);
    
    if ((key > maxKey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
        (key < maxKey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
      
      dTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
          pQueryHandle->window.ekey);
      
      break;
    }
971

972 973 974
    if (*skey == INT64_MIN) {
      *skey = dataRowKey(row);
    }
975

976
    *ekey = dataRowKey(row);
977

978
    int32_t offset = 0;
979 980
    char* pData = NULL;
    
981
    for (int32_t i = 0; i < numOfCols; ++i) {
982
      SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
983 984 985 986 987 988 989 990
      
      if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
        pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
      } else {
        pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
      }
      
      memcpy(pData, dataRowTuple(row) + offset, pColInfo->info.bytes);
991 992
      offset += pColInfo->info.bytes;
    }
993

994
    numOfRows++;
995 996 997 998 999
    if (numOfRows >= maxRowsToRead) {
      break;
    }
    
  } while(tSkipListIterNext(pIter));
1000

1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012
  assert(numOfRows <= maxRowsToRead);
  
  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
    int32_t emptySize = maxRowsToRead - numOfRows;
    
    for(int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
      memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }
  
1013
  return numOfRows;
H
hjxilinx 已提交
1014 1015
}

1016
// copy data from cache into data block
1017 1018 1019 1020 1021 1022
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;

  STable* pTable = NULL;

  TSKEY   skey = 0, ekey = 0;
1023
  int32_t rows = 0;
1024

1025 1026
  int32_t step = ASCENDING_ORDER_TRAVERSE(pHandle->order)? 1:-1;
  
H
[td-32]  
hjxilinx 已提交
1027
  // data in file
1028 1029 1030 1031 1032 1033 1034 1035
  if (pHandle->cur.fid >= 0) {
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];

    pTable = pBlockInfo->pTableCheckInfo->pTableObj;

    SDataBlockInfo binfo = getTrueDataBlockInfo(pBlockInfo->pTableCheckInfo, pBlockInfo->pBlock.compBlock);
    if (binfo.rows == pHandle->realNumOfRows) {
      pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + 1;
H
[td-32]  
hjxilinx 已提交
1036 1037 1038 1039 1040
      return binfo;
    } else {
      /* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the
       * block next function
       */
H
hjxilinx 已提交
1041
      SColumnInfoData* pColInfoEx = taosArrayGet(pHandle->pColumns, 0);
1042

H
[td-32]  
hjxilinx 已提交
1043
      rows = pHandle->realNumOfRows;
1044 1045 1046 1047
      skey = *(TSKEY*)pColInfoEx->pData;
      ekey = *(TSKEY*)((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));

      // update the last key value
1048
      pBlockInfo->pTableCheckInfo->lastKey = ekey + step;
H
[td-32]  
hjxilinx 已提交
1049 1050
    }
  } else {
1051 1052 1053
    STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
    pTable = pCheckInfo->pTableObj;

H
[td-32]  
hjxilinx 已提交
1054 1055
    if (pTable->mem != NULL) {
      // create mem table iterator if it is not created yet
1056 1057
      assert(pCheckInfo->iter != NULL);
      rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle);
1058 1059

      // update the last key value
1060
      pCheckInfo->lastKey = ekey + step;
H
hjxilinx 已提交
1061
    }
1062
  }
1063

1064
  SDataBlockInfo blockInfo = {
1065 1066 1067 1068 1069
      .uid = pTable->tableId.uid,
      .sid = pTable->tableId.tid,
      .rows = rows,
      .window = {.skey = MIN(skey, ekey), .ekey = MAX(skey, ekey)}
  };
1070

1071 1072
  return blockInfo;
}
H
hjxilinx 已提交
1073

1074
// return null for data block in cache
1075
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t* pQueryHandle, SDataStatis** pBlockStatis) {
1076 1077
  *pBlockStatis = NULL;
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
1078 1079
}

1080
SArray* tsdbRetrieveDataBlock(tsdb_query_handle_t* pQueryHandle, SArray* pIdList) {
H
[td-32]  
hjxilinx 已提交
1081
  /**
H
hjxilinx 已提交
1082
   * In the following two cases, the data has been loaded to SColumnInfoData.
H
[td-32]  
hjxilinx 已提交
1083 1084
   * 1. data is from cache, 2. data block is not completed qualified to query time range
   */
1085 1086
  STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;

H
[td-32]  
hjxilinx 已提交
1087 1088 1089
  if (pHandle->cur.fid < 0) {
    return pHandle->pColumns;
  } else {
1090 1091 1092 1093 1094 1095 1096
    STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot];
    STableCheckInfo*   pCheckInfo = pBlockInfoEx->pTableCheckInfo;

    SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock);
    assert(pHandle->realNumOfRows <= binfo.rows);

    if (pHandle->realNumOfRows < binfo.rows) {
H
[td-32]  
hjxilinx 已提交
1097 1098
      return pHandle->pColumns;
    } else {
H
hjxilinx 已提交
1099 1100 1101 1102 1103
      // data block has been loaded, todo extract method
      SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
      if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
        return pHandle->pColumns;
      } else {
1104 1105 1106 1107 1108 1109 1110
        SCompBlock* pBlock = pBlockInfoEx->pBlock.compBlock;
        doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);

        SArray* sa = getDefaultLoadColumns(pHandle, true);
        filterDataInDataBlock(pHandle, pCheckInfo, pBlock, sa);
        taosArrayDestroy(sa);

H
hjxilinx 已提交
1111 1112
        return pHandle->pColumns;
      }
H
[td-32]  
hjxilinx 已提交
1113 1114
    }
  }
H
hjxilinx 已提交
1115 1116
}

1117
int32_t tsdbResetQuery(tsdb_query_handle_t* pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) {
1118 1119
  return 0;
}
1120

1121
int32_t tsdbDataBlockSeek(tsdb_query_handle_t* pQueryHandle, tsdbpos_t pos) { return 0; }
1122

1123
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t* pQueryHandle) { return NULL; }
1124

1125
SArray* tsdbRetrieveDataRow(tsdb_query_handle_t* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; }
1126

1127
tsdb_query_handle_t* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) {
1128 1129
  return NULL;
}
1130

1131
SArray* tsdbGetTableList(tsdb_query_handle_t* pQueryHandle) { return NULL; }
1132

1133
static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
1134
  STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
1135 1136
  assert(pTable != NULL);  // assert pTable is a super table

1137
  SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex);
1138
  while (tSkipListIterNext(iter)) {
H
hjxilinx 已提交
1139
    SSkipListNode* pNode = tSkipListIterGet(iter);
1140 1141
    
    STable* t = *(STable**)SL_GET_NODE_DATA(pNode);
1142
    taosArrayPush(list, &t);
1143
  }
1144
  
1145
  return TSDB_CODE_SUCCESS;
1146 1147
}

1148
typedef struct SExprTreeSupporter {
1149 1150 1151
  SSchema* pTagSchema;
  int32_t  numOfTags;
  int32_t  optr;
1152
} SExprTreeSupporter;
1153 1154

/**
1155
 * convert the result pointer to table id instead of table object pointer
1156 1157
 * @param pRes
 */
1158 1159
static void convertQueryResult(SArray* pRes, SArray* pTableList) {
  if (pTableList == NULL || taosArrayGetSize(pTableList) == 0) {
1160 1161
    return;
  }
1162 1163

  size_t size = taosArrayGetSize(pTableList);
1164
  for (int32_t i = 0; i < size; ++i) {
1165
    STable* pTable = taosArrayGetP(pTableList, i);
1166
    taosArrayPush(pRes, &pTable);
1167 1168 1169
  }
}

1170
static void destroyHelper(void* param) {
1171 1172 1173
  if (param == NULL) {
    return;
  }
1174

1175 1176 1177 1178 1179
  tQueryInfo* pInfo = (tQueryInfo*)param;
  tVariantDestroy(&(pInfo->q));
  free(param);
}

1180
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index) {
1181
  *index = 0;
1182

1183 1184 1185 1186 1187
  // filter on table name(TBNAME)
  if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
    *index = TSDB_TBNAME_COLUMN_INDEX;
    return;
  }
1188

1189 1190
  while ((*index) < pSupporter->numOfTags) {
    if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
1191
        pSupporter->pTagSchema[*index].type  == pSchema->type  &&
1192
        pSupporter->pTagSchema[*index].colId == pSchema->colId) {
1193 1194 1195 1196 1197 1198
      break;
    }
  }
}

void filterPrepare(void* expr, void* param) {
1199
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
1200
  if (pExpr->_node.info != NULL) {
1201 1202
    return;
  }
1203

1204
  int32_t i = 0, offset = 0;
H
[td-32]  
hjxilinx 已提交
1205
  pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
1206

H
[td-32]  
hjxilinx 已提交
1207
  tQueryInfo* pInfo = pExpr->_node.info;
1208 1209 1210

  SExprTreeSupporter* pSupporter = (SExprTreeSupporter*)param;

H
[td-32]  
hjxilinx 已提交
1211 1212
  tVariant* pCond = pExpr->_node.pRight->pVal;
  SSchema*  pSchema = pExpr->_node.pLeft->pSchema;
1213

1214
  getTagColumnInfo(pSupporter, pSchema, &i);
1215 1216
  assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
  assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
1217

1218
  pInfo->sch      = *pSchema;
1219
  pInfo->colIndex = i;
1220 1221
  pInfo->optr     = pExpr->_node.optr;
  pInfo->compare  = getComparFunc(pSchema->type, pCond->nType, pInfo->optr);
1222

1223 1224 1225 1226
  tVariantAssign(&pInfo->q, pCond);
  tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
}

1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
  switch (type) {
    case TSDB_DATA_TYPE_INT:        DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2));
    case TSDB_DATA_TYPE_DOUBLE:     DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2));
    case TSDB_DATA_TYPE_FLOAT:      DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2));
    case TSDB_DATA_TYPE_BIGINT:     DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2));
    case TSDB_DATA_TYPE_SMALLINT:   DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2));
    case TSDB_DATA_TYPE_TINYINT:
    case TSDB_DATA_TYPE_BOOL:       DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2));
    case TSDB_DATA_TYPE_NCHAR: {
      int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE);
      if (ret == 0) {
        return ret;
      }
      return (ret < 0) ? -1 : 1;
    }
    default: {
      int32_t ret = strncmp(f1, f2, (size_t)size);
      if (ret == 0) {
        return ret;
      }
      
      return (ret < 0) ? -1 : 1;
    }
  }
}

typedef struct STableGroupSupporter {
  int32_t    numOfCols;
  SColIndex* pCols;
  STSchema*  pTagSchema;
} STableGroupSupporter;

int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
  
  STable *pTable1 = *(STable **) p1;
  STable *pTable2 = *(STable **) p2;
  
  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;
    
    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;
    
    if (colIndex == -1) { // table name, todo fix me
//      f1 = s1->tags;
//      f2 = s2->tags;
      type = TSDB_DATA_TYPE_BINARY;
      bytes = TSDB_TABLE_NAME_LEN;
    } else {
      f1 = dataRowTuple(pTable1->tagVal);
      f2 = dataRowTuple(pTable2->tagVal);

      type = schemaColAt(pTableGroupSupp->pTagSchema, colIndex)->type;
      bytes = schemaColAt(pTableGroupSupp->pTagSchema, colIndex)->bytes;
    }
    
    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
  
  return 0;
}

void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
1300 1301 1302 1303
  SArray* g = taosArrayInit(16, sizeof(SPair));
  
  SPair p = {.first = pTables[0]};
  taosArrayPush(g, &p);
1304 1305 1306 1307 1308 1309
  
  for (int32_t i = 1; i < numOfTables; ++i) {
    int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp);
    assert(ret == 0 || ret == -1);
    
    if (ret == 0) {
1310 1311
      SPair p1 = {.first = pTables[i]};
      taosArrayPush(g, &p1);
1312 1313 1314
    } else {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
      g = taosArrayInit(16, POINTER_BYTES);
1315 1316 1317
  
      SPair p1 = {.first = pTables[i]};
      taosArrayPush(g, &p1);
1318 1319
    }
  }
1320 1321
  
  taosArrayPush(pGroups, &g);
1322 1323 1324
}

SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
1325 1326
  assert(pTableList != NULL);
  
1327 1328 1329 1330 1331 1332 1333 1334 1335
  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
  
  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    pTrace("no qualified tables");
    return pTableGroup;
  }
  
  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346
    size_t num = taosArrayGetSize(pTableList);
    
    SArray* sa = taosArrayInit(num, sizeof(SPair));
    for(int32_t i = 0; i < num; ++i) {
      STable* pTable = taosArrayGetP(pTableList, i);
      
      SPair p = {.first = pTable};
      taosArrayPush(sa, &p);
    }
    
    taosArrayPush(pTableGroup, &sa);
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357
    pTrace("all %d tables belong to one group", size);
    
#ifdef _DEBUG_VIEW
    tSidSetDisplay(pTableGroup);
#endif
  } else {
    STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter));
    pSupp->numOfCols = numOfOrderCols;
    pSupp->pTagSchema = pTagSchema;
    pSupp->pCols = pCols;
    
1358
    taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369
    createTableGroupImpl(pTableGroup, pTableList->pData, size, pSupp, tableGroupComparFn);

#ifdef _DEBUG_VIEW
    tSidSetDisplay(pTableGroup);
#endif
    tfree(pSupp);
  }
  
  return pTableGroup;
}

1370 1371
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
  tQueryInfo* pInfo = (tQueryInfo*)param;
1372

1373
  STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
1374 1375

  char*  val = dataRowTuple(pTable->tagVal);  // todo not only the first column
1376
  int8_t type = pInfo->sch.type;
1377

1378 1379 1380 1381 1382
  int32_t ret = 0;
  if (pInfo->q.nType == TSDB_DATA_TYPE_BINARY || pInfo->q.nType == TSDB_DATA_TYPE_NCHAR) {
    ret = pInfo->compare(val, pInfo->q.pz);
  } else {
    tVariant t = {0};
1383 1384
    tVariantCreateFromBinary(&t, val, (uint32_t)pInfo->sch.bytes, type);

1385 1386
    ret = pInfo->compare(&t.i64Key, &pInfo->q.i64Key);
  }
1387

1388 1389 1390 1391 1392 1393 1394
  switch (pInfo->optr) {
    case TSDB_RELATION_EQUAL: {
      return ret == 0;
    }
    case TSDB_RELATION_NOT_EQUAL: {
      return ret != 0;
    }
1395
    case TSDB_RELATION_GREATER_EQUAL: {
1396 1397
      return ret >= 0;
    }
1398
    case TSDB_RELATION_GREATER: {
1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
      return ret > 0;
    }
    case TSDB_RELATION_LESS_EQUAL: {
      return ret <= 0;
    }
    case TSDB_RELATION_LESS: {
      return ret < 0;
    }
    case TSDB_RELATION_LIKE: {
      return ret == 0;
    }
1410

1411 1412 1413 1414 1415 1416
    default:
      assert(false);
  }
  return true;
}

1417
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
1418
  // query according to the binary expression
1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429
  STSchema* pSchema = pSTable->tagSchema;
  SSchema*  schema = calloc(schemaNCols(pSchema), sizeof(SSchema));
  for (int32_t i = 0; i < schemaNCols(pSchema); ++i) {
    schema[i].colId = schemaColAt(pSchema, i)->colId;
    schema[i].type = schemaColAt(pSchema, i)->type;
    schema[i].bytes = schemaColAt(pSchema, i)->bytes;
  }

  SExprTreeSupporter s = {.pTagSchema = schema, .numOfTags = schemaNCols(pSTable->tagSchema)};

  SBinaryFilterSupp supp = {
1430 1431
      .fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s,
      };
1432 1433 1434 1435 1436 1437 1438

  SArray* pTableList = taosArrayInit(8, POINTER_BYTES);

  tExprTreeTraverse(pExpr, pSTable->pIndex, pTableList, &supp);
  tExprTreeDestroy(&pExpr, destroyHelper);

  convertQueryResult(pRes, pTableList);
1439 1440 1441
  return TSDB_CODE_SUCCESS;
}

1442
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo,
1443 1444
    SColIndex* pColIndex, int32_t numOfCols) {
  
1445
  STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
  if (pSTable == NULL) {
    dError("failed to get stable, uid:%" PRIu64, uid);
    return TSDB_CODE_INVALID_TABLE_ID;
  }
  
  SArray* res = taosArrayInit(8, POINTER_BYTES);
  STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pSTable);
  
  if (pTagCond == NULL || len == 0) {  // no tags condition, all tables created according to this stable are involved
    int32_t ret = getAllTableIdList(tsdb, uid, res);
    if (ret != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(res);
      return ret;
    }
1460 1461 1462
  
    pGroupInfo->numOfTables = taosArrayGetSize(res);
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
1463 1464
    return ret;
  }
1465 1466 1467 1468 1469 1470 1471

  tExprNode* pExprNode = NULL;
  int32_t    ret = TSDB_CODE_SUCCESS;

  // failed to build expression, no result, return immediately
  if ((ret = exprTreeFromBinary(pTagCond, len, &pExprNode) != TSDB_CODE_SUCCESS) || (pExprNode == NULL)) {
    dError("stable:%" PRIu64 ", failed to deserialize expression tree, error exists", uid);
1472
    taosArrayDestroy(res);
1473 1474 1475
    return ret;
  }

1476
  doQueryTableList(pSTable, res, pExprNode);
1477 1478 1479
  
  pGroupInfo->numOfTables = taosArrayGetSize(res);
  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
1480 1481

  return ret;
1482
}
1483

1484
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo) {
1485 1486 1487 1488 1489 1490
  STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTable == NULL) {
    return TSDB_CODE_INVALID_TABLE_ID;
  }
  
  //todo assert table type, add the table ref count
1491 1492
  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
1493 1494 1495 1496
  
  SArray* group = taosArrayInit(1, POINTER_BYTES);
  
  taosArrayPush(group, &pTable);
1497
  taosArrayPush(pGroupInfo->pGroupList, &group);
1498 1499 1500
  
  return TSDB_CODE_SUCCESS;
}
1501
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
1502
  STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
1503 1504 1505 1506
  if (pQueryHandle == NULL) {
    return;
  }
  
1507
  size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
1508
  for (int32_t i = 0; i < size; ++i) {
1509 1510 1511
    STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
    tSkipListDestroyIter(pTableCheckInfo->iter);

H
hjxilinx 已提交
1512 1513 1514
    if (pTableCheckInfo->pDataCols != NULL) {
      tfree(pTableCheckInfo->pDataCols->buf);
    }
1515

1516
    tfree(pTableCheckInfo->pDataCols);
1517

1518 1519
    tfree(pTableCheckInfo->pCompInfo);
  }
1520

1521
  taosArrayDestroy(pQueryHandle->pTableCheckInfo);
1522 1523
  tfree(pQueryHandle->compIndex);

H
TD-100  
hzcheng 已提交
1524 1525 1526 1527 1528
  // size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
  // for (int32_t i = 0; i < cols; ++i) {
  //   SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
  //   // tfree(pColInfo->pData);
  // }
1529

1530 1531
  taosArrayDestroy(pQueryHandle->pColumns);
  
1532
  tfree(pQueryHandle->pDataBlockInfo);
H
TD-100  
hzcheng 已提交
1533
  tsdbDestroyHelper(&pQueryHandle->rhelper);
1534 1535
  tfree(pQueryHandle);
}