tsdbRead.c 143.0 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17 18 19 20 21 22
#include "tsdb.h"
#include "tsdbDef.h"
#include "tsdbFS.h"
#include "tsdbLog.h"
#include "tsdbReadImpl.h"
#include "ttime.h"
#include "exception.h"
H
hjxilinx 已提交
23
#include "os.h"
24
#include "talgo.h"
25
#include "tcompare.h"
26 27
#include "tdataformat.h"
#include "tskiplist.h"
28

29
#include "taosdef.h"
30
#include "tlosertree.h"
H
Hongze Cheng 已提交
31
#include "tsdbDef.h"
32
#include "tmsg.h"
33

34
// extra bytes added to every per-column output buffer allocation
#define EXTRA_BYTES 2
// true when the scan order `o` is ascending; the argument is parenthesised so
// that compound expressions (e.g. a conditional) compare as a whole
#define ASCENDING_TRAVERSE(o)   ((o) == TSDB_ORDER_ASC)
// number of entries in the pColumns array of a read handle
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
37

H
Haojun Liao 已提交
38 39 40 41
// Build an SDataBlockInfo compound literal from a file block descriptor
// (_block) and the table check info (_checkInfo) that owns it.
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                   \
  ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
                    .numOfCols = (_block)->numOfCols,                                  \
                    .rows = (_block)->numOfRows,                                       \
                    .uid = (_checkInfo)->tableId})
H
Haojun Liao 已提交
43

H
hjxilinx 已提交
44
// query type values; tsdbQueryTablesImpl() stores TSDB_QUERY_TYPE_ALL in the
// handle and tsdbQueryLastRow() switches it to TSDB_QUERY_TYPE_LAST
enum {
  TSDB_QUERY_TYPE_ALL      = 1,
  TSDB_QUERY_TYPE_LAST     = 2,
};

49 50 51 52 53 54
// kinds of cached row
// NOTE(review): presumably the values stored in STsdbReadHandle.cachelastrow -- confirm
enum {
  TSDB_CACHED_TYPE_NONE = 0,
  TSDB_CACHED_TYPE_LASTROW,  // == 1
  TSDB_CACHED_TYPE_LAST,     // == 2
};

55 56
typedef struct SQueryFilePos {
  int32_t fid;
57 58
  int32_t slot;
  int32_t pos;
59
  int64_t lastKey;
60 61
  int32_t rows;
  bool    mixBlock;
62
  bool    blockCompleted;
63
  STimeWindow win;
64
} SQueryFilePos;
H
hjxilinx 已提交
65

66
typedef struct SDataBlockLoadInfo {
H
Hongze Cheng 已提交
67
  SDFileSet*  fileGroup;
68
  int32_t     slot;
69
  uint64_t    uid;
70
  SArray*     pLoadedCols;
71
} SDataBlockLoadInfo;
H
hjxilinx 已提交
72

73
// record of the comp-block information that was loaded most recently
typedef struct SLoadCompBlockInfo {
  int32_t tid; /* table tid */
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
77

78 79 80 81 82 83
// values for STableCheckInfo.chosen
enum {
  CHECKINFO_CHOSEN_MEM = 0,
  CHECKINFO_CHOSEN_IMEM,      // == 1
  CHECKINFO_CHOSEN_BOTH       // == 2, for update=2 (merge case)
};

D
init  
dapan1121 已提交
84

85
typedef struct STableCheckInfo {
86
  uint64_t      tableId;
H
Haojun Liao 已提交
87
  TSKEY         lastKey;
H
Haojun Liao 已提交
88
  SBlockInfo*   pCompInfo;
H
Haojun Liao 已提交
89
  int32_t       compSize;
90
  int32_t       numOfBlocks:29; // number of qualified data blocks not the original blocks
91
  uint8_t       chosen:2;       // indicate which iterator should move forward
H
Haojun Liao 已提交
92 93 94
  bool          initBuf;        // whether to initialize the in-memory skip list iterator or not
  SSkipListIterator* iter;      // mem buffer skip list iterator
  SSkipListIterator* iiter;     // imem buffer skip list iterator
95
} STableCheckInfo;
96

97
typedef struct STableBlockInfo {
H
Haojun Liao 已提交
98 99
  SBlock          *compBlock;
  STableCheckInfo *pTableCheckInfo;
100
} STableBlockInfo;
101

102 103
typedef struct SBlockOrderSupporter {
  int32_t             numOfTables;
H
Haojun Liao 已提交
104
  STableBlockInfo**   pDataBlockInfo;
105
  int32_t*            blockIndexArray;
106
  int32_t*            numOfBlocksPerTable;
107 108
} SBlockOrderSupporter;

H
Haojun Liao 已提交
109 110 111
// I/O cost counters accumulated by a read handle (see STsdbReadHandle.cost)
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

117 118
typedef struct STsdbReadHandle {
  STsdb*     pTsdb;
H
Haojun Liao 已提交
119 120 121 122 123 124 125 126 127
  SQueryFilePos  cur;              // current position
  int16_t        order;
  STimeWindow    window;           // the primary query time window that applies to all queries
  SDataStatis*   statis;           // query level statistics, only one table block statistics info exists at any time
  int32_t        numOfBlocks;
  SArray*        pColumns;         // column list, SColumnInfoData array list
  bool           locateStart;
  int32_t        outputCapacity;
  int32_t        realNumOfRows;
H
Haojun Liao 已提交
128
  SArray*        pTableCheckInfo;  // SArray<STableCheckInfo>
H
Haojun Liao 已提交
129 130
  int32_t        activeIndex;
  bool           checkFiles;       // check file stage
D
init  
dapan1121 已提交
131
  int8_t         cachelastrow;     // check if last row cached
132
  bool           loadExternalRow;  // load time window external data rows
H
Haojun Liao 已提交
133 134
  bool           currentLoadExternalRows; // current load external rows
  int32_t        loadType;         // block load type
H
Haojun Liao 已提交
135
  char          *idStr;            // query info handle, for debug purpose
H
Haojun Liao 已提交
136
  int32_t        type;             // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
H
Hongze Cheng 已提交
137 138 139
  SDFileSet*     pFileGroup;
  SFSIter        fileIter;
  SReadH         rhelper;
H
Haojun Liao 已提交
140
  STableBlockInfo* pDataBlockInfo;
H
Haojun Liao 已提交
141
  SDataCols     *pDataCols;        // in order to hold current file data block
H
Haojun Liao 已提交
142
  int32_t        allocSize;        // allocated data block size
H
Haojun Liao 已提交
143
  SArray        *defaultLoadColumn;// default load column
H
Haojun Liao 已提交
144
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
H
Haojun Liao 已提交
145
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
H
Haojun Liao 已提交
146

147 148
  SArray        *prev;             // previous row which is before than time window
  SArray        *next;             // next row which is after the query time window
H
Haojun Liao 已提交
149
  SIOCostSummary cost;
150
} STsdbReadHandle;
151

H
Haojun Liao 已提交
152 153 154
typedef struct STableGroupSupporter {
  int32_t    numOfCols;
  SColIndex* pCols;
155
  SSchema*   pTagSchema;
H
Haojun Liao 已提交
156 157
} STableGroupSupporter;

158
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
159 160 161
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
//static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
162

163 164
static void    changeQueryHandleForInterpQuery(tsdbReadHandleT pHandle);
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
165
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
166
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle);
167
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
168 169 170 171
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
//static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
//static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool    tsdbGetExternalRow(tsdbReadHandleT pHandle);
Y
TD-1733  
yihaoDeng 已提交
172

173
// Reset the "last loaded data block" record so that no file group, slot or
// table is marked as loaded. pLoadedCols is left untouched.
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->slot = -1;
  pBlockLoadInfo->uid  = 0;
  pBlockLoadInfo->fileGroup = NULL;
}

179
// Reset the "last loaded comp block" record: -1 is stored for both the table
// tid and the file id.
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->tid = -1;
  pCompBlockLoadInfo->fileId = -1;
}
H
hjxilinx 已提交
183

184 185
// Collect the column id of every column in pTsdbReadHandle->pColumns.
//
// Returns a newly created array of int16_t column ids (in pColumns order) that
// the caller owns, or NULL if the array could not be allocated.
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  if (pIdList == NULL) {
    return NULL;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
    taosArrayPush(pIdList, &pCol->info.colId);
  }

  return pIdList;
}

197 198
// Build the default list of column ids to load for this handle.
//
// The list starts as the ids of pTsdbReadHandle->pColumns. When loadTS is true
// and the first id is not the primary timestamp column, the timestamp column
// id is inserted at the front.
//
// Returns the caller-owned id list, or NULL if it could not be allocated.
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
  if (pLocalIdList == NULL) {
    return NULL;
  }

  int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // an empty list has no first element to inspect; the timestamp column is
  // certainly not included, so append it when requested
  if (taosArrayGetSize(pLocalIdList) == 0) {
    if (loadTS) {
      taosArrayPush(pLocalIdList, &columnId);
    }
    return pLocalIdList;
  }

  // check if the primary time stamp column needs to load
  int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);

  // the primary timestamp column is not included in the specified load column list, add it
  if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

212 213 214 215 216 217 218 219 220
// Currently a no-op: the reference-counted memtable snapshot logic below is
// disabled. Note that, unlike the disabled code, this stub does NOT destroy
// psTable.
static void tsdbMayTakeMemSnapshot(STsdbReadHandle* pTsdbReadHandle, SArray* psTable) {
//  assert(pTsdbReadHandle != NULL && pTsdbReadHandle->pMemRef != NULL);
//
//  STsdbMemTable* pMemRef = pTsdbReadHandle->pMemRef;
//  if (pTsdbReadHandle->pMemRef->ref++ == 0) {
//    tsdbTakeMemSnapshot(pTsdbReadHandle->pTsdb, &(pMemRef->snapshot), psTable);
//  }
//
//  taosArrayDestroy(psTable);
}
222

223 224 225 226 227 228 229 230 231 232 233 234
// Currently a no-op: the logic that released the memtable snapshot reference
// is disabled (kept below for reference).
static void tsdbMayUnTakeMemSnapshot(STsdbReadHandle* pTsdbReadHandle) {
//  assert(pTsdbReadHandle != NULL);
//  STsdbMemTable* pMemRef = pTsdbReadHandle->pMemRef;
//  if (pMemRef == NULL) { // it has been freed
//    return;
//  }
//
//  if (--pMemRef->ref == 0) {
//    tsdbUnTakeMemSnapShot(pTsdbReadHandle->pTsdb, &(pMemRef->snapshot));
//  }
//
//  pTsdbReadHandle->pMemRef = NULL;
}

237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle) {
//  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
//
//  int64_t rows = 0;
//  STsdbMemTable* pMemTable = pTsdbReadHandle->pMemTable;
//  if (pMemTable == NULL) { return rows; }
//
////  STableData* pMem  = NULL;
////  STableData* pIMem = NULL;
//
////  SMemTable* pMemT = pMemRef->snapshot.mem;
////  SMemTable* pIMemT = pMemRef->snapshot.imem;
//
//  size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  for (int32_t i = 0; i < size; ++i) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
//
////    if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
////      pMem = pMemT->tData[pCheckInfo->tableId];
////      rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
////    }
////    if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
////      pIMem = pIMemT->tData[pCheckInfo->tableId];
////      rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
////    }
//  }
//  return rows;
//}

// Create one STableCheckInfo per table found in pGroupList.
//
// For an ascending scan the starting key of each table is raised to
// window.skey when it is INT64_MIN or smaller than window.skey.
//
// pTsdbReadHandle  supplies scan order, query window and the debug id string
// pGroupList       table groups; must contain at least one non-empty group
// psTable          on success receives a newly created (empty) array that the
//                  caller must destroy; not written on failure
//
// Returns the caller-owned SArray<STableCheckInfo>, or NULL on allocation failure.
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList, SArray** psTable) {
  size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
  assert(sizeOfGroup >= 1);

  // allocate buffer in order to load data blocks from file
  SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  SArray* pTable = taosArrayInit(4, sizeof(STable*));
  if (pTable == NULL) {
    taosArrayDestroy(pTableCheckInfo);
    return NULL;
  }

  // todo apply the lastkey of table check to avoid to load header file
  for (size_t i = 0; i < sizeOfGroup; ++i) {
    SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i);

    size_t gsize = taosArrayGetSize(group);
    assert(gsize > 0);

    for (size_t j = 0; j < gsize; ++j) {
      STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);

      STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
//      assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
//                                        info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));

      info.tableId = pKeyInfo->uid;

      if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
        if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
          info.lastKey = pTsdbReadHandle->window.skey;
        }

        assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
      } else {
        assert(info.lastKey >= pTsdbReadHandle->window.ekey && info.lastKey <= pTsdbReadHandle->window.skey);
      }

      // a failed push would silently drop a table from the query: fail the whole call instead
      if (taosArrayPush(pTableCheckInfo, &info) == NULL) {
        taosArrayDestroy(pTable);
        taosArrayDestroy(pTableCheckInfo);
        return NULL;
      }

      tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->idStr);
    }
  }

//  taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);

  *psTable = pTable;
  return pTableCheckInfo;
}

323 324
// Rewind every table check info of the handle: the starting key is set to
// window.skey, both skip list iterators are destroyed and initBuf is cleared
// so the iterators are rebuilt on next use.
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables >= 1);

  // todo apply the lastkey of table check to avoid to load header file
  for (size_t i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
    pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
    pCheckInfo->iter    = tSkipListDestroyIter(pCheckInfo->iter);
    pCheckInfo->iiter   = tSkipListDestroyIter(pCheckInfo->iiter);
    pCheckInfo->initBuf = false;

    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.skey);
    } else {
      assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.skey);
    }
  }
}

H
Haojun Liao 已提交
343 344 345
// Create a single-element check-info array for the table of pCheckInfo,
// starting at skey. Only one table is involved, so no sorting is needed.
//
// psTable is not used by this function.
// Returns the caller-owned array, or NULL on allocation failure.
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
  if (pNew == NULL) {
    return NULL;
  }

  STableCheckInfo info = { .lastKey = skey};
  info.tableId = pCheckInfo->tableId;

  if (taosArrayPush(pNew, &info) == NULL) {
    taosArrayDestroy(pNew);
    return NULL;
  }

  return pNew;
}

354 355
// Return true when the query window of the handle cannot match any row:
// skey > ekey for an ascending scan, ekey > skey for a descending one.
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  STimeWindow* w = &pTsdbReadHandle->window;
  bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
}

363 364
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
365
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
366 367 368
  STsdbCfg* pCfg = &pTsdb->config;

  int64_t now = taosGetTimestamp(pCfg->precision);
369
  return now - (tsTickPerDay[pCfg->precision] * pCfg->keep) + 1;  // needs to add one tick
370 371
}

372 373
// Copy the query window from pCond into the handle and clamp it against the
// earliest non-expired timestamp: skey for an ascending scan, ekey for a
// descending one. The clamped bound is written back to pCond->twindow as well.
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) {
  // remember the requested window so that the log below shows the real old
  // values; pCond->twindow is overwritten when the window gets clamped
  const STimeWindow oldWindow = pCond->twindow;
  pTsdbReadHandle->window = oldWindow;

  bool    updateTs = false;
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
      pCond->twindow.skey = startTs;
      updateTs = true;
    }
  } else {
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
      pCond->twindow.ekey = startTs;
      updateTs = true;
    }
  }

  if (updateTs) {
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
              pTsdbReadHandle, oldWindow.skey, oldWindow.ekey, pTsdbReadHandle->window.skey,
              pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
  }
}

H
Haojun Liao 已提交
398
// Allocate and initialise a read handle for the query described by pCond.
//
// tsdb     tsdb instance to read from
// pCond    query condition (order, time window, column list); must not be
//          NULL. Its twindow may be updated by setQueryTimewindow().
// qId      query id, used only to build the debug id string
// taskId   task id, used only to build the debug id string
//
// Returns the new handle, or NULL on failure. On failure the partially built
// handle is passed to tsdbCleanupQueryHandle() and terrno is set to
// TSDB_CODE_TDB_OUT_OF_MEMORY.
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) {
  // validate before the first dereference of pCond below
  assert(pCond != NULL);

  STsdbReadHandle* pReadHandle = calloc(1, sizeof(STsdbReadHandle));
  if (pReadHandle == NULL) {
    goto _end;
  }

  pReadHandle->order       = pCond->order;
  pReadHandle->pTsdb       = tsdb;
  pReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid     = INT32_MIN;
  pReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles  = true;
  pReadHandle->activeIndex = 0;   // current active table index
  pReadHandle->allocSize   = 0;
  pReadHandle->locateStart = false;
  pReadHandle->loadType    = pCond->type;

  pReadHandle->outputCapacity  = 4096;//((STsdb*)tsdb)->config.maxRowsPerFileBlock;
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

  char buf[128] = {0};
  snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId);
  pReadHandle->idStr = strdup(buf);

  if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
    goto _end;
  }

  // idStr is passed to "%s" in log calls, so a failed strdup must not go
  // unnoticed. NOTE(review): checked after tsdbInitReadH() so the cleanup at
  // _end sees the same helper state as the original failure paths.
  if (pReadHandle->idStr == NULL) {
    goto _end;
  }

  setQueryTimewindow(pReadHandle, pCond);

  if (pCond->numOfCols > 0) {
    // allocate buffer in order to load data blocks from file
    pReadHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
    if (pReadHandle->statis == NULL) {
      goto _end;
    }

    // todo: use list instead of array?
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
      goto _end;
    }

    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};

      colInfo.info = pCond->colList[i];
      colInfo.pData = calloc(1, EXTRA_BYTES + (size_t)pReadHandle->outputCapacity * pCond->colList[i].bytes);
      if (colInfo.pData == NULL) {
        goto _end;
      }

      // the buffer is owned by the array only once the push succeeded
      if (taosArrayPush(pReadHandle->pColumns, &colInfo) == NULL) {
        free(colInfo.pData);
        goto _end;
      }

      pReadHandle->statis[i].colId = colInfo.info.colId;
    }

    pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
  }

  pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
  if (pReadHandle->pDataCols == NULL) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _end;
  }

  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);

  return pReadHandle;

  _end:
  tsdbCleanupQueryHandle(pReadHandle);
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return NULL;
}

H
Haojun Liao 已提交
477 478
// Create a read handle for the tables in groupList.
//
// When the (clamped) query window is empty the handle is returned right away
// without any table check info. Otherwise one STableCheckInfo per table is
// created.
//
// Returns the handle, or NULL on failure; if building the table check info
// fails, the handle is released and terrno is set to TSDB_CODE_TDB_OUT_OF_MEMORY.
tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) {
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    return (tsdbReadHandleT*) pTsdbReadHandle;
  }

  SArray* psTable = NULL;

  // todo apply the lastkey of table check to avoid to load header file
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, &psTable);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
    taosArrayDestroy(psTable);
    // the handle is not returned to the caller, so release it here
    tsdbCleanupQueryHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

  // psTable is only a by-product of createCheckInfoFromTableGroup() and is
  // not stored anywhere; release it to avoid leaking it on every query
  taosArrayDestroy(psTable);

  tsdbDebug("%p total numOfTable:%" PRIzu " in query, %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), pTsdbReadHandle->idStr);
  return (tsdbReadHandleT*) pTsdbReadHandle;
}

502 503
// Re-arm an existing read handle with the order/window of pCond so that it
// can be scanned again.
//
// If the handle's current window is empty, only the order is updated (and the
// window bounds swapped when the order changes); nothing else is reset.
void tsdbResetQueryHandle(tsdbReadHandleT queryHandle, STsdbQueryCond *pCond) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, int64_t);
    }

    return;
  }

  pTsdbReadHandle->order       = pCond->order;
  pTsdbReadHandle->window      = pCond->twindow;
  pTsdbReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid     = -1;
  pTsdbReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles  = true;
  pTsdbReadHandle->activeIndex = 0;   // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // statis is only allocated when the handle was created with at least one
  // column; do not hand a NULL pointer to memset
  if (pTsdbReadHandle->statis != NULL) {
    memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  resetCheckInfo(pTsdbReadHandle);
}

539 540
// Re-arm an existing read handle with the order/window of pCond for a new set
// of tables.
//
// NOTE(review): rebuilding the table check info from groupList is currently
// disabled (see the commented-out calls below). As written, pTableCheckInfo is
// set to NULL without releasing the previous array and terrno is always set to
// TSDB_CODE_TDB_OUT_OF_MEMORY; groupList is not used.
void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  pTsdbReadHandle->order       = pCond->order;
  pTsdbReadHandle->window      = pCond->twindow;
  pTsdbReadHandle->type        = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid     = -1;
  pTsdbReadHandle->cur.win     = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles  = true;
  pTsdbReadHandle->activeIndex = 0;   // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // statis is only allocated when the handle was created with at least one
  // column; do not hand a NULL pointer to memset
  if (pTsdbReadHandle->statis != NULL) {
    memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

  pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//    tsdbCleanupQueryHandle(pTsdbReadHandle);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

//  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}

H
Haojun Liao 已提交
579
// Create a read handle for a last-row query.
//
// The query window in pCond is replaced by the window computed by
// updateLastrowForEachGroup(). When the handle reports a cached last row, its
// type is switched to TSDB_QUERY_TYPE_LAST.
//
// Returns the handle, or NULL when no table qualifies or on failure (terrno
// is set to the code returned by checkForCachedLastRow() in that case).
tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
  pCond->twindow = updateLastrowForEachGroup(groupList);

  // no qualified table
  if (groupList->numOfTables == 0) {
    return NULL;
  }

  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList);
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    // the handle is not returned to the caller, so release it here
    tsdbCleanupQueryHandle(pTsdbReadHandle);
    terrno = code;
    return NULL;
  }

  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

606 607 608 609
#if 0
tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
  if (pTsdbReadHandle == NULL) {
610 611 612
    return NULL;
  }

613
  int32_t code = checkForCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
614 615 616 617 618
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

619 620
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
fix bug  
dapan1121 已提交
621
  }
D
init  
dapan1121 已提交
622
  
623
  return pTsdbReadHandle;
H
hjxilinx 已提交
624 625
}

626 627
#endif
// Return a new pointer array sized for the tables of the given handle.
//
// NOTE(review): the array is created with capacity for one pointer per table
// check info entry, but nothing is pushed into it here, so it is returned
// empty. The caller owns the returned array.
SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
  assert(pHandle != NULL);

  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) pHandle;

  size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  SArray* res = taosArrayInit(size, POINTER_BYTES);
  return res;
}

H
Haojun Liao 已提交
637 638 639 640 641 642
// leave only one table for each group
static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
  assert(pGroupList);
  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);

  STableGroupInfo* pNew = calloc(1, sizeof(STableGroupInfo));
Y
yihaoDeng 已提交
643
  pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
H
Haojun Liao 已提交
644 645 646 647 648 649 650 651

  for(int32_t i = 0; i < numOfGroup; ++i) {
    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
    size_t numOfTables = taosArrayGetSize(oneGroup);

    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
    for (int32_t j = 0; j < numOfTables; ++j) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
652 653 654 655 656
//      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
//        taosArrayPush(px, pInfo);
//        pNew->numOfTables += 1;
//        break;
//      }
H
Haojun Liao 已提交
657 658 659 660 661 662 663 664 665 666 667 668 669
    }

    // there are no data in this group
    if (taosArrayGetSize(px) == 0) {
      taosArrayDestroy(px);
    } else {
      taosArrayPush(pNew->pGroupList, &px);
    }
  }

  return pNew;
}

H
Haojun Liao 已提交
670
tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) {
H
Haojun Liao 已提交
671 672
  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);

673 674 675 676 677 678 679 680 681 682 683 684
  if (pNew->numOfTables == 0) {
    tsdbDebug("update query time range to invalidate time window");

    assert(taosArrayGetSize(pNew->pGroupList) == 0);
    bool asc = ASCENDING_TRAVERSE(pCond->order);
    if (asc) {
      pCond->twindow.ekey = pCond->twindow.skey - 1;
    } else {
      pCond->twindow.skey = pCond->twindow.ekey - 1;
    }
  }

H
Haojun Liao 已提交
685
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId);
686 687
  pTsdbReadHandle->loadExternalRow = true;
  pTsdbReadHandle->currentLoadExternalRows = true;
688

689
  return pTsdbReadHandle;
690 691
}

692
/*
 * One-time setup of the skip-list iterators over the mem and imem buffers of a table.
 *
 * The first call marks pCheckInfo->initBuf, looks the table up in the mem/imem hash
 * indexes of the handle's tsdb, creates an iterator for every buffer that contains
 * the table and moves each iterator onto its first row. Later calls return true
 * immediately.
 *
 * Returns false when neither buffer yields a row, true otherwise.
 */
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->initBuf) {
    return true;
  }

  pCheckInfo->initBuf = true;

  int32_t order = pHandle->order;
  bool    asc = ASCENDING_TRAVERSE(order);

  // Both iterators are positioned from key 0; seeking from pCheckInfo->lastKey
  // (keyToTkey(pCheckInfo->lastKey)) is currently disabled.
  TKEY startKey = 0;

  STbData** pMem = NULL;
  if (pHandle->pTsdb->mem != NULL) {
    pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (pMem != NULL) {
      pCheckInfo->iter =
          tSkipListCreateIterFromVal((*pMem)->pData, (const char*)&startKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  STbData** pIMem = NULL;
  if (pHandle->pTsdb->imem != NULL) {
    pIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (pIMem != NULL) {
      pCheckInfo->iiter =
          tSkipListCreateIterFromVal((*pIMem)->pData, (const char*)&startKey, TSDB_DATA_TYPE_TIMESTAMP, order);
    }
  }

  // no iterator at all: the table has nothing in the buffers right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }

  // step every existing iterator onto its first row; both are always advanced
  bool memHasRow  = (pCheckInfo->iter != NULL) && tSkipListIterNext(pCheckInfo->iter);
  bool imemHasRow = (pCheckInfo->iiter != NULL) && tSkipListIterNext(pCheckInfo->iiter);
  if (!memHasRow && !imemHasRow) {
    return false;
  }

  if (memHasRow) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    assert(node != NULL);

    SMemRow row = (SMemRow)SL_GET_NODE_DATA(node);
    TSKEY   key = memRowKey(row);  // first timestamp found in mem
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
              pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr);

    // the first row must not lie behind the position already consumed
    assert(asc ? (pCheckInfo->lastKey <= key) : (pCheckInfo->lastKey >= key));
  } else {
    tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  }

  if (imemHasRow) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    assert(node != NULL);

    SMemRow row = (SMemRow)SL_GET_NODE_DATA(node);
    TSKEY   key = memRowKey(row);  // first timestamp found in imem
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s",
              pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr);

    assert(asc ? (pCheckInfo->lastKey <= key) : (pCheckInfo->lastKey >= key));
  } else {
    tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  }

  return true;
}

H
Haojun Liao 已提交
773 774 775 776 777
/*
 * Destroy the mem and imem skip-list iterators kept in pCheckInfo.
 *
 * Both pointers are cleared afterwards so that the NULL checks used elsewhere in
 * this file (e.g. "if (pCheckInfo->iter)") never see a destroyed iterator and a
 * repeated call does not hand a stale pointer to tSkipListDestroyIter() again.
 */
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
  tSkipListDestroyIter(pCheckInfo->iter);
  pCheckInfo->iter = NULL;

  tSkipListDestroyIter(pCheckInfo->iiter);
  pCheckInfo->iiter = NULL;
}

778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833
/*
 * Return the timestamp of the row that comes first, in traverse order, among the
 * current rows of the mem and imem iterators, and record in pCheckInfo->chosen
 * which buffer supplies it.
 *
 * - neither iterator has a current row: returns TSKEY_INITIAL_VAL, chosen is untouched.
 * - only one iterator has a row: that buffer is chosen.
 * - both rows carry the same key: the decision follows `update`. For
 *   TD_ROW_DISCARD_UPDATE the imem row is chosen and the mem iterator is advanced;
 *   for TD_ROW_OVERWRITE_UPDATE the mem row is chosen and the imem iterator is
 *   advanced; otherwise both are chosen and no iterator moves.
 * - keys differ: the smaller key wins for an ascending scan, the larger key for a
 *   descending scan (the same rule getSMemRowInTableMem() applies).
 */
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
  SMemRow rmem = NULL, rimem = NULL;
  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (SMemRow)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (SMemRow)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return memRowKey(rmem);
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return memRowKey(rimem);
  }

  TSKEY r1 = memRowKey(rmem);
  TSKEY r2 = memRowKey(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
    return r1;
  }

  // The mem row goes first when it has the smaller key (ascending) or the larger
  // key (descending). Previously a descending scan always fell through to imem,
  // even when the mem key was the larger one.
  bool memFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return r2;
}

/*
 * Return the row that comes first, in traverse order, among the current rows of
 * the mem and imem iterators, and record in pCheckInfo->chosen which buffer it
 * was taken from.
 *
 * - neither iterator has a current row: returns NULL.
 * - only one iterator has a row: that row is returned.
 * - both rows carry the same key: for TD_ROW_DISCARD_UPDATE the mem iterator is
 *   advanced and the imem row returned; for TD_ROW_OVERWRITE_UPDATE the imem
 *   iterator is advanced and the mem row returned; otherwise both buffers are
 *   chosen, the mem row is returned and the imem row is handed back through
 *   extraRow.
 * - keys differ: the row with the smaller key is returned for an ascending scan,
 *   the row with the larger key for a descending scan.
 *
 * extraRow may be NULL when the caller is not interested in the second row.
 */
static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, SMemRow* extraRow) {
  SMemRow rmem = NULL, rimem = NULL;
  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (SMemRow)SL_GET_NODE_DATA(node);
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (SMemRow)SL_GET_NODE_DATA(node);
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return NULL;
  }

  // Named constants instead of the literals 0/1, matching extractFirstTraverseKey().
  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return rimem;
  }

  TSKEY r1 = memRowKey(rmem);
  TSKEY r2 = memRowKey(rimem);

  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
      tSkipListIterNext(pCheckInfo->iter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      return rimem;
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
      tSkipListIterNext(pCheckInfo->iiter);
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
      // Write through the out-parameter; assigning to the parameter itself never
      // reached the caller. Callers in this file also pass NULL, hence the guard.
      if (extraRow != NULL) {
        *extraRow = rimem;
      }
      return rmem;
    }
  }

  // Keys differ. `chosen` must name the buffer whose row is returned, because
  // moveToNextRowInMem() uses it to decide which iterator to advance.
  bool memFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return rimem;
}

901
/*
 * Advance the iterator(s) named by pCheckInfo->chosen and report whether a row is
 * still available in the buffers.
 *
 * When a single buffer is chosen, its iterator is advanced; if that iterator has no
 * further row, the answer depends on whether the other iterator currently points
 * at a row. For any other value of `chosen`, every existing iterator is advanced
 * and the result is true when at least one of them moved onto a row.
 */
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
    if (pCheckInfo->iter != NULL && tSkipListIterNext(pCheckInfo->iter)) {
      return true;
    }

    // mem has nothing more: fall back to whatever imem currently points at
    if (pCheckInfo->iiter != NULL) {
      return tSkipListIterGet(pCheckInfo->iiter) != NULL;
    }

    return false;
  }

  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
    if (pCheckInfo->iiter != NULL && tSkipListIterNext(pCheckInfo->iiter)) {
      return true;
    }

    // imem has nothing more: fall back to whatever mem currently points at
    if (pCheckInfo->iter != NULL) {
      return tSkipListIterGet(pCheckInfo->iter) != NULL;
    }

    return false;
  }

  // both rows were consumed together, so both iterators step forward
  bool moved = false;
  if (pCheckInfo->iter != NULL) {
    moved = tSkipListIterNext(pCheckInfo->iter);
  }

  if (pCheckInfo->iiter != NULL) {
    moved = tSkipListIterNext(pCheckInfo->iiter) || moved;
  }

  return moved;
}

939
/*
 * Read the next batch of buffered rows for the active table of the handle.
 *
 * The buffer iterators are initialised on demand. When the buffers hold no row, or
 * the first available row already lies beyond window.ekey in traverse order, false
 * is returned. Otherwise rows are copied through tsdbReadRowsFromCache(), the
 * resume keys of the table and of the file position are moved one step past the
 * last key read, and cur.win is normalised so that skey/ekey are swapped for a
 * descending scan.
 */
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
  STsdbCfg* pCfg = &pHandle->pTsdb->config;

  size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
  pHandle->cur.fid = INT32_MIN;

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

  SMemRow row = getSMemRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL);
  if (row == NULL) {
    return false;
  }

  pCheckInfo->lastKey = memRowKey(row);  // first timestamp available in the buffers
  tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
      pCheckInfo->tableId,  pCheckInfo->lastKey, pHandle->order, pHandle->idStr);

  bool asc = ASCENDING_TRAVERSE(pHandle->order);

  // the first buffered row is already outside the query window: nothing left to read
  bool beyondWindow = asc ? (pCheckInfo->lastKey > pHandle->window.ekey)
                          : (pCheckInfo->lastKey < pHandle->window.ekey);
  if (beyondWindow) {
    return false;
  }

  STimeWindow* win = &pHandle->cur.win;
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);

  // resume one step after the last key that was read
  int32_t step = asc ? 1 : -1;
  pCheckInfo->lastKey  = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;

  if (!asc) {
    TSWAP(win->skey, win->ekey, TSKEY);
  }

  return true;
}
H
hjxilinx 已提交
980

981 982
/*
 * Map a timestamp to the id of the data file that covers it.
 *
 * A file spans daysPerFile * tsTickPerDay[precision] ticks. The id is the key
 * divided by that span; negative keys are shifted back by one span before the
 * division. The result is clamped to the int32_t range, and TSKEY_INITIAL_VAL maps
 * to INT32_MIN.
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // precision is used as an index into tsTickPerDay, so it must be a valid one.
  // The earlier check joined its two bounds with "||" and could never fail.
  // NOTE(review): assumes the precision constants start at 0 and TSDB_TIME_PRECISION_NANO is the largest -- confirm.
  assert(precision >= 0 && precision <= TSDB_TIME_PRECISION_NANO);
  assert(daysPerFile > 0);  // the span below is used as a divisor

  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  int64_t ticksPerFile = daysPerFile * tsTickPerDay[precision];

  if (key < 0) {
    key -= ticksPerFile;
  }

  int64_t fid = (int64_t)(key / ticksPerFile);  // set the starting fileId

  // clamp into the int32_t range (plain comparisons; no llabs() on a 64-bit value)
  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t)fid;
}

H
refact  
Hongze Cheng 已提交
1003
/*
 * Binary search over an array of blocks ordered by key range.
 *
 * Returns the slot whose [keyFirst, keyLast] range contains skey. When skey falls
 * into a gap between two blocks, or outside all of them, the search stops at a
 * neighbouring slot; `order` decides on which side of a gap it stops.
 */
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo = 0;
  int32_t hi = numOfBlocks - 1;
  int32_t mid = lo;

  for (;;) {
    int32_t span = hi - lo + 1;
    mid = lo + (span >> 1);

    if (span == 1) {
      break;
    }

    if (skey > pBlock[mid].keyLast) {
      if (span == 2) {
        break;
      }

      // descending search: stop when skey sits in the gap right after this block
      if (order == TSDB_ORDER_DESC && skey < pBlock[mid + 1].keyFirst) {
        break;
      }

      lo = mid + 1;
    } else if (skey < pBlock[mid].keyFirst) {
      // ascending search: stop when skey sits in the gap right before this block
      if (order == TSDB_ORDER_ASC && skey > pBlock[mid - 1].keyLast) {
        break;
      }

      hi = mid - 1;
    } else {
      break;  // skey lies inside this block
    }
  }

  return mid;
}
1029

1030
/*
 * Load the block info of one table from the current file and keep only the blocks
 * that can intersect the remaining query range.
 *
 * `index` selects the table in pTableCheckInfo. The per-table buffer pCompInfo is
 * grown when the block index reports a larger length. The qualified blocks are
 * moved to the front of pCompInfo->blocks, their count is stored in
 * pCheckInfo->numOfBlocks and added to *numOfBlocks.
 *
 * Returns 0 on success (including "no block of this table in the file"), otherwise
 * an error code.
 */
static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
  pCheckInfo->numOfBlocks = 0;

  STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
  table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);

  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
    return terrno;
  }

  SBlockIdx* pIdx = pTsdbReadHandle->rhelper.pBlkIdx;

  // the file holds no block index entry for this table
  if (pIdx == NULL || pIdx->uid != pCheckInfo->tableId) {
    return 0;
  }

  // grow the block info buffer when the entry in this file is larger than before
  if (pCheckInfo->compSize < (int32_t)pIdx->len) {
    assert(pIdx->len > 0);

    char* buf = realloc(pCheckInfo->pCompInfo, pIdx->len);
    if (buf == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pCheckInfo->pCompInfo = (SBlockInfo*)buf;
    pCheckInfo->compSize = pIdx->len;
  }

  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
    return terrno;
  }

  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // remaining key range [lower, upper], independent of the traverse order
  TSKEY lower = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  TSKEY upper = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);

  // first candidate block; every block entirely before `lower` is dropped
  int32_t start = binarySearchForBlock(pCompInfo->blocks, pIdx->numOfBlocks, lower, TSDB_ORDER_ASC);
  if (lower > pCompInfo->blocks[start].keyLast) {
    return 0;
  }

  // todo speed up locating the end block
  int32_t end = start;
  for (; end < (int32_t)pIdx->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= upper); end += 1) {
  }

  pCheckInfo->numOfBlocks = (end - start);

  if (start > 0) {
    memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }

  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1103

1104
/*
 * Load the block info of the current file, either for the active table only
 * (BLOCK_LOAD_TABLE_SEQ_ORDER) or for every table of the handle
 * (BLOCK_LOAD_OFFSET_SEQ_ORDER), stopping at the first table that fails.
 *
 * *numOfBlocks receives the number of qualified blocks. The elapsed time is added
 * to cost.headFileLoadTime on every path.
 */
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

  pTsdbReadHandle->cost.headFileLoad += 1;
  int64_t startUs = taosGetTimestampUs();

  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

    for (int32_t i = 0; i < numOfTables; ++i) {
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        break;  // report the first failure; the time spent so far is still accounted below
      }
    }
  } else {
    assert(0);
  }

  int64_t endUs = taosGetTimestampUs();
  pTsdbReadHandle->cost.headFileLoadTime += (endUs - startUs);
  return code;
}

1136
/*
 * Load the column data of one file block into the read helper buffers.
 *
 * The three data-column containers (pDataCols and rhelper.pDCols[0..1]) are
 * initialised from the table schema, then the columns listed in defaultLoadColumn
 * are read. On success the load info (file group, slot, uid) is recorded,
 * pBlock->numOfRows is set to the number of rows actually loaded and the elapsed
 * time is added to cost.blockLoadTime.
 *
 * slotIndex is only used in log messages.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno after setting pBlock->numOfRows to 0.
 */
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
  int64_t startUs = taosGetTimestampUs();

  // NOTE(review): whether the schema returned here has to be released by the caller is not visible in this function -- confirm.
  STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);

  if (tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData;

  if (tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                            (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle))) != TSDB_CODE_SUCCESS) {
    // the loader is expected to have recorded the reason in terrno
    assert(terrno != TSDB_CODE_SUCCESS);
    goto _error;
  }

  // remember which block currently sits in the helper buffers
  SDataBlockLoadInfo* pLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
  pLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pLoadInfo->slot = pTsdbReadHandle->cur.slot;
  pLoadInfo->uid = pCheckInfo->tableId;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);

  pBlock->numOfRows = pCols->numOfRows;

  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
  if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int64_t* ts = pCols->cols[0].pData;
    for (int32_t row = 0; row < pBlock->numOfRows; ++row) {
      ts[row] = tdGetKey(ts[row]);
    }
  }

  int64_t elapsedTime = (taosGetTimestampUs() - startUs);
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;

  tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s",
      pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr);
  return TSDB_CODE_SUCCESS;

_error:
  pBlock->numOfRows = 0;

  tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s",
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
  return terrno;
}

1204 1205 1206 1207 1208
// Forward declarations of static helpers that are referenced before their definitions appear in this file.
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
1209

1210 1211 1212
/*
 * Produce the next result block for a file block, taking buffered rows into account.
 *
 * The first buffered key (if any) is compared with the key range of the file block:
 *  - the buffered key lies before the whole block in traverse order: only buffered
 *    rows up to the block boundary are returned and the file block is not loaded;
 *  - the buffered key falls at or before the far edge of the block: the block is
 *    loaded and merged with the buffered rows;
 *  - otherwise (no buffered row, or it lies after the block): rows come from the
 *    file block only, either as the whole block or as the remaining part of it.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported while loading the file block.
 */
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  int32_t        code = TSDB_CODE_SUCCESS;

  // the return value is deliberately ignored; an empty buffer shows up as TSKEY_INITIAL_VAL below
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

  TSKEY key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
  bool  hasMemKey = (key != TSKEY_INITIAL_VAL);
  bool  asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  if (hasMemKey) {
    tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
  }

  // buffered data starts no later than the far edge of the file block
  bool memReachesBlock = hasMemKey && (asc ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (memReachesBlock) {
    // buffered data starts strictly before the near edge of the file block
    bool memBeforeBlock = asc ? (key < binfo.window.skey) : (key > binfo.window.ekey);

    if (memBeforeBlock) {
      // serve buffered rows up to the block boundary; the file block is not loaded yet
      int32_t step = asc ? 1 : -1;

      TSKEY maxKey = asc ? (binfo.window.skey - step) : (binfo.window.ekey - step);
      cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
      pTsdbReadHandle->realNumOfRows = cur->rows;

      // update the last key value
      pCheckInfo->lastKey = cur->win.ekey + step;
      if (!asc) {
        TSWAP(cur->win.skey, cur->win.ekey, TSKEY);
      }

      cur->mixBlock = true;
      cur->blockCompleted = false;
      return code;
    }

    // return error, add test cases
    code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
    return code;
  }

  /*
   * Nothing in the buffers has to be merged with this block, so the rows come from
   * the file only. The output buffer is required to hold the whole block.
   */
  assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);

  bool wholeBlock = asc ? (cur->pos == 0 && endPos == binfo.rows -1)
                        : (cur->pos == (binfo.rows - 1) && endPos == 0);

  if (wholeBlock) {
    pTsdbReadHandle->realNumOfRows = binfo.rows;

    cur->rows = binfo.rows;
    cur->win  = binfo.window;
    cur->mixBlock = false;
    cur->blockCompleted = true;

    if (asc) {
      cur->lastKey = binfo.window.ekey + 1;
      cur->pos = binfo.rows;
    } else {
      cur->lastKey = binfo.window.skey - 1;
      cur->pos = -1;
    }
  } else {  // partially copy to dest buffer
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
    cur->mixBlock = true;
  }

  assert(cur->blockCompleted);
  if (cur->rows == binfo.rows) {
    tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s",
              pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s",
              pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr);
  }

  return code;
}

1304 1305
/*
 * Load one file block for the given table and build the next result block from it.
 *
 * When the query range ends inside the block, or the scan resumes inside it, the
 * block data is loaded, the start position is located by key and the block is
 * merged with the buffered rows. Otherwise the decision is left to
 * handleDataMergeIfNeeded(), starting from the first row in traverse order.
 *
 * *exists is set to false when loading fails, otherwise to whether any row was
 * produced (realNumOfRows > 0). Returns TSDB_CODE_SUCCESS or the load error.
 */
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // query ended in / started from the current block
  bool partialBlock = asc
      ? (pTsdbReadHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst)
      : (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast);

  if (!partialBlock) {
    // the block lies completely inside the remaining range
    cur->pos = asc? 0:(pBlock->numOfRows - 1);
    code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);

    *exists = pTsdbReadHandle->realNumOfRows > 0;
    return code;
  }

  code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
  if (code != TSDB_CODE_SUCCESS) {
    *exists = false;
    return code;
  }

  SDataCols* pTsCols = pTsdbReadHandle->rhelper.pDCols[0];

  if (asc) {
    assert(pTsCols->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTsCols->numOfRows == pBlock->numOfRows);

    // resume inside the block when part of it has been consumed already
    if (pCheckInfo->lastKey > pBlock->keyFirst) {
      cur->pos =
          binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
    } else {
      cur->pos = 0;
    }

    assert(pCheckInfo->lastKey <= pBlock->keyLast);
  } else {
    if (pCheckInfo->lastKey < pBlock->keyLast) {
      cur->pos = binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
    } else {
      cur->pos = pBlock->numOfRows - 1;
    }

    assert(pCheckInfo->lastKey >= pBlock->keyFirst);
  }

  doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);

  *exists = pTsdbReadHandle->realNumOfRows > 0;
  return code;
}

1359
/*
 * Binary search in an ascending array of `num` timestamps stored at pValue.
 *
 * TSDB_ORDER_DESC: returns the position of the last element that is <= key, or -1
 *                  when every element is greater than key.
 * otherwise (ASC): returns the position of the first element that is >= key, or -1
 *                  when every element is smaller than key.
 *
 * Returns -1 for an empty array.
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) return -1;

  TSKEY* keys = (TSKEY*)pValue;
  int    lo = 0;
  int    hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the last position whose key is not larger than the search key
    for (;;) {
      if (key >= keys[hi]) return hi;
      if (key == keys[lo]) return lo;
      if (key < keys[lo]) return lo - 1;

      int mid = ((hi - lo + 1) >> 1) + lo;

      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        return mid;  // exact match
      }
    }
  }

  // find the first position whose key is not smaller than the search key
  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;

    if (key > keys[hi]) {
      hi = hi + 1;
      return (hi >= num) ? -1 : hi;
    }

    int mid = ((hi - lo + 1) >> 1) + lo;

    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;  // exact match
    }
  }
}

1421
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
1422
  char* pData = NULL;
1423
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;
H
Haojun Liao 已提交
1424

1425
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
1426
  TSKEY* tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1427

1428
  int32_t num = end - start + 1;
H
Haojun Liao 已提交
1429 1430 1431 1432 1433 1434
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

1435
  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Haojun Liao 已提交
1436

1437
  //data in buffer has greater timestamp, copy data in file block
1438 1439
  int32_t i = 0, j = 0;
  while(i < requiredNumOfCols && j < pCols->numOfCols) {
1440
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1441 1442 1443 1444 1445 1446 1447 1448 1449

    SDataCol* src = &pCols->cols[j];
    if (src->colId < pColInfo->info.colId) {
      j++;
      continue;
    }

    int32_t bytes = pColInfo->info.bytes;

1450
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
S
TD-1057  
Shengliang Guan 已提交
1451
      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
1452
    } else {
S
TD-1057  
Shengliang Guan 已提交
1453
      pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
1454
    }
1455

L
Liu Jicong 已提交
1456
    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
1457
      if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
S
TD-1057  
Shengliang Guan 已提交
1458
        memmove(pData, (char*)src->pData + bytes * start, bytes * num);
1459 1460 1461 1462 1463
      } else {  // handle the var-string
        char* dst = pData;

        // todo refactor, only copy one-by-one
        for (int32_t k = start; k < num + start; ++k) {
K
kailixu 已提交
1464
          const char* p = tdGetColDataOfRow(src, k);
1465 1466
          memcpy(dst, p, varDataTLen(p));
          dst += bytes;
1467 1468
        }
      }
1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483

      j++;
      i++;
    } else { // pColInfo->info.colId < src->colId, it is a NULL data
      if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
        char* dst = pData;

        for(int32_t k = start; k < num + start; ++k) {
          setVardataNull(dst, pColInfo->info.type);
          dst += bytes;
        }
      } else {
        setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
      }
      i++;
1484 1485
    }
  }
1486 1487

  while (i < requiredNumOfCols) { // the remain columns are all null data
1488 1489
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
S
TD-1057  
Shengliang Guan 已提交
1490
      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
1491
    } else {
S
TD-1057  
Shengliang Guan 已提交
1492
      pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503
    }

    if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
      char* dst = pData;

      for(int32_t k = start; k < num + start; ++k) {
        setVardataNull(dst, pColInfo->info.type);
        dst += pColInfo->info.bytes;
      }
    } else {
      setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
1504
    }
1505 1506

    i++;
1507
  }
H
Haojun Liao 已提交
1508

1509 1510
  pTsdbReadHandle->cur.win.ekey = tsArray[end];
  pTsdbReadHandle->cur.lastKey = tsArray[end] + step;
1511

1512
  return numOfRows + num;
1513 1514
}

1515
// Note: row1 always has high priority
1516 1517
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                               SMemRow row1, SMemRow row2, int32_t numOfCols, uint64_t uid,
1518
                               STSchema* pSchema1, STSchema* pSchema2, bool forceSetNull) {
1519
  char* pData = NULL;
1520 1521 1522 1523 1524 1525 1526 1527 1528 1529
  STSchema* pSchema;
  SMemRow row;
  int16_t colId;
  int16_t offset;

  bool isRow1DataRow = isDataRow(row1);
  bool isRow2DataRow;
  bool isChosenRowDataRow;
  int32_t chosen_itr;
  void *value;
1530

1531 1532 1533 1534
  // the schema version info is embeded in SDataRow
  int32_t numOfColsOfRow1 = 0;

  if (pSchema1 == NULL) {
1535
    pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, 0);
1536
  }
1537

1538 1539
  if(isRow1DataRow) {
    numOfColsOfRow1 = schemaNCols(pSchema1);
H
Haojun Liao 已提交
1540
  } else {
1541
    numOfColsOfRow1 = kvRowNCols(memRowKvBody(row1));
D
fix bug  
dapan1121 已提交
1542
  }
1543

1544 1545 1546 1547
  int32_t numOfColsOfRow2 = 0;
  if(row2) {
    isRow2DataRow = isDataRow(row2);
    if (pSchema2 == NULL) {
1548
      pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, 0);
1549 1550 1551 1552 1553 1554 1555
    }
    if(isRow2DataRow) {
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
      numOfColsOfRow2 = kvRowNCols(memRowKvBody(row2));
    }
  }
C
Cary Xu 已提交
1556

1557 1558 1559

  int32_t i = 0, j = 0, k = 0;
  while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
1560
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1561

1562
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591
      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
    } else {
      pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
    }

    int32_t colIdOfRow1;
    if(j >= numOfColsOfRow1) {
      colIdOfRow1 = INT32_MAX;
    } else if(isRow1DataRow) {
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
      void *rowBody = memRowKvBody(row1);
      SColIdx *pColIdx = kvRowColIdxAt(rowBody, j);
      colIdOfRow1 = pColIdx->colId;
    }

    int32_t colIdOfRow2;
    if(k >= numOfColsOfRow2) {
      colIdOfRow2 = INT32_MAX;
    } else if(isRow2DataRow) {
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
      void *rowBody = memRowKvBody(row2);
      SColIdx *pColIdx = kvRowColIdxAt(rowBody, k);
      colIdOfRow2 = pColIdx->colId;
    }

    if(colIdOfRow1 == colIdOfRow2) {
      if(colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1592
        j++;
1593
        k++;
C
Cary Xu 已提交
1594 1595
        continue;
      }
1596 1597 1598 1599 1600 1601 1602 1603
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else if(colIdOfRow1 < colIdOfRow2) {
      if(colIdOfRow1 < pColInfo->info.colId) {
        j++;
        continue;
C
Cary Xu 已提交
1604
      }
1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
      if(colIdOfRow2 < pColInfo->info.colId) {
        k++;
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }
    if(isChosenRowDataRow) {
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
      void *rowBody = memRowDataBody(row);
      value = tdGetRowDataOfCol(rowBody, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
    } else {
      void *rowBody = memRowKvBody(row);
      SColIdx *pColIdx = kvRowColIdxAt(rowBody, chosen_itr);
      colId = pColIdx->colId;
      offset = pColIdx->offset;
      value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset);
    }

C
Cary Xu 已提交
1632

1633 1634
    if (colId == pColInfo->info.colId) {
      if(forceSetNull || (!isNull(value, (int8_t)pColInfo->info.type))) {
C
Cary Xu 已提交
1635 1636 1637 1638 1639 1640 1641 1642 1643
        switch (pColInfo->info.type) {
          case TSDB_DATA_TYPE_BINARY:
          case TSDB_DATA_TYPE_NCHAR:
            memcpy(pData, value, varDataTLen(value));
            break;
          case TSDB_DATA_TYPE_NULL:
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_TINYINT:
          case TSDB_DATA_TYPE_UTINYINT:
1644
            *(uint8_t *)pData = *(uint8_t *)value;
C
Cary Xu 已提交
1645 1646 1647
            break;
          case TSDB_DATA_TYPE_SMALLINT:
          case TSDB_DATA_TYPE_USMALLINT:
1648
            *(uint16_t *)pData = *(uint16_t *)value;
C
Cary Xu 已提交
1649 1650 1651
            break;
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
1652
            *(uint32_t *)pData = *(uint32_t *)value;
C
Cary Xu 已提交
1653 1654 1655
            break;
          case TSDB_DATA_TYPE_BIGINT:
          case TSDB_DATA_TYPE_UBIGINT:
1656
            *(uint64_t *)pData = *(uint64_t *)value;
C
Cary Xu 已提交
1657 1658 1659 1660 1661 1662 1663 1664
            break;
          case TSDB_DATA_TYPE_FLOAT:
            SET_FLOAT_PTR(pData, value);
            break;
          case TSDB_DATA_TYPE_DOUBLE:
            SET_DOUBLE_PTR(pData, value);
            break;
          case TSDB_DATA_TYPE_TIMESTAMP:
1665
            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
1666
              *(TSKEY *)pData = tdGetKey(*(TKEY *)value);
C
Cary Xu 已提交
1667
            } else {
1668
              *(TSKEY *)pData = *(TSKEY *)value;
C
Cary Xu 已提交
1669 1670 1671 1672 1673
            }
            break;
          default:
            memcpy(pData, value, pColInfo->info.bytes);
        }
1674 1675
      }
      i++;
C
Cary Xu 已提交
1676

1677
      if(row == row1) {
C
Cary Xu 已提交
1678
        j++;
1679 1680 1681 1682 1683
      } else {
        k++;
      }
    } else {
      if(forceSetNull) {
C
Cary Xu 已提交
1684 1685 1686 1687 1688 1689
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
          setVardataNull(pData, pColInfo->info.type);
        } else {
          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
        }
      }
1690
      i++;
1691
    }
1692
  }
1693

1694 1695
  if(forceSetNull) {
    while (i < numOfCols) { // the remain columns are all null data
1696 1697
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
      if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
C
Cary Xu 已提交
1698 1699 1700 1701 1702
        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
      } else {
        pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
      }

1703 1704 1705 1706
      if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
        setVardataNull(pData, pColInfo->info.type);
      } else {
        setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
1707
      }
1708

1709
      i++;
1710 1711 1712
    }
  }
}
1713

1714 1715
/**
 * For a descending query whose output buffer is only partly filled, shift the rows
 * (which were written at the tail of every column buffer) to the front of the buffer.
 *
 * Nothing is done for ascending queries, for an empty result, or when the buffer is full.
 *
 * @param numOfRows  number of rows currently held in the output buffer
 * @param numOfCols  number of output columns to shift
 */
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
  if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return;
  }

  if (numOfRows >= pTsdbReadHandle->outputCapacity) {
    return;  // the buffer is full, the data already starts at the front
  }

  // number of unused row slots in front of the data
  int32_t gap = pTsdbReadHandle->outputCapacity - numOfRows;

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, col);
    char*            base = (char*)pColInfo->pData;

    // source and destination may overlap, hence memmove
    memmove(base, base + gap * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
  }
}

1729
/**
 * Clip the row range [startPos, endPos] (given in traverse direction) so that, together with
 * the `numOfExisted` rows already produced, it does not exceed the output capacity.
 *
 * The result is returned as an index range with *start <= *end for a non-empty range:
 * in ascending order it begins at startPos, in descending order it ends at startPos.
 *
 * @param startPos      position where the traverse starts
 * @param endPos        position where the traverse would end without the capacity limit
 * @param numOfExisted  number of rows already in the output buffer
 * @param start         [out] lowest row position to copy
 * @param end           [out] highest row position to copy
 */
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
                                int32_t* start, int32_t* end) {
  const int32_t capacity = pTsdbReadHandle->outputCapacity;

  *start = -1;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    const int32_t remain = endPos - startPos + 1;
    const bool    overflow = (remain + numOfExisted > capacity);

    *end = overflow ? ((capacity - numOfExisted) + startPos - 1) : endPos;
    *start = startPos;
    return;
  }

  const int32_t remain = (startPos - endPos) + 1;
  const bool    overflow = (remain + numOfExisted > capacity);

  // descending: the traverse runs downwards from startPos, so startPos is the upper bound
  *start = overflow ? (startPos + 1 - (capacity - numOfExisted)) : endPos;
  *end = startPos;
}

1755 1756
/**
 * Publish the outcome of a merge/copy step: record the produced row count and the next
 * position in the current file position, and propagate the last key to the table check info.
 *
 * @param numOfRows  number of rows produced into the output buffer
 * @param endPos     position in the file block to store as the current position
 */
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) {
  pCheckInfo->lastKey = pTsdbReadHandle->cur.lastKey;

  pTsdbReadHandle->realNumOfRows = numOfRows;
  pTsdbReadHandle->cur.rows = numOfRows;
  pTsdbReadHandle->cur.pos = endPos;
}

1764 1765
/**
 * Sanity-check the time window of the block that was just generated.
 *
 * With rows present, assert that cur->win lies inside the query window and matches the first and
 * last timestamps of output column 0. With no rows, reset cur->win to the query window and move
 * cur->lastKey one step beyond the query window's ekey.
 */
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  if (cur->rows <= 0) {
    cur->win = pTsdbReadHandle->window;
    cur->lastKey = pTsdbReadHandle->window.ekey + (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1);
    return;
  }

  // for a descending query the query window is stored with skey >= ekey
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(cur->win.skey >= pTsdbReadHandle->window.skey);
    assert(cur->win.ekey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(cur->win.skey >= pTsdbReadHandle->window.ekey);
    assert(cur->win.ekey <= pTsdbReadHandle->window.skey);
  }

  SColumnInfoData* pTsCol = taosArrayGet(pTsdbReadHandle->pColumns, 0);
  assert(cur->win.skey == ((TSKEY*)pTsCol->pData)[0]);
  assert(cur->win.ekey == ((TSKEY*)pTsCol->pData)[cur->rows - 1]);
}

1784 1785
/**
 * Copy every remaining row of the loaded file block, from cur->pos up to endPos in traverse
 * direction, into the output buffer and mark the block as completed.
 *
 * @param pCheckInfo  check info of the table, receives the new last key
 * @param pBlockInfo  info of the file block; its row count decides whether the result is a mix block
 * @param endPos      last row position (in traverse direction) that qualifies
 */
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY* tsArray = pCols->cols[0].pData;

  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));

  int32_t start = cur->pos;
  int32_t end = endPos;

  // the copy routine expects start <= end regardless of the traverse direction
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    TSWAP(start, end, int32_t);
  }

  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);

  // the time window should always be ascending order: skey <= ekey
  cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
  cur->lastKey = tsArray[endPos] + step;
  cur->blockCompleted = true;

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);

  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
  int32_t pos = endPos + step;
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);

  // the window keys are signed timestamps, print them with PRId64
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRId64 "-%" PRId64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
}

1823
/**
 * Find the last row position (in traverse direction) of the loaded file block that is still
 * covered by the query time window, and update cur->mixBlock accordingly.
 *
 * @param pBlockInfo  time window and row count of the file block
 * @return rows - 1 (ascending) or 0 (descending) when the block end lies inside the query window,
 *         otherwise the position found by a binary search for the query window's ekey
 */
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  const bool     ascending = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  if (ascending && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
    // the whole tail of the block qualifies
    cur->mixBlock = (cur->pos != 0);
    return pBlockInfo->rows - 1;
  }

  if (!ascending && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
    // the whole head of the block qualifies
    cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
    return 0;
  }

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->numOfRows > 0);

  // NOTE: reverse the order to find the end position in data block
  int32_t searchOrder = ascending ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

  cur->mixBlock = true;
  return doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, searchOrder);
}

H
[td-32]  
hjxilinx 已提交
1846 1847
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1848 1849 1850 1851
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataBlockInfo blockInfo = {0};//GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;
H
Haojun Liao 已提交
1852

1853
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1854

1855 1856
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
H
Haojun Liao 已提交
1857 1858
      cur->pos >= 0 && cur->pos < pBlock->numOfRows);

1859
  TSKEY* tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1860
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
1861 1862

  // for search the endPos, so the order needs to reverse
1863
  int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
1864

1865 1866
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
1867

H
Haojun Liao 已提交
1868
  STable* pTable = NULL;
1869
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
1870

1871
  tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
H
Haojun Liao 已提交
1872
            "end:%d, %s",
1873
            pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey,
H
Haojun Liao 已提交
1874
            blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1875

1876 1877
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
H
Haojun Liao 已提交
1878

1879 1880 1881 1882
  int16_t rv1 = -1;
  int16_t rv2 = -1;
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
1883

H
Haojun Liao 已提交
1884 1885
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
1886

1887 1888
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
1889
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
1890
    return;
1891
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1892 1893
    SSkipListNode* node = NULL;
    do {
1894
      SMemRow row2 = NULL;
1895
      SMemRow row1 = getSMemRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2);
1896
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
1897
        break;
1898
      }
1899

1900
      TSKEY key = memRowKey(row1);
1901 1902
      if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1903 1904 1905
        break;
      }

1906 1907
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1908 1909 1910
        break;
      }

1911 1912
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1913
        if (rv1 != memRowVersion(row1)) {
1914
//          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
1915
          rv1 = memRowVersion(row1);
C
Cary Xu 已提交
1916
        }
1917
        if(row2 && rv2 != memRowVersion(row2)) {
1918
//          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
1919 1920 1921
          rv2 = memRowVersion(row2);
        }
        
H
Haojun Liao 已提交
1922
        mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
1923 1924 1925 1926
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1927

1928
        cur->win.ekey = key;
1929 1930 1931
        cur->lastKey  = key + step;
        cur->mixBlock = true;

1932
        moveToNextRowInMem(pCheckInfo);
1933
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
H
TD-1439  
Hongze Cheng 已提交
1934
        if (pCfg->update) {
1935
          if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
1936
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
1937
          }
1938
          if (rv1 != memRowVersion(row1)) {
1939
//            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
1940 1941 1942
            rv1 = memRowVersion(row1);
          }
          if(row2 && rv2 != memRowVersion(row2)) {
1943
//            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
1944 1945 1946 1947
            rv2 = memRowVersion(row2);
          }
          
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
H
Haojun Liao 已提交
1948
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
H
TD-1439  
Hongze Cheng 已提交
1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
1963 1964
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
                  (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1965 1966 1967
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1968

1969
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
1970 1971
        assert(end != -1);

H
Haojun Liao 已提交
1972
        if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
1973
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
1974 1975 1976 1977
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
1978
        }
1979

1980
        int32_t qstart = 0, qend = 0;
1981
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
1982

1983
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
1984 1985
        pos += (qend - qstart + 1) * step;

1986
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[qend]:tsArray[qstart];
1987
        cur->lastKey  = cur->win.ekey + step;
1988
      }
1989
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
1990

1991
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
1992 1993 1994 1995
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1996
      if (node == NULL ||
1997 1998 1999 2000
          ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
           ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
           !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
2001 2002 2003 2004 2005
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

2006
        int32_t start = -1, end = -1;
2007
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
2008

2009
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
2010
        pos += (end - start + 1) * step;
2011

2012
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? tsArray[end]:tsArray[start];
2013
        cur->lastKey  = cur->win.ekey + step;
H
Haojun Liao 已提交
2014
        cur->mixBlock = true;
2015
      }
2016 2017
    }
  }
H
Haojun Liao 已提交
2018 2019

  cur->blockCompleted =
2020 2021
      (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
       ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
2022

2023
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
dengyihao's avatar
dengyihao 已提交
2024
    TSWAP(cur->win.skey, cur->win.ekey, TSKEY);
2025
  }
2026

2027 2028 2029
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
2030

H
Haojun Liao 已提交
2031 2032
  tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s",
      pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr);
2033 2034
}

2035
/**
 * Binary-search a timestamp array for the position that bounds `key`.
 *
 * The narrowing logic presumes the keys are sorted in ascending order.
 *
 * @param pValue  buffer that is reinterpreted as an array of TSKEY
 * @param num     number of keys in the array
 * @param key     timestamp to search for
 * @param order   TSDB_ORDER_DESC: position of the largest key that is <= `key`
 *                (-1 when `key` is smaller than the first key);
 *                any other value: position of the smallest key that is >= `key`
 *                (-1 when `key` is larger than the last key)
 * @return the position described above, or -1 when `num` <= 0
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY*     keyList = (TSKEY*)pValue;
  const bool descending = (order == TSDB_ORDER_DESC);

  int left = 0;
  int right = num - 1;

  for (;;) {
    // boundary checks: they resolve every case in which the key is not strictly inside the range
    if (descending) {
      if (key >= keyList[right]) {
        return right;
      }

      if (key == keyList[left]) {
        return left;
      }

      if (key < keyList[left]) {
        return left - 1;
      }
    } else {
      if (key <= keyList[left]) {
        return left;
      }

      if (key == keyList[right]) {
        return right;
      }

      if (key > keyList[right]) {
        int next = right + 1;
        return (next >= num) ? -1 : next;
      }
    }

    // keyList[left] < key < keyList[right]: halve the range
    int span = right - left + 1;
    int middle = (span >> 1) + left;

    if (key == keyList[middle]) {
      return middle;
    }

    if (key < keyList[middle]) {
      right = middle - 1;
    } else {
      left = middle + 1;
    }
  }
}

2095
/**
 * Release the buffers owned by a block order supporter.
 *
 * @param pSupporter   supporter whose arrays are released
 * @param numOfTables  number of leading entries of pDataBlockInfo to release
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  // release the per-table block info arrays first, then the arrays of the supporter itself
  for (int32_t t = 0; t < numOfTables; ++t) {
    STableBlockInfo* pTableBlocks = pSupporter->pDataBlockInfo[t];
    tfree(pTableBlocks);
  }

  tfree(pSupporter->numOfBlocksPerTable);
  tfree(pSupporter->blockIndexArray);
  tfree(pSupporter->pDataBlockInfo);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

2116
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
2117 2118
    /* left block is empty */
    return 1;
2119
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2120 2121 2122 2123 2124 2125 2126
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2127
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
2128
#if 0	// TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2129 2130
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2131
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2132
  }
H
Haojun Liao 已提交
2133
#endif
2134

H
Haojun Liao 已提交
2135
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2136 2137
}

2138
/*
 * Fill pTsdbReadHandle->pDataBlockInfo with the blocks of every table in pTableCheckInfo that has
 * blocks in the current file, ordered by ascending compBlock->offset across tables.
 *
 * pTsdbReadHandle   read handle; its pDataBlockInfo buffer is grown on demand and reused
 * numOfBlocks       total number of blocks over all tables (asserted to equal the sum of the
 *                   per-table numOfBlocks)
 * numOfAllocBlocks  out: set to numOfBlocks once the output buffer is available
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when any allocation fails.
 */
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

  if (pTsdbReadHandle->allocSize < size) {
    // Record the new capacity only after realloc succeeded; otherwise allocSize would claim more
    // memory than pDataBlockInfo owns and a later call would memset past the end of the buffer.
    char* tmp = realloc(pTsdbReadHandle->pDataBlockInfo, size);
    if (tmp == NULL) {
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*) tmp;
    pTsdbReadHandle->allocSize = (int32_t)size;
  }

  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
  *numOfAllocBlocks = numOfBlocks;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = calloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = calloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  // collect the block list of every table that owns at least one block; the qualified tables are
  // packed densely at the front of the supporter arrays
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }

    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    char* buf = malloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (buf == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];

      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
        pTsdbReadHandle->idStr);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
      numOfQualTables, pTsdbReadHandle->idStr);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  // compare the full status code: storing it in a uint8_t would drop the upper bits of an error code
  SLoserTreeInfo* pTree = NULL;
  if (tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar) != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;

  // k-way merge: repeatedly take the block at the head of the winning table
  while (numOfTotal < cnt) {
    int32_t pos = pTree->pNode[0].index;
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tLoserTreeAdjust(pTree, pos + sup.numOfTables);
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
   * }
   */

  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
  cleanBlockOrderSupporter(&sup, numOfTables);
  free(pTree);

  return TSDB_CODE_SUCCESS;
}

2255
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2256

2257 2258 2259
/*
 * Starting at pNext, load file blocks of the current file one after another (in traversal order)
 * until loadFileDataBlock reports data (*exists) or fails. When the last slot of the current file
 * has been tried without result, continue with the next file via getFirstFileDataBlock.
 *
 * Returns the status of the last load attempt (or of getFirstFileDataBlock).
 */
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool *exists) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  int32_t        delta = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1 : -1;

  for (;;) {
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

    // the final slot is the last one for ascending traversal and the first one for descending traversal
    bool atFinalSlot = ASCENDING_TRAVERSE(pTsdbReadHandle->order)
                           ? (pPos->slot == pTsdbReadHandle->numOfBlocks - 1)
                           : (pPos->slot == 0);
    if (atFinalSlot) {
      // all data blocks in current file has been checked already, try next file if exists
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
    }

    // next block of the same file
    pPos->slot += delta;
    pPos->mixBlock = false;
    pPos->blockCompleted = false;
    pNext = &pTsdbReadHandle->pDataBlockInfo[pPos->slot];
  }
}

2280 2281 2282
/*
 * Advance the file iterator to the next file group that contains at least one block for the
 * queried tables, build the ordered block list for it and load the first block in traversal order.
 *
 * On return without a usable file (iterator exhausted, file outside the query window, or an error)
 * cur->fid is set to INT32_MIN and *exists to false; the status code is returned.
 * Otherwise cur->slot/cur->fid are positioned on the first block and the result of getDataBlockRv
 * is returned.
 */
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = &pTsdbReadHandle->pTsdb->config;

  pTsdbReadHandle->numOfBlocks = 0;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     numOfBlocks = 0;
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  STimeWindow win = TSWINDOW_INITIALIZER;

  for (;;) {
    // the file system is read-locked only while the file group is selected and opened
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);

    // current file are not overlapped with query time window, ignore remain files
    // NOTE(review): both directions compare against window.ekey — presumably the far end of the
    // query range in traversal direction; confirm against the caller's window convention
    bool outOfRange = ASCENDING_TRAVERSE(pTsdbReadHandle->order)
                          ? (win.skey > pTsdbReadHandle->window.ekey)
                          : (win.ekey < pTsdbReadHandle->window.ekey);
    if (outOfRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
      break;
    }

    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;  // nothing for the queried tables in this file, try the next one
    }

    // todo return error code to query engine
    code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    assert(numOfBlocks >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
      break;
    }
  }

  // no data in file anymore
  if (pTsdbReadHandle->numOfBlocks <= 0 || code != TSDB_CODE_SUCCESS) {
    if (code == TSDB_CODE_SUCCESS) {
      assert(pTsdbReadHandle->pFileGroup == NULL);
    }

    cur->fid = INT32_MIN;  // denote that there are no data in file anymore
    *exists = false;
    return code;
  }

  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);

  // start from the first slot when ascending, from the last slot when descending
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    cur->slot = 0;
  } else {
    cur->slot = pTsdbReadHandle->numOfBlocks-1;
  }
  cur->fid = pTsdbReadHandle->pFileGroup->fid;

  return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[cur->slot], exists);
}

/*
 * Tell whether cur->slot is the final block slot of the current file in traversal direction:
 * slot numOfBlocks - 1 for ascending traversal, slot 0 for descending traversal.
 * cur must be non-NULL and numOfBlocks positive (asserted).
 */
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);

  if (ascTrav) {
    return cur->slot == numOfBlocks - 1;
  }

  return cur->slot == 0;
}

2373 2374
/*
 * Step cur->slot by one block in traversal direction (+1 ascending, -1 descending) and reset the
 * per-block flags mixBlock and blockCompleted. The current slot must be inside
 * [0, numOfBlocks) (asserted); no range check is applied to the new slot.
 */
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  assert(pPos->slot < pTsdbReadHandle->numOfBlocks && pPos->slot >= 0);

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    pPos->slot += 1;
  } else {
    pPos->slot -= 1;
  }

  pPos->mixBlock       = false;
  pPos->blockCompleted = false;
}
2383 2384 2385
#if 0
/*
 * (currently compiled out)
 * Walk the file groups starting at the file that holds window.skey and accumulate block statistics
 * of the queried tables into pTableBlockInfo: total size, total rows, min/max rows per block,
 * number of small blocks, number of files, and a histogram over dataBlockInfos indexed by
 * (numOfRows - 1) / TSDB_BLOCK_DIST_STEP_ROWS.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while opening/loading a file group.
 */
int32_t tsdbGetFileBlocksDistInfo(tsdbReadHandleT* queryHandle, STableBlockDist* pTableBlockInfo) {
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  STsdbFS*  pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
  STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;

  // find the start data block in file
  pTsdbReadHandle->locateStart = true;
  int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

  tsdbRLockFS(pFileHandle);
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
  tsdbUnLockFS(pFileHandle);

  // NOTE(review): numOfFiles is incremented here and again for every visited file below — confirm
  // the extra count is intended
  pTableBlockInfo->numOfFiles += 1;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     numOfBlocks = 0;
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  int         defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
  STimeWindow win = TSWINDOW_INITIALIZER;

  for (;;) {
    numOfBlocks = 0;
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);

    // current file are not overlapped with query time window, ignore remain files
    bool outOfRange = ASCENDING_TRAVERSE(pTsdbReadHandle->order)
                          ? (win.skey > pTsdbReadHandle->window.ekey)
                          : (win.ekey < pTsdbReadHandle->window.ekey);
    if (outOfRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      break;
    }

    pTableBlockInfo->numOfFiles += 1;
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    if (numOfBlocks == 0) {
      continue;
    }

    // fold every block of every table in this file into the statistics
    for (int32_t i = 0; i < numOfTables; ++i) {
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
      SBlock*          pBlock = pCheckInfo->pCompInfo->blocks;

      for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
        int32_t numOfRows = pBlock[j].numOfRows;

        pTableBlockInfo->totalSize += pBlock[j].len;
        pTableBlockInfo->totalRows += numOfRows;

        if (numOfRows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = numOfRows;
        }
        if (numOfRows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = numOfRows;
        }
        if (numOfRows < defaultRows) {
          pTableBlockInfo->numOfSmallBlocks+=1;
        }

        int32_t         stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
        SFileBlockInfo* blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
        blockInfo->numBlocksOfStep++;
      }
    }
  }

  return code;
}
#endif
H
Haojun Liao 已提交
2477

2478 2479 2480
/*
 * Produce the next file data block for the read handle.
 *
 * First call (locateStart not set): position the file iterator on the file that holds window.skey
 * and load the first block via getFirstFileDataBlock.
 * Later calls: if the current block is a mixed block that is not yet completed, continue merging
 * it; otherwise (or when that yields no rows) move on to the next block of the file, or to the
 * next file when the current block was the last one.
 *
 * *exists reports whether rows are available; the status code of the step taken is returned.
 */
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  // find the start data block in file
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;

    STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
    int32_t   fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);

    tsdbRLockFS(pFileHandle);
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
    tsdbUnLockFS(pFileHandle);

    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  // check if current file block is all consumed
  STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;

  // a block is done when it is not a mixed block, or when it has been flagged as completed
  bool blockDone = (!cur->mixBlock) || cur->blockCompleted;
  if (!blockDone) {
    tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
              pTsdbReadHandle->idStr);

    int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo);
    *exists = (pTsdbReadHandle->realNumOfRows > 0);

    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }
  }

  // current block is empty, try next block in file
  if (!isEndFileDataBlock(cur, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
    moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
    return getDataBlockRv(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[cur->slot], exists);
  }

  // all data blocks in current file has been checked already, try next file if exists
  return getFirstFileDataBlock(pTsdbReadHandle, exists);
}

2525 2526
/*
 * Starting at the handle's activeIndex, look for a table for which hasMoreDataInCache reports
 * data. activeIndex is advanced past every table that has none and is left on the first table
 * that has data. Returns false once activeIndex reaches the number of tables.
 */
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  for (; pTsdbReadHandle->activeIndex < numOfTables; pTsdbReadHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
      return true;
    }
  }

  return false;
}

2539
// todo: not unref yet, since multi-group interpolation queries are not supported
2540
/*
 * Intended to reduce pTableCheckInfo to the first table that qualifies for an interpolation query
 * and to reset that table's lastKey to window.skey.
 *
 * NOTE(review): the qualifying check inside the scan is commented out, so the scan always reaches
 * numOfTables and the function returns without changing the handle.
 */
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReadHandleT pHandle) {
  // filter the queried time stamp in the first place
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;

  // starts from the buffer in case of descending timestamp order check data blocks
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  int32_t i;
  for (i = 0; i < numOfTables; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);

    // the first qualified table for interpolation query
//    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//      break;
//    }
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

  // keep only the qualified table, restarting it at the beginning of the query window
  STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2573
                                 STsdbReadHandle* pTsdbReadHandle) {
H
Haojun Liao 已提交
2574
  int     numOfRows = 0;
2575 2576
  int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
  STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
H
Haojun Liao 已提交
2577 2578 2579
  win->skey = TSKEY_INITIAL_VAL;

  int64_t st = taosGetTimestampUs();
D
fix bug  
dapan1121 已提交
2580 2581
  int16_t rv = -1;
  STSchema* pSchema = NULL;
H
Haojun Liao 已提交
2582 2583

  do {
2584
    SMemRow row = getSMemRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL);
H
Haojun Liao 已提交
2585 2586 2587 2588
    if (row == NULL) {
      break;
    }

C
Cary Xu 已提交
2589
    TSKEY key = memRowKey(row);
2590 2591 2592
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey,
                pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2593 2594 2595 2596 2597 2598 2599 2600 2601

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
C
Cary Xu 已提交
2602
    if (rv != memRowVersion(row)) {
2603
      pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
C
Cary Xu 已提交
2604
      rv = memRowVersion(row);
D
fix bug  
dapan1121 已提交
2605
    }
2606
    mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true);
H
Haojun Liao 已提交
2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617

    if (++numOfRows >= maxRowsToRead) {
      moveToNextRowInMem(pCheckInfo);
      break;
    }

  } while(moveToNextRowInMem(pCheckInfo));

  assert(numOfRows <= maxRowsToRead);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
2618
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
H
Haojun Liao 已提交
2619 2620 2621
    int32_t emptySize = maxRowsToRead - numOfRows;

    for(int32_t i = 0; i < numOfCols; ++i) {
2622
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
2623 2624 2625 2626 2627
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }

  int64_t elapsedTime = taosGetTimestampUs() - st;
H
Haojun Liao 已提交
2628 2629
  tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle,
            elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2630 2631 2632 2633

  return numOfRows;
}

2634 2635
/*
 * Append one STableKeyInfo entry to list for every id returned by the child-table cursor opened
 * on uid. Iteration ends when the cursor returns 0. Each entry has pTable = NULL,
 * lastKey = TSKEY_INITIAL_VAL and uid set to the returned id.
 *
 * Always returns TSDB_CODE_SUCCESS.
 * NOTE(review): the result of metaOpenCtbCursor and of taosArrayPush is not checked — confirm
 * whether either can fail.
 */
static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    // use a real designator for uid: a bare "uid = id" would assign to the function parameter and
    // initialize the member only by position
    STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, .uid = id};
    taosArrayPush(list, &info);
  }

  metaCloseCtbCurosr(pCur);
  return TSDB_CODE_SUCCESS;
}

/*
 * Release the memory block referenced by param. A NULL param is accepted and ignored
 * (free(NULL) is a no-op, so no explicit guard is required).
 * The type-specific cleanup of the payload is currently disabled (kept below for reference).
 */
static void destroyHelper(void* param) {
//  tQueryInfo* pInfo = (tQueryInfo*)param;
//  if (pInfo->optr != TSDB_RELATION_IN) {
//    tfree(pInfo->q);
//  } else {
//    taosHashCleanup((SHashObj *)(pInfo->q));
//  }

  free(param);
}

2666 2667 2668 2669 2670
#define TSDB_PREV_ROW  0x1
#define TSDB_NEXT_ROW  0x2

/*
 * Load the next data block of the currently active table.
 *
 * Order of sources:
 *   1. file blocks, while checkFiles is set; checkFiles is cleared once the files yield nothing
 *      or an error occurs (an error makes the function return false),
 *   2. in-memory data, when hasMoreDataInCache reports data,
 *   3. external rows via tsdbGetExternalRow, when currentLoadExternalRows is set, the query window
 *      is a single point (skey == ekey) and no rows have been produced (cur.rows == 0).
 *
 * currentLoadExternalRows is cleared whenever one of the sources produced a result.
 * Returns true when a block is available.
 */
static bool  loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
  if (pTsdbReadHandle->checkFiles) {
    // check if the query range overlaps with the file data block
    bool    exists = true;
    int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);

    if (code == TSDB_CODE_SUCCESS && exists) {
      tsdbRetrieveDataBlock((tsdbReadHandleT*) pTsdbReadHandle, NULL);

      // for a point query with external rows requested, the first timestamp must be the queried key
      if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
        SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
      }

      pTsdbReadHandle->currentLoadExternalRows = false; // clear the flag, since the exact matched row is found.
      return true;
    }

    // either the files are exhausted or reading them failed: stop checking files
    pTsdbReadHandle->checkFiles = false;
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }
  }

  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
    return true;
  }

  // current result is empty
  bool needExternalRows = pTsdbReadHandle->currentLoadExternalRows &&
                          pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
                          pTsdbReadHandle->cur.rows == 0;
  if (!needExternalRows) {
    return false;
  }

//    STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;
//    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
//    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);

  bool result = tsdbGetExternalRow(pTsdbReadHandle);

//    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
  pTsdbReadHandle->currentLoadExternalRows = false;

  return result;
}
2717

2718
/*
 * Advance activeIndex to the next table and emit that table's cached last row as a one-row result.
 * Returns false when activeIndex has moved past the last table, true otherwise.
 *
 * NOTE(review): the tsdbGetCachedLastRow lookup is commented out, so pRow is still NULL and key is
 * still TSKEY_INITIAL_VAL when they are handed to mergeTwoRowFromMem and used for lastKey/win —
 * confirm this path is not expected to run in its current state.
 */
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
  // the last row is cached in buffer, return it directly.
  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
  int32_t numOfCols  = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0 && numOfCols > 0);

  // move on to the next table; nothing to do once every table has been visited
  if (++pTsdbReadHandle->activeIndex >= numOfTables) {
    return false;
  }

  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  SMemRow        pRow = NULL;
  TSKEY          key  = TSKEY_INITIAL_VAL;
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;

  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
//    if (ret != TSDB_CODE_SUCCESS) {
//      return false;
//    }
  mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
  tfree(pRow);

  // update the last key value
  TSKEY nextKey = key + step;
  pCheckInfo->lastKey = nextKey;

  pPos->rows     = 1;  // only one row
  pPos->lastKey  = nextKey;
  pPos->mixBlock = true;
  pPos->win.skey = key;
  pPos->win.ekey = key;

  return true;
}

D
init  
dapan1121 已提交
2755

D
update  
dapan1121 已提交
2756


//static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, pTable->tableId);
//      continue;
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

/*
 * Visit the tables of pTableCheckInfo one after another, starting at activeIndex, and try to
 * load the next data block of the active table.
 *
 * Returns true as soon as loadBlockOfActiveTable() reports a block. When the active table has
 * nothing more to offer, its block counter is cleared, the per-table state of the handle is
 * reset and the scan advances to the next table. Returns false after the last table.
 *
 * The time spent on each exhausted table is added to cost.checkForNextTime.
 */
static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();

  while (pTsdbReadHandle->activeIndex < numOfTables) {
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
      return true;
    }

    // the active table is exhausted: clear its state and move on to the next one
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
    pCheckInfo->numOfBlocks = 0;

    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
    pTsdbReadHandle->checkFiles  = true;
    pTsdbReadHandle->cur.rows    = 0;
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;

    terrno = TSDB_CODE_SUCCESS;

    // Only account for the interval since the previous measurement. Measuring from the function
    // entry on every iteration would add the time of the earlier tables again and again.
    int64_t now = taosGetTimestampUs();
    pTsdbReadHandle->cost.checkForNextTime += (now - stime);
    stime = now;
  }

  return false;
}

H
Haojun Liao 已提交
2936
// handle data in cache situation
2937 2938
/*
 * Advance the read handle to its next data block.
 *
 * Returns false when the query time window is empty, when locating file blocks fails, or when
 * neither the files nor the buffer hold more data. For BLOCK_LOAD_TABLE_SEQ_ORDER the work is
 * delegated to loadDataBlockFromTableSeq(); otherwise file blocks are checked first (while
 * checkFiles is set) and the in-memory buffer afterwards.
 */
bool tsdbNextDataBlock(tsdbReadHandleT pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;

  if (emptyQueryTimewindow(pReader)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return false;
  }

  int64_t startUs = taosGetTimestampUs();

  // TODO refactor: remove "type"
  // The cached last-row / last paths for TSDB_QUERY_TYPE_LAST are disabled for now:
  //   cachelastrow == TSDB_CACHED_TYPE_LASTROW -> loadCachedLastRow(pTsdbReadHandle)
  //   cachelastrow == TSDB_CACHED_TYPE_LAST    -> loadCachedLast(pTsdbReadHandle)

  if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pReader);
  }

  // loadType == RR and Offset Order
  if (pReader->checkFiles) {
    // check if the query range overlaps with the file data block
    bool    hasFileBlock = true;
    int32_t code = getDataBlocksInFiles(pReader, &hasFileBlock);

    if (code == TSDB_CODE_SUCCESS && hasFileBlock) {
      pReader->cost.checkForNextTime += (taosGetTimestampUs() - startUs);
      return hasFileBlock;
    }

    // either an error occurred or the files are exhausted: stop checking files from now on
    pReader->activeIndex = 0;
    pReader->checkFiles = false;

    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }
  }

  // TODO: opt by consider the scan order
  bool hasBufferData = doHasDataInBuffer(pReader);
  terrno = TSDB_CODE_SUCCESS;

  pReader->cost.checkForNextTime += taosGetTimestampUs() - startUs;
  return hasBufferData;
}
2990

2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055
//static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
//    colInfo.pData = calloc(1, pCol->info.bytes);
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
//  STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
//  cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
3056
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103
//  tfree(cond.colList);
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
//out_of_memory:
//  tsdbCleanupQueryHandle(pSecQueryHandle);
//  return terrno;
//}

/*
 * Copy the saved "prev" and "next" rows into the output columns as a two-row block.
 *
 * The current position is always marked as a mixed, non-file block (fid == INT32_MIN). If
 * either row is missing, cur.rows is set to 0 and false is returned. Otherwise each output
 * column receives the prev value in row 0 and the next value in row 1, cur.rows is set to 2
 * and true is returned. When column 0 is a timestamp column, cur.win is taken from these two
 * values.
 */
bool tsdbGetExternalRow(tsdbReadHandleT pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  pPos->fid = INT32_MIN;
  pPos->mixBlock = true;

  bool hasBothRows = (pReader->prev != NULL) && (pReader->next != NULL);
  if (!hasBothRows) {
    pPos->rows = 0;
    return false;
  }

  int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pReader);
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pReader->pColumns, col);
    SColumnInfoData* pPrev = taosArrayGet(pReader->prev, col);
    SColumnInfoData* pNext = taosArrayGet(pReader->next, col);

    char* out = (char*)pDst->pData;

    // row 0 <- prev, row 1 <- next
    memcpy(out, pPrev->pData, pDst->info.bytes);
    memcpy(out + pDst->info.bytes, pNext->pData, pDst->info.bytes);

    if (col == 0 && pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
      pPos->win.skey = *(TSKEY*)out;
      pPos->win.ekey = *(TSKEY*)(out + TSDB_KEYSIZE);
    }
  }

  pPos->rows = 2;
  return true;
}

3132
/*
3133
 * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
3134
 * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
3135
 */
3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159
//int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey) {
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
//out:
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

/* Returns true when the handle's cachelastrow mode is anything above TSDB_CACHED_TYPE_NONE. */
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle) {
  const STsdbReadHandle* pReader = (const STsdbReadHandle*)pTsdbReadHandle;
  return pReader->cachelastrow > TSDB_CACHED_TYPE_NONE;
}

3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188
/*
 * Placeholder for the cached last-row check: the detection logic below is disabled, so the
 * function only validates its arguments and reports success.
 */
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList) {
  assert(pTsdbReadHandle != NULL && groupList != NULL);

  int32_t status = TSDB_CODE_SUCCESS;

  // Disabled implementation, kept for reference:
  //
  //  TSKEY    key = TSKEY_INITIAL_VAL;
  //
  //  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
  //  assert(group != NULL);
  //
  //  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
  //
  //  int32_t code = 0;
  //
  //  if (((STable*)pInfo->pTable)->lastRow) {
  //    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
  //    if (code != TSDB_CODE_SUCCESS) {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
  //    } else {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
  //    }
  //  }
  //
  //  // update the tsdb query time range
  //  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
  //    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
  //    pTsdbReadHandle->checkFiles  = false;
  //    pTsdbReadHandle->activeIndex = -1;  // start from -1
  //  }

  return status;
}

3193 3194
/*
 * When the handle already has a cachelastrow mode set, switch off file checking and rewind
 * activeIndex to -1. Always returns 0. The detection of cached last columns is disabled.
 */
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  // Disabled detection, kept for reference:
  //  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
  //    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
  //  }

  if (!pTsdbReadHandle->cachelastrow) {
    return 0;
  }

  // update the tsdb query time range
  pTsdbReadHandle->checkFiles  = false;
  pTsdbReadHandle->activeIndex = -1;  // start from -1
  return 0;
}


3211
/*
 * Reduce every table group to the single table with the greatest last key and return the time
 * window spanned by the selected keys.
 *
 * Groups in which no table is selected are destroyed and removed from groupList->pGroupList;
 * groupList->numOfTables is set to the number of groups that kept a table. If no key was
 * selected at all, TSWINDOW_INITIALIZER is returned.
 *
 * NOTE(review): the per-table last key is currently hard-wired to 0 (the STable lookup is
 * commented out) and the unref of discarded tables is disabled - confirm before relying on
 * the selection logic.
 */
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
  STimeWindow window = {INT64_MAX, INT64_MIN};
  int32_t     numOfKeptTables = 0;
  SArray*     pEmptyGroupIdx = taosArrayInit(16, sizeof(int32_t));

  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
  for (int32_t j = 0; j < numOfGroups; ++j) {
    SArray*       pGroup = taosArrayGetP(groupList->pGroupList, j);
    TSKEY         maxKey = TSKEY_INITIAL_VAL;
    STableKeyInfo selected = {0};

    size_t numOfTables = taosArrayGetSize(pGroup);
    for (int32_t i = 0; i < numOfTables; ++i) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);

      // if the lastKey equals to INT64_MIN, there is no data in this table
      TSKEY lastKey = 0;  // ((STable*)(pInfo->pTable))->lastKey;
      if (maxKey >= lastKey) {
        continue;
      }

      maxKey = lastKey;

      selected.pTable  = pInfo->pTable;
      selected.lastKey = maxKey;
      pInfo->lastKey   = maxKey;

      // widen the result window to include the new key
      if (maxKey < window.skey) {
        window.skey = maxKey;
      }

      if (maxKey > window.ekey) {
        window.ekey = maxKey;
      }
    }

    // clear current group, unref unused table
    for (int32_t i = 0; i < numOfTables; ++i) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);

      // selected.pTable may be NULL here.
      if (pInfo->pTable != selected.pTable) {
        // tsdbUnRefTable(pInfo->pTable);
      }
    }

    if (selected.pTable == NULL) {
      // mark all the empty groups, and remove it later
      taosArrayDestroy(pGroup);
      taosArrayPush(pEmptyGroupIdx, &j);
      continue;
    }

    // more than one table in each group, only one table left for each group
    numOfKeptTables++;
    if (taosArrayGetSize(pGroup) != 1) {
      taosArrayClear(pGroup);
      taosArrayPush(pGroup, &selected);
    }
  }

  // window does not being updated, so set the original
  if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
    window = TSWINDOW_INITIALIZER;
    assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
  }

  taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(pEmptyGroupIdx),
                       (int32_t)taosArrayGetSize(pEmptyGroupIdx));
  taosArrayDestroy(pEmptyGroupIdx);

  groupList->numOfTables = numOfKeptTables;
  return window;
}

3286 3287
/*
 * Fill pDataBlockInfo with the uid, row count, time window and column count of the block the
 * handle is currently positioned on.
 *
 * The uid comes from the file block at cur.slot when cur.fid != INT32_MIN, otherwise from the
 * check info of the active table.
 */
void tsdbRetrieveDataBlockInfo(tsdbReadHandleT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pTsdbReadHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  uint64_t tableUid = 0;
  if (pPos->fid == INT32_MIN) {
    // no file block: use the table selected by activeIndex
    STableCheckInfo* pActive = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex);
    tableUid = pActive->tableId;
  } else {
    // there are data in file
    STableBlockInfo* pFileBlock = &pReader->pDataBlockInfo[pPos->slot];
    tableUid = pFileBlock->pTableCheckInfo->tableId;
  }

  pDataBlockInfo->uid       = tableUid;
  pDataBlockInfo->rows      = pPos->rows;
  pDataBlockInfo->window    = pPos->win;
  pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pReader));
}
H
hjxilinx 已提交
3306

H
Haojun Liao 已提交
3307 3308 3309
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
3310 3311
/*
 * Load the pre-computed column statistics of the current file block.
 *
 * pTsdbReadHandle: the read handle (an STsdbReadHandle).
 * pBlockStatis:    out parameter; receives pHandle->statis on success, or NULL when no
 *                  statistics are available (mixed block, block with sub-blocks) or when
 *                  loading them fails.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if tsdbLoadBlockStatis() fails.
 */
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT* pTsdbReadHandle, SDataStatis** pBlockStatis) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*) pTsdbReadHandle;

  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int64_t stime = taosGetTimestampUs();
  if (tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock) < 0) {
    // never leave the out parameter indeterminate on the error path
    *pBlockStatis = NULL;
    return terrno;
  }

  int16_t* colIds = pHandle->defaultLoadColumn->pData;

  // reset the statistics buffer and tag each slot with the column id to be looked up
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
  memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
  for (size_t i = 0; i < numOfCols; ++i) {
    pHandle->statis[i].colId = colIds[i];
  }

  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);

  // always load the first primary timestamp column data
  SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);

  // the timestamp column statistics are taken directly from the block key range
  pPrimaryColStatis->numOfNull = 0;
  pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
  pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;

  // update the number of NULL data rows
  for (size_t i = 1; i < numOfCols; ++i) {
    if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
      pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
    }
  }

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

  *pBlockStatis = pHandle->statis;
  return TSDB_CODE_SUCCESS;
}

3365
/*
 * Return the column data of the current block, loading it from file when necessary.
 *
 * The data is already present in pColumns when
 *   1. the block does not come from a file (cur.fid == INT32_MIN),
 *   2. the block is a mixed block (not completely qualified by the query time range), or
 *   3. dataBlockLoadInfo shows that exactly this file block has been loaded before.
 * Otherwise the file block is loaded and all of its rows are copied into the output buffer.
 *
 * Returns pHandle->pColumns, or NULL if loading the file block fails. pIdList is not used.
 */
SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;

  if (pHandle->cur.fid == INT32_MIN) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;

  if (pHandle->cur.mixBlock) {
    return pHandle->pColumns;
  }

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);

  // data block has been loaded, todo extract method
  SDataBlockLoadInfo* pLoaded = &pHandle->dataBlockLoadInfo;
  if (pLoaded->slot == pHandle->cur.slot && pLoaded->fileGroup->fid == pHandle->cur.fid &&
      pLoaded->uid == pCheckInfo->tableId) {
    return pHandle->pColumns;
  }

  // only load the file block
  SBlock* pBlock = pBlockInfo->compBlock;
  if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  // todo refactor
  int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
  bool needShift = !ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity;
  if (needShift) {
    int32_t emptySize = pHandle->outputCapacity - numOfRows;
    int32_t reqNumOfCols = (int32_t)taosArrayGetSize(pHandle->pColumns);

    for (int32_t col = 0; col < reqNumOfCols; ++col) {
      SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, col);
      char*            base = (char*)pColInfo->pData;

      memmove(base, base + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
    }
  }

  return pHandle->pColumns;
}
3415
#if 0
3416
void filterPrepare(void* expr, void* param) {
3417
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
3418
  if (pExpr->_node.info != NULL) {
3419 3420
    return;
  }
3421

H
[td-32]  
hjxilinx 已提交
3422
  pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
H
Haojun Liao 已提交
3423

3424
  STSchema*   pTSSchema = (STSchema*) param;
H
hjxilinx 已提交
3425 3426 3427
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
3428

3429 3430
  pInfo->sch      = *pSchema;
  pInfo->optr     = pExpr->_node.optr;
Y
yihaoDeng 已提交
3431
  pInfo->compare  = getComparFunc(pInfo->sch.type, pInfo->optr);
H
Haojun Liao 已提交
3432
  pInfo->indexed  = pTSSchema->columns->colId == pInfo->sch.colId;
H
Haojun Liao 已提交
3433

H
hjxilinx 已提交
3434
  if (pInfo->optr == TSDB_RELATION_IN) {
Y
yihaoDeng 已提交
3435
     int dummy = -1;
3436
     SHashObj *pObj = NULL;
Y
yihaoDeng 已提交
3437 3438 3439 3440
     if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
        pObj = taosHashInit(256, taosGetDefaultHashFunction(pInfo->sch.type), true, false);
        SArray *arr = (SArray *)(pCond->arr);
        for (size_t i = 0; i < taosArrayGetSize(arr); i++) {
Y
yihaoDeng 已提交
3441
          char* p = taosArrayGetP(arr, i);
3442 3443
          strntolower_s(varDataVal(p), varDataVal(p), varDataLen(p));
          taosHashPut(pObj, varDataVal(p), varDataLen(p), &dummy, sizeof(dummy));
Y
yihaoDeng 已提交
3444 3445 3446 3447
        }
     } else {
       buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen);
     }
3448
     pInfo->q = (char *)pObj;
H
Haojun Liao 已提交
3449
  } else if (pCond != NULL) {
3450 3451 3452 3453
    uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
    if (size < (uint32_t)pSchema->bytes) {
      size = pSchema->bytes;
    }
3454 3455
    // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
    pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
3456
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
3457
  }
3458 3459
}

3460

H
Haojun Liao 已提交
3461
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
3462
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
H
Haojun Liao 已提交
3463 3464
  STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
  STable* pTable2 = ((STableKeyInfo*) p2)->pTable;
H
Haojun Liao 已提交
3465

3466 3467 3468
  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;
H
Haojun Liao 已提交
3469

H
Haojun Liao 已提交
3470
    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
H
Haojun Liao 已提交
3471

3472 3473 3474 3475
    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;
H
Haojun Liao 已提交
3476

H
Haojun Liao 已提交
3477 3478 3479
    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
      f1 = (char*) TABLE_NAME(pTable1);
      f2 = (char*) TABLE_NAME(pTable2);
3480
      type = TSDB_DATA_TYPE_BINARY;
3481
      bytes = tGetTbnameColumnSchema()->bytes;
3482
    } else {
Y
yihaoDeng 已提交
3483 3484 3485 3486 3487 3488 3489
      if (pTableGroupSupp->pTagSchema && colIndex < pTableGroupSupp->pTagSchema->numOfCols) {
        STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
        bytes = pCol->bytes;
        type = pCol->type;
        f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
        f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
      } 
3490
    }
H
Haojun Liao 已提交
3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

3505 3506 3507 3508 3509 3510 3511
    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
H
Haojun Liao 已提交
3512

3513 3514
  return 0;
}
3515
#endif
3516

H
Haojun Liao 已提交
3517
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3518
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3519
    return -1;
3520
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3521 3522 3523 3524 3525 3526 3527 3528 3529
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

/*
 * Split pTableList into runs of neighbouring entries that compareFn reports as equal and
 * append one SArray of STableKeyInfo per run to pGroups. Every entry gets skey as its lastKey.
 *
 * compareFn is called on consecutive list elements and must return 0 (same group) or -1
 * (a new group starts).
 */
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
                          STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
  // the first table always opens the first group
  SArray*       pCurGroup = taosArrayInit(16, sizeof(STableKeyInfo));
  STableKeyInfo first = {.pTable = taosArrayGetP(pTableList, 0), .lastKey = skey};
  taosArrayPush(pCurGroup, &first);

  for (int32_t i = 1; i < numOfTables; ++i) {
    STable** ppPrev = taosArrayGet(pTableList, i - 1);
    STable** ppCur = taosArrayGet(pTableList, i);

    int32_t ret = compareFn(ppPrev, ppCur, pSupp);
    assert(ret == 0 || ret == -1);

    if (ret != 0) {
      // current group is ended, start a new group
      taosArrayPush(pGroups, &pCurGroup);
      pCurGroup = taosArrayInit(16, sizeof(STableKeyInfo));
    }

    STableKeyInfo member = {.pTable = *ppCur, .lastKey = skey};
    taosArrayPush(pCurGroup, &member);
  }

  // the last group is still open
  taosArrayPush(pGroups, &pCurGroup);
}

3559
/*
 * Build the list of table groups for a query.
 *
 * pTableList:     the qualified tables; must not be NULL.
 * pTagSchema:     tag schema wrapper, read only when grouping by columns is requested.
 * pCols:          the group-by columns.
 * numOfOrderCols: number of entries in pCols; 0 means there is no group-by clause.
 * skey:           not used while the group-by path is disabled.
 *
 * Returns an array of SArray* groups, or NULL when memory allocation fails:
 *   - no tables:                        an empty group list;
 *   - no group-by columns or one table: a single group holding a copy of pTableList;
 *   - otherwise:                        NOTE(review) the sort/group step is commented out, so
 *                                       the returned group list is currently empty.
 */
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
  assert(pTableList != NULL);

  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
  if (pTableGroup == NULL) {
    // same contract as the taosArrayDup() failure below: report out-of-memory with NULL
    return NULL;
  }

  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
    tsdbDebug("no qualified tables");
    return pTableGroup;
  }

  if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
    SArray* sa = taosArrayDup(pTableList);
    if (sa == NULL) {
      taosArrayDestroy(pTableGroup);
      return NULL;
    }

    taosArrayPush(pTableGroup, &sa);
    tsdbDebug("all %" PRIzu " tables belong to one group", size);
  } else {
    STableGroupSupporter sup = {0};
    sup.numOfCols = numOfOrderCols;
    sup.pTagSchema = pTagSchema->pSchema;
    sup.pCols = pCols;

    // Disabled until the comparator is available again:
    //    taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
    //    createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
  }

  return pTableGroup;
}

3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674
//static bool tableFilterFp(const void* pNode, void* param) {
//  tQueryInfo* pInfo = (tQueryInfo*) param;
//
//  STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
//  char* val = NULL;
//  if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
//    val = (char*) TABLE_NAME(pTable);
//  } else {
//    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
//  }
//
//  if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
//    if (pInfo->optr == TSDB_RELATION_ISNULL) {
//      return (val == NULL) || isNull(val, pInfo->sch.type);
//    } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
//      return (val != NULL) && (!isNull(val, pInfo->sch.type));
//    }
//  } else if (pInfo->optr == TSDB_RELATION_IN) {
//     int type = pInfo->sch.type;
//     if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
//       int64_t v;
//       GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
//       uint64_t v;
//       GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     }
//     else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
//       double v;
//       GET_TYPED_DATA(v, double, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
//       return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
//     }
//
//  }
//
//  int32_t ret = 0;
//  if (val == NULL) { //the val is possible to be null, so check it out carefully
//    ret = -1; // val is missing in table tags value pairs
//  } else {
//    ret = pInfo->compare(val, pInfo->q);
//  }
//
//  switch (pInfo->optr) {
//    case TSDB_RELATION_EQUAL: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NOT_EQUAL: {
//      return ret != 0;
//    }
//    case TSDB_RELATION_GREATER_EQUAL: {
//      return ret >= 0;
//    }
//    case TSDB_RELATION_GREATER: {
//      return ret > 0;
//    }
//    case TSDB_RELATION_LESS_EQUAL: {
//      return ret <= 0;
//    }
//    case TSDB_RELATION_LESS: {
//      return ret < 0;
//    }
//    case TSDB_RELATION_LIKE: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_MATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NMATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_IN: {
//      return ret == 1;
//    }
//
//    default:
//      assert(false);
//  }
//
//  return true;
//}
H
Haojun Liao 已提交
3675

3676
//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
3677

3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689
//static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
//  // query according to the expression tree
//  SExprTraverseSupp supp = {
//      .nodeFilterFn = (__result_filter_fn_t) tableFilterFp,
//      .setupInfoFn = filterPrepare,
//      .pExtInfo = pSTable->tagSchema,
//      };
//
//  getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
//  tExprTreeDestroy(pExpr, destroyHelper);
//  return TSDB_CODE_SUCCESS;
//}
3690

3691
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
3692
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
3693 3694 3695 3696
                                 SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId) {
  STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
  if (pTbCfg == NULL) {
    tsdbError("%p failed to get stable, uid:%"PRIu64", reqId:0x%"PRIx64, tsdb, uid, reqId);
3697 3698
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _error;
3699
  }
H
Haojun Liao 已提交
3700

3701 3702 3703
  if (pTbCfg->type != META_SUPER_TABLE) {
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", reId:0x%"PRIx64, tsdb, uid, reqId);
    terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
3704
    goto _error;
H
hjxilinx 已提交
3705
  }
3706 3707

  //NOTE: not add ref count for super table
H
Haojun Liao 已提交
3708
  SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
3709
  SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
H
Haojun Liao 已提交
3710

weixin_48148422's avatar
weixin_48148422 已提交
3711 3712
  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
3713
    int32_t ret = getAllTableList(tsdb->pMeta, uid, res);
3714 3715
    if (ret != TSDB_CODE_SUCCESS) {
      goto _error;
3716
    }
3717

sangshuduo's avatar
sangshuduo 已提交
3718
    pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
H
Haojun Liao 已提交
3719
    pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
H
Haojun Liao 已提交
3720

H
Haojun Liao 已提交
3721
    tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu", tsdb,
3722
              pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
3723

3724
    taosArrayDestroy(res);
3725 3726
    return ret;
  }
3727

H
hjxilinx 已提交
3728
  int32_t ret = TSDB_CODE_SUCCESS;
3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772
//  tExprNode* expr = NULL;
//
//  TRY(TSDB_MAX_TAG_CONDITIONS) {
//    expr = exprTreeFromTableName(tbnameCond);
//    if (expr == NULL) {
//      expr = exprTreeFromBinary(pTagCond, len);
//    } else {
//      CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
//      tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
//      if (tagExpr != NULL) {
//        CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
//        tExprNode* tbnameExpr = expr;
//        expr = calloc(1, sizeof(tExprNode));
//        if (expr == NULL) {
//          THROW( TSDB_CODE_TDB_OUT_OF_MEMORY );
//        }
//        expr->nodeType = TSQL_NODE_EXPR;
//        expr->_node.optr = (uint8_t)tagNameRelType;
//        expr->_node.pLeft = tagExpr;
//        expr->_node.pRight = tbnameExpr;
//      }
//    }
//    CLEANUP_EXECUTE();
//
//  } CATCH( code ) {
//    CLEANUP_EXECUTE();
//    terrno = code;
//    tsdbUnlockRepoMeta(tsdb);     // unlock tsdb in any cases
//
//    goto _error;
//    // TODO: more error handling
//  } END_TRY
//
//  doQueryTableList(pTable, res, expr);
//  pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
//  pGroupInfo->pGroupList  = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
//  tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
//      pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
//  taosArrayDestroy(res);
//
//  if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
//  return ret;
3773 3774 3775

  _error:
  return terrno;
3776
}
3777

3778
#if 0
3779
/**
 * Wrap a single (non-super) table into a group list that holds exactly one group.
 *
 * The table is looked up by uid while the repo meta read lock is held; the lock is
 * released before the group arrays are built.
 *
 * @param tsdb        tsdb instance
 * @param uid         uid of the table
 * @param startKey    value stored as lastKey of the table's key info
 * @param pGroupInfo  output: numOfTables is set to 1 and pGroupList receives the single group
 * @return TSDB_CODE_SUCCESS, or terrno when locking/unlocking fails or the table does not exist
 */
int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  STable* pTbl = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
  if (pTbl == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    tsdbUnlockRepoMeta(tsdb);
    return terrno;
  }

  assert(pTbl->type == TSDB_CHILD_TABLE || pTbl->type == TSDB_NORMAL_TABLE || pTbl->type == TSDB_STREAM_TABLE);

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);

  // the one and only group, holding the one and only table
  SArray*       pGroup = taosArrayInit(1, sizeof(STableKeyInfo));
  STableKeyInfo keyInfo = {.pTable = pTbl, .lastKey = startKey};
  taosArrayPush(pGroup, &keyInfo);

  taosArrayPush(pGroupInfo->pGroupList, &pGroup);
  return TSDB_CODE_SUCCESS;
}
3806

3807
/**
 * Build a single-group table list from a list of table ids.
 *
 * Tables that can no longer be found are skipped with a warning. A super table in the
 * list is an error. When no table qualifies, pGroupList is left allocated but empty.
 *
 * @param tsdb          tsdb instance
 * @param pTableIdList  array of STableIdInfo, must not be NULL
 * @param pGroupInfo    output: numOfTables and pGroupList; on failure pGroupList is
 *                      released and reset to NULL so the caller is not left with a
 *                      half-initialised list
 * @return TSDB_CODE_SUCCESS on success, otherwise terrno
 */
int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }

  assert(pTableIdList != NULL);
  size_t size = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
  SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));

  for (size_t i = 0; i < size; ++i) {
    STableIdInfo *id = taosArrayGet(pTableIdList, i);

    STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);
      terrno = TSDB_CODE_QRY_INVALID_MSG;
      tsdbUnlockRepoMeta(tsdb);
      goto _error;
    }

    STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
    taosArrayPush(group, &info);
  }

  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    goto _error;
  }

  pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(group);
  if (pGroupInfo->numOfTables > 0) {
    taosArrayPush(pGroupInfo->pGroupList, &group);  // the group is now owned by pGroupList
  } else {
    taosArrayDestroy(group);
  }

  return TSDB_CODE_SUCCESS;

_error:
  // release both arrays: previously only `group` was freed and pGroupList leaked
  taosArrayDestroy(group);
  taosArrayDestroy(pGroupInfo->pGroupList);
  pGroupInfo->pGroupList = NULL;
  return terrno;
}
3852
#endif
3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867
/**
 * Free the data buffer of every SColumnInfoData element, then the array itself.
 *
 * @param pColumnInfoData  array of SColumnInfoData, may be NULL
 * @return always NULL, so the caller can reset its pointer in the same statement
 */
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData != NULL) {
    size_t  numOfCols = taosArrayGetSize(pColumnInfoData);
    int32_t idx = 0;

    while (idx < numOfCols) {
      SColumnInfoData* pCol = taosArrayGet(pColumnInfoData, idx);
      tfree(pCol->pData);
      ++idx;
    }

    taosArrayDestroy(pColumnInfoData);
  }

  return NULL;
}

H
Haojun Liao 已提交
3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880
/**
 * Release the per-table check info list: for every STableCheckInfo entry the
 * memory iterators are destroyed and pCompInfo is freed, then the array itself
 * is destroyed.
 *
 * @param pTableCheckInfo  array of STableCheckInfo; NULL is accepted and ignored,
 *                         matching doFreeColumnInfoData()
 * @return always NULL, so the caller can reset its pointer in the same statement
 */
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  size_t size = taosArrayGetSize(pTableCheckInfo);
  for (size_t i = 0; i < size; ++i) {
    STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
    destroyTableMemIterator(p);

    tfree(p->pCompInfo);
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

3881

3882 3883 3884
/**
 * Destroy a read handle and everything it holds.
 *
 * Releases the column buffers, the helper arrays, the per-table check info, the
 * read helper and the data columns, logs the accumulated I/O cost summary and
 * finally frees the handle itself. A NULL handle is ignored.
 *
 * @param queryHandle  handle to destroy, may be NULL
 */
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)queryHandle;
  if (pHandle == NULL) {
    return;
  }

  pHandle->pColumns = doFreeColumnInfoData(pHandle->pColumns);

  taosArrayDestroy(pHandle->defaultLoadColumn);
  tfree(pHandle->pDataBlockInfo);
  tfree(pHandle->statis);

  if (emptyQueryTimewindow(pHandle)) {
    // an empty query time window is expected to come without any table check info
    assert(pHandle->pTableCheckInfo == NULL);
  } else {
    tsdbMayUnTakeMemSnapshot(pHandle);
  }

  if (pHandle->pTableCheckInfo != NULL) {
    pHandle->pTableCheckInfo = destroyTableCheckInfo(pHandle->pTableCheckInfo);
  }

  tsdbDestroyReadH(&pHandle->rhelper);

  tdFreeDataCols(pHandle->pDataCols);
  pHandle->pDataCols = NULL;

  pHandle->prev = doFreeColumnInfoData(pHandle->prev);
  pHandle->next = doFreeColumnInfoData(pHandle->next);

  // report the cost counters before the handle (which embeds them) is released
  SIOCostSummary* pSummary = &pHandle->cost;
  tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s",
      pHandle, pSummary->headFileLoad, pSummary->headFileLoadTime, pSummary->statisInfoLoadTime, pSummary->blockLoadTime, pSummary->checkForNextTime, pHandle->idStr);

  tfree(pHandle);
}
3919

3920
#if 0
H
Haojun Liao 已提交
3921
/**
 * Release a table group list: every non-NULL table referenced by a group is
 * un-referenced, each group array is destroyed, then the hash map and the group
 * list itself are released and the table counter is reset to 0.
 *
 * @param pGroupList  group info to tear down, must not be NULL
 */
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
  assert(pGroupList != NULL);

  size_t  groups = taosArrayGetSize(pGroupList->pGroupList);
  int32_t g = 0;

  while (g < groups) {
    SArray* pOneGroup = taosArrayGetP(pGroupList->pGroupList, g);
    size_t  tables = taosArrayGetSize(pOneGroup);

    // NOTE(review): entries are read as pointers here while other code in this file pushes
    // STableKeyInfo elements into group arrays -- confirm the element layout matches.
    for (int32_t t = 0; t < tables; ++t) {
      STable* pTbl = taosArrayGetP(pOneGroup, t);
      if (pTbl == NULL) {  // in case of handling retrieve data from tsdb
        continue;
      }

      tsdbUnRefTable(pTbl);
    }

    taosArrayDestroy(pOneGroup);
    ++g;
  }

  taosHashCleanup(pGroupList->map);
  taosArrayDestroy(pGroupList->pGroupList);
  pGroupList->numOfTables = 0;
}
H
Haojun Liao 已提交
3945 3946 3947 3948 3949 3950 3951

/**
 * Walk the whole skip list and append the data of every node accepted by
 * exprTreeApplyFilter() to pResult.
 *
 * @param pSkipList  skip list to scan
 * @param pExpr      filter expression evaluated against each node
 * @param pResult    array receiving the data of the qualified nodes
 * @param param      traverse support info forwarded to exprTreeApplyFilter()
 */
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  // visit each node through the iterator; nodes rejected by the filter are skipped
  for (; tSkipListIterNext(pIter); ) {
    SSkipListNode *pCur = tSkipListIterGet(pIter);
    if (!exprTreeApplyFilter(pExpr, pCur, param)) {
      continue;
    }

    taosArrayPush(pResult, &(SL_GET_NODE_DATA(pCur)));
  }

  tSkipListDestroyIter(pIter);
}

// One boundary of a query condition on an indexed column.
typedef struct {
  char*    v;     // comparison value; setQueryCond() points it at tQueryInfo::q (not a copy)
  int32_t  optr;  // relational operator of the condition (TSDB_RELATION_*)
} SEndPoint;

// Query condition split into its boundaries. setQueryCond() fills `start` for
// >, >=, =, != and IN, and `end` for < and <=; an endpoint that is not set stays NULL.
typedef struct {
  SEndPoint* start;
  SEndPoint* end;
} SQueryCond;

/**
 * Translate the operator of a column query into a start or end boundary.
 *
 * For >, >=, =, != and IN a `start` endpoint is allocated; for < and <= an `end`
 * endpoint is allocated. The endpoint refers to queryColInfo->q directly (the
 * value is not copied). LIKE / MATCH / NMATCH are not expected here and trigger
 * an assertion; any other operator leaves pCond untouched.
 *
 * @param queryColInfo  query description providing optr and q
 * @param pCond         condition to fill; the allocated endpoint must be free()d by the caller
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when the endpoint cannot be allocated
 */
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
  int32_t     optr = queryColInfo->optr;
  SEndPoint** ppEndPoint = NULL;  // which boundary of pCond this operator describes

  switch (optr) {
    case TSDB_RELATION_GREATER:
    case TSDB_RELATION_GREATER_EQUAL:
    case TSDB_RELATION_EQUAL:
    case TSDB_RELATION_NOT_EQUAL:
    case TSDB_RELATION_IN:
      ppEndPoint = &pCond->start;
      break;

    case TSDB_RELATION_LESS:
    case TSDB_RELATION_LESS_EQUAL:
      ppEndPoint = &pCond->end;
      break;

    case TSDB_RELATION_LIKE:
    case TSDB_RELATION_MATCH:
    case TSDB_RELATION_NMATCH:
      assert(0);
      return TSDB_CODE_SUCCESS;

    default:
      return TSDB_CODE_SUCCESS;
  }

  SEndPoint* pEndPoint = calloc(1, sizeof(*pEndPoint));
  if (pEndPoint == NULL) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  pEndPoint->optr = queryColInfo->optr;
  pEndPoint->v    = queryColInfo->q;
  *ppEndPoint     = pEndPoint;

  return TSDB_CODE_SUCCESS;
}

static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
  SSkipListIterator* iter = NULL;

  SQueryCond cond = {0};
  if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
    //todo handle error
  }

  if (cond.start != NULL) {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
  } else {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
  }

  if (cond.start != NULL) {
    int32_t optr = cond.start->optr;

    if (optr == TSDB_RELATION_EQUAL) {   // equals
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
    } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
      bool comp = true;
      int32_t ret = 0;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
          assert(ret >= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_GREATER) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;
        }
      }
    } else if (optr == TSDB_RELATION_NOT_EQUAL) {   // not equal
      bool comp = true;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

      tSkipListDestroyIter(iter);

      comp = true;
      iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

Y
yihaoDeng 已提交
4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088
    } else if (optr == TSDB_RELATION_IN) {
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
      
H
Haojun Liao 已提交
4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149
    } else {
      assert(0);
    }
  } else {
    int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
    if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
      bool    comp = true;
      int32_t ret = 0;

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
          assert(ret <= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_LESS) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;  // no need to compare anymore
        }
      }
    } else {
      assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
        if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
            (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
        }
      }
    }
  }

  free(cond.start);
  free(cond.end);
  tSkipListDestroyIter(iter);
}

/**
 * Scan the whole skip list and collect the tables that satisfy the condition in
 * pQueryInfo without using the index.
 *
 * For the table-name pseudo column the name is tested with pQueryInfo->compare:
 * IN accepts a node when compare() is non-zero, LIKE / MATCH / NMATCH accept it
 * when compare() returns 0; other operators on the name never match. For any
 * other column the decision is delegated to filterFp.
 *
 * @param pSkipList   skip list to scan
 * @param pQueryInfo  query description
 * @param res         array of STableKeyInfo receiving the qualified tables
 * @param filterFp    node filter used for columns other than the table name
 */
static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
  SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);

  while (tSkipListIterNext(pIter)) {
    SSkipListNode *pCur = tSkipListIterGet(pIter);

    char *pTableData = SL_GET_NODE_DATA(pCur);
    tstr *tbName = (tstr*) tsdbGetTableName((void*) pTableData);

    bool matched = false;

    // todo speed up by using hash
    if (pQueryInfo->sch.colId != TSDB_TBNAME_COLUMN_INDEX) {
      matched = filterFp(pCur, pQueryInfo);
    } else if (pQueryInfo->optr == TSDB_RELATION_IN) {
      matched = pQueryInfo->compare(tbName, pQueryInfo->q);
    } else if (pQueryInfo->optr == TSDB_RELATION_LIKE ||
               pQueryInfo->optr == TSDB_RELATION_MATCH ||
               pQueryInfo->optr == TSDB_RELATION_NMATCH) {
      matched = !pQueryInfo->compare(tbName, pQueryInfo->q);
    }

    if (!matched) {
      continue;
    }

    STableKeyInfo keyInfo = {.pTable = (void*)pTableData, .lastKey = TSKEY_INITIAL_VAL};
    taosArrayPush(res, &keyInfo);
  }

  tSkipListDestroyIter(pIter);
}

/**
 * Apply the filter expression to the skip list and store the qualified tables in result.
 *
 * When neither child of pExpr is itself an expression, the node is a single
 * column condition: the query info is prepared through param->setupInfoFn and the
 * condition is answered either through the index (queryIndexedColumn) or by a
 * full scan (queryIndexlessColumn). Otherwise the whole expression tree is
 * evaluated against every node of the skip list.
 *
 * @param pExpr      filter expression, NULL means nothing to do
 * @param pSkipList  skip list holding the tables
 * @param result     array receiving the qualified tables
 * @param param      traverse support info (setup function, node filter, extra info)
 */
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
  if (pExpr == NULL) {
    return;
  }

  tExprNode *pLhs = pExpr->_node.pLeft;
  tExprNode *pRhs = pExpr->_node.pRight;

  if (pLhs->nodeType == TSQL_NODE_EXPR || pRhs->nodeType == TSQL_NODE_EXPR) {
    // The value of hasPK is always 0.
    uint8_t weight = pLhs->_node.hasPK + pRhs->_node.hasPK;
    assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);

    //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
    applyFilterToSkipListNode(pSkipList, pExpr, result, param);
    return;
  }

  // column project
  assert(pLhs->nodeType == TSQL_NODE_COL && (pRhs->nodeType == TSQL_NODE_VALUE || pRhs->nodeType == TSQL_NODE_DUMMY));

  param->setupInfoFn(pExpr, param->pExtInfo);

  tQueryInfo *pInfo = pExpr->_node.info;

  // LIKE / MATCH / NMATCH / IN are always answered by a full scan, even on an indexed column
  bool useIndex = pInfo->indexed && pInfo->optr != TSDB_RELATION_LIKE && pInfo->optr != TSDB_RELATION_MATCH &&
                  pInfo->optr != TSDB_RELATION_NMATCH && pInfo->optr != TSDB_RELATION_IN;

  if (useIndex) {
    queryIndexedColumn(pSkipList, pInfo, result);
  } else {
    queryIndexlessColumn(pSkipList, pInfo, result, param->nodeFilterFn);
  }
}
4202
#endif