tsdbRWHelper.c 43.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15 16

#include "os.h"
H
hzcheng 已提交
17
#include "tsdbMain.h"
H
TD-100  
hzcheng 已提交
18
#include "tchecksum.h"
H
TD-100  
hzcheng 已提交
19
#include "tscompression.h"
H
TD-100  
hzcheng 已提交
20
#include "talgo.h"
H
hzcheng 已提交
21 22

// Local function definitions
H
TD-100  
hzcheng 已提交
23
// static int  tsdbCheckHelperCfg(SHelperCfg *pCfg);
H
TD-100  
hzcheng 已提交
24
static int  tsdbInitHelperFile(SRWHelper *pHelper);
H
TD-100  
hzcheng 已提交
25
// static void tsdbClearHelperFile(SHelperFile *pHFile);
H
hzcheng 已提交
26
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
H
TD-100  
hzcheng 已提交
27 28
static int  tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
                                 SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
H
hzcheng 已提交
29 30
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
H
TD-100  
hzcheng 已提交
31
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
H
TD-100  
hzcheng 已提交
32 33
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
H
TD-100  
hzcheng 已提交
34
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
H
TD-100  
hzcheng 已提交
35
static void tsdbResetHelperBlock(SRWHelper *pHelper);
H
hzcheng 已提交
36

H
TD-100  
hzcheng 已提交
37 38 39 40 41 42 43 44 45 46 47 48
// ---------- Operations on Helper File part
/**
 * Wipe the helper's file bookkeeping and mark the file id and every file
 * descriptor as unset (-1).
 *
 * No descriptor is closed here; only the in-memory state is overwritten.
 */
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
  memset((void *)&(pHelper->files), 0, sizeof(pHelper->files));

  // memset leaves zeros behind; the id and the descriptors use -1 instead
  pHelper->files.fid = -1;

  pHelper->files.nLastF.fd = -1;
  pHelper->files.nHeadF.fd = -1;
  pHelper->files.lastF.fd = -1;
  pHelper->files.dataF.fd = -1;
  pHelper->files.headF.fd = -1;
}

/**
 * Allocate the SCompIdx table of the helper and reset its file state.
 *
 * The table has one SCompIdx entry per table (config.maxTables) plus room
 * for a trailing TSCKSUM.
 *
 * @return 0 on success, -1 if the allocation fails
 */
static int tsdbInitHelperFile(SRWHelper *pHelper) {
  size_t idxBytes = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);

  pHelper->pCompIdx = (SCompIdx *)tmalloc(idxBytes);
  if (pHelper->pCompIdx != NULL) {
    tsdbResetHelperFileImpl(pHelper);
    return 0;
  }

  return -1;
}

/**
 * Release the file part of the helper: close any open files (without the
 * error flag) and free the SCompIdx table.
 */
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
  (void)tsdbCloseHelperFile(pHelper, false);

  tzfree(pHelper->pCompIdx);
}

// ---------- Operations on Helper Table part
/**
 * Forget the currently selected table: zero tableInfo and clear the
 * "old last block" flag.
 */
static void tsdbResetHelperTableImpl(SRWHelper *pHelper) {
  pHelper->hasOldLastBlock = false;
  memset((void *)&(pHelper->tableInfo), 0, sizeof(SHelperTable));
}

H
TD-100  
hzcheng 已提交
69 70 71
/**
 * Reset everything that belongs to the previously selected table: the block
 * buffers, the table info, and the TABLE_SET / INFO_LOAD state bits.
 */
static void tsdbResetHelperTable(SRWHelper *pHelper) {
  // Block part first, then the table part
  tsdbResetHelperBlock(pHelper);
  tsdbResetHelperTableImpl(pHelper);

  helperClearState(pHelper, (TSDB_HELPER_TABLE_SET | TSDB_HELPER_INFO_LOAD));
}

H
TD-100  
hzcheng 已提交
75 76 77 78
/**
 * Initialise the table part of the helper. Initialisation is the same as a
 * reset of the table info.
 */
static void tsdbInitHelperTable(SRWHelper *pHelper) {
  tsdbResetHelperTableImpl(pHelper);
}

H
TD-100  
hzcheng 已提交
79
/**
 * Release the table part of the helper (the loaded SCompInfo buffer).
 */
static void tsdbDestroyHelperTable(SRWHelper *pHelper) {
  tzfree((void *)pHelper->pCompInfo);
}
H
TD-100  
hzcheng 已提交
80 81 82 83 84 85 86

// ---------- Operations on Helper Block part
/**
 * Reset both SDataCols buffers owned by the helper.
 */
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
  for (int i = 0; i < 2; i++) {
    tdResetDataCols(pHelper->pDataCols[i]);
  }
}

H
TD-100  
hzcheng 已提交
87
/**
 * Reset the block part of the helper. No state bit is touched here.
 */
static void tsdbResetHelperBlock(SRWHelper *pHelper) {
  tsdbResetHelperBlockImpl(pHelper);
}

H
TD-100  
hzcheng 已提交
92
/**
 * Allocate the two SDataCols buffers of the helper, sized from the helper
 * configuration, and reset them.
 *
 * Both allocations are attempted before the result is checked.
 *
 * @return 0 on success, -1 if either allocation failed
 */
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
  for (int i = 0; i < 2; i++) {
    pHelper->pDataCols[i] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols,
                                          pHelper->config.maxRows, sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES);
  }

  if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
    return -1;
  }

  tsdbResetHelperBlockImpl(pHelper);
  return 0;
}

/**
 * Release the block part of the helper: the SCompData buffer, the raw block
 * buffer and both SDataCols buffers.
 */
static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
  tzfree(pHelper->pCompData);

  // blockBuffer is grown with trealloc() when a block is loaded; it has to be
  // released here because tsdbDestroyHelper() zeroes the whole helper next.
  tzfree(pHelper->blockBuffer);
  // NOTE(review): compBuffer is sized with tsizeof() elsewhere in this file, so
  // it presumably comes from the same allocator - confirm who owns and frees it.

  tdFreeDataCols(pHelper->pDataCols[0]);
  tdFreeDataCols(pHelper->pDataCols[1]);
}

H
TD-100  
hzcheng 已提交
108 109
/**
 * Common initialiser behind tsdbInitReadHelper() / tsdbInitWriteHelper().
 *
 * Zeroes the helper, copies the relevant limits from the repository
 * configuration and metadata, then initialises the file, table and block
 * parts in that order. On failure the helper is destroyed again.
 *
 * @param pHelper helper to initialise
 * @param pRepo   repository the limits are taken from
 * @param type    read or write helper
 * @return 0 on success, -1 on invalid argument or initialisation failure
 */
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
  if (pHelper == NULL || pRepo == NULL) return -1;

  memset((void *)pHelper, 0, sizeof(*pHelper));

  // Global configuration
  pHelper->config.type = type;
  pHelper->config.maxTables = pRepo->config.maxTables;
  pHelper->config.maxRowSize = pRepo->tsdbMeta->maxRowBytes;
  pHelper->config.maxRows = pRepo->config.maxRowsPerFileBlock;
  pHelper->config.maxCols = pRepo->tsdbMeta->maxCols;
  pHelper->config.minRowsPerFileBlock = pRepo->config.minRowsPerFileBlock;
  pHelper->config.maxRowsPerFileBlock = pRepo->config.maxRowsPerFileBlock;
  pHelper->config.compress = pRepo->config.compression;

  pHelper->state = TSDB_HELPER_CLEAR_STATE;

  // File part, then table part, then block part
  bool failed = (tsdbInitHelperFile(pHelper) < 0);
  if (!failed) {
    tsdbInitHelperTable(pHelper);
    failed = (tsdbInitHelperBlock(pHelper) < 0);
  }

  if (failed) {
    tsdbDestroyHelper(pHelper);
    return -1;
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
141 142 143 144 145 146 147 148 149
// ------------------------------------------ OPERATIONS FOR OUTSIDE ------------------------------------------
/**
 * Initialise pHelper as a read helper for pRepo.
 *
 * @return 0 on success, -1 on failure
 */
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  int code = tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER);
  return code;
}

/**
 * Initialise pHelper as a write helper for pRepo.
 *
 * @return 0 on success, -1 on failure
 */
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  int code = tsdbInitHelper(pHelper, pRepo, TSDB_WRITE_HELPER);
  return code;
}

H
hzcheng 已提交
150
/**
 * Tear down a helper: release the file, table and block parts, then zero the
 * whole structure. A NULL helper is ignored.
 */
void tsdbDestroyHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  tsdbDestroyHelperFile(pHelper);
  tsdbDestroyHelperTable(pHelper);
  tsdbDestroyHelperBlock(pHelper);

  memset((void *)pHelper, 0, sizeof(*pHelper));
}

H
TD-100  
hzcheng 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172
/**
 * Bring a helper back to TSDB_HELPER_CLEAR_STATE: reset the block buffers and
 * the table info, close the files (without the error flag) and reset the file
 * bookkeeping. A NULL helper is ignored.
 */
void tsdbResetHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  // Block part
  tsdbResetHelperBlockImpl(pHelper);

  // Table part
  tsdbResetHelperTableImpl(pHelper);

  // File part: close first, then wipe the bookkeeping
  tsdbCloseHelperFile(pHelper, false);
  tsdbResetHelperFileImpl(pHelper);

  pHelper->state = TSDB_HELPER_CLEAR_STATE;
}

H
TD-100  
hzcheng 已提交
175
// ------------ Operations for read/write purpose
H
TD-100  
hzcheng 已提交
176 177
/**
 * Attach the helper to a file group, open its files and load the SCompIdx
 * table.
 *
 * The helper is reset first. A read helper opens head/data/last read-only.
 * A write helper opens data/last read-write, creates a new head file seeded
 * with the file header and index region of the old one, and creates a new
 * last file (seeded with the file header) when tsdbShouldCreateNewLast() says
 * so.
 *
 * @param pHelper helper to set up
 * @param pGroup  file group to operate on
 * @return result of tsdbLoadCompIdx() on success, -1 on failure
 *
 * NOTE(review): on failure, files opened so far stay open in the helper; the
 * caller is presumably expected to call tsdbCloseHelperFile(pHelper, true).
 */
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
  ASSERT(pHelper != NULL && pGroup != NULL);

  // Clear the helper object
  tsdbResetHelper(pHelper);

  ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);

  // Set the files
  pHelper->files.fid = pGroup->fileId;
  pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
  pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
  pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
  if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
    // dirname() may modify its argument, so work on a copy of the file name
    char *fnameDup = strdup(pHelper->files.headF.fname);
    if (fnameDup == NULL) goto _err;
    char *dataDir = dirname(fnameDup);

    tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname);
    tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname);
    free((void *)fnameDup);
  }

  // Open the files
  if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
  if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;

    // Create and open .h
    if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err;
    size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);

    // Compare as signed values: comparing a negative result directly against
    // a size_t would convert it to a huge unsigned number and hide the error.
    int64_t copied = tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, tsize);
    if (copied < (int64_t)tsize) goto _err;

    // Create and open .l file if should
    if (tsdbShouldCreateNewLast(pHelper)) {
      if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
      copied = tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE);
      if (copied < (int64_t)TSDB_FILE_HEAD_SIZE) goto _err;
    }
  } else {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
  }

  helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);

  return tsdbLoadCompIdx(pHelper, NULL);

_err:
  return -1;
}

/**
 * Close every file the helper has open.
 *
 * head/data/last are simply closed. For the new head and new last files the
 * outcome depends on hasError: with an error the new file is removed;
 * otherwise it is renamed over the old file and its info is copied into the
 * old file's entry.
 *
 * @param pHelper  helper whose files are closed
 * @param hasError true to discard the new files, false to commit them
 * @return always 0
 */
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
  SFile *pHead = &(pHelper->files.headF);
  SFile *pData = &(pHelper->files.dataF);
  SFile *pLast = &(pHelper->files.lastF);
  SFile *pNewHead = &(pHelper->files.nHeadF);
  SFile *pNewLast = &(pHelper->files.nLastF);

  if (pHead->fd > 0) {
    close(pHead->fd);
    pHead->fd = -1;
  }

  if (pData->fd > 0) {
    close(pData->fd);
    pData->fd = -1;
  }

  if (pLast->fd > 0) {
    close(pLast->fd);
    pLast->fd = -1;
  }

  if (pNewHead->fd > 0) {
    close(pNewHead->fd);
    pNewHead->fd = -1;
    if (!hasError) {
      rename(pNewHead->fname, pHead->fname);
      pHead->info = pNewHead->info;
    } else {
      remove(pNewHead->fname);
    }
  }

  if (pNewLast->fd > 0) {
    close(pNewLast->fd);
    pNewLast->fd = -1;
    if (!hasError) {
      rename(pNewLast->fname, pLast->fname);
      pLast->info = pNewLast->info;
    } else {
      remove(pNewLast->fname);
    }
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
265
/**
 * Select the table the helper will operate on next.
 *
 * Requires the file group to be open and the SCompIdx table to be loaded.
 * State left over from the previous table is cleared, the table identity and
 * schema version are recorded, both SDataCols buffers are initialised with
 * the table schema, and hasOldLastBlock is set when the table's index entry
 * has data (offset > 0) and is flagged hasLast.
 *
 * @param pHelper helper to update
 * @param pTable  table to select
 * @param pRepo   repository, used to look up the table schema
 */
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));

  // Drop members and state used by the previous table
  tsdbResetHelperTable(pHelper);
  ASSERT(helperHasState(pHelper, (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)));

  pHelper->tableInfo.tid = pTable->tableId.tid;
  pHelper->tableInfo.uid = pTable->tableId.uid;
  pHelper->tableInfo.sversion = pTable->sversion;

  STSchema *pTableSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable);
  for (int i = 0; i < 2; i++) {
    tdInitDataCols(pHelper->pDataCols[i], pTableSchema);
  }

  SCompIdx *pEntry = pHelper->pCompIdx + pTable->tableId.tid;
  if (pEntry->offset > 0 && pEntry->hasLast) pHelper->hasOldLastBlock = true;

  helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
  ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
}

H
TD-100  
hzcheng 已提交
289 290 291 292 293 294
/**
 * Write part of of points from pDataCols to file
 * 
 * @return: number of points written to file successfully
 *          -1 for failure
 */
H
hzcheng 已提交
295 296
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
H
TD-100  
hzcheng 已提交
297
  ASSERT(pDataCols->numOfPoints > 0);
H
TD-100  
hzcheng 已提交
298

H
hzcheng 已提交
299 300 301 302
  SCompBlock compBlock;
  int        rowsToWrite = 0;
  TSKEY      keyFirst = dataColsKeyFirst(pDataCols);

H
TD-100  
hzcheng 已提交
303
  ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
H
TD-100  
hzcheng 已提交
304
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;  // for change purpose
H
hzcheng 已提交
305 306

  // Load the SCompInfo part if neccessary
H
TD-100  
hzcheng 已提交
307
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
H
TD-100  
hzcheng 已提交
308
  if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
H
hzcheng 已提交
309

H
TD-100  
hzcheng 已提交
310
  if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) {  // Just append as a super block
H
TD-100  
hzcheng 已提交
311
    ASSERT(pHelper->hasOldLastBlock == false);
H
hzcheng 已提交
312 313 314 315
    rowsToWrite = pDataCols->numOfPoints;
    SFile *pWFile = NULL;
    bool   isLast = false;

H
TD-100  
hzcheng 已提交
316
    if (rowsToWrite >= pHelper->config.minRowsPerFileBlock) {
H
hzcheng 已提交
317 318 319
      pWFile = &(pHelper->files.dataF);
    } else {
      isLast = true;
H
TD-100  
hzcheng 已提交
320
      pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
H
hzcheng 已提交
321 322
    }

H
TD-100  
hzcheng 已提交
323
    if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
H
hzcheng 已提交
324

H
hzcheng 已提交
325
    if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
H
TD-100  
hzcheng 已提交
326 327
  } else {  // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
    SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks),
H
hzcheng 已提交
328
                                         pIdx->numOfBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE);
H
TD-100  
hzcheng 已提交
329

H
hzcheng 已提交
330
    int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
H
TD-100  
hzcheng 已提交
331 332

    if (pCompBlock == NULL) {  // No key overlap, must has last block, just merge with the last block
H
TD-166  
hzcheng 已提交
333
      ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
H
TD-100  
hzcheng 已提交
334 335 336 337
      rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
      if (rowsToWrite < 0) goto _err;
    } else {  // Has key overlap

H
TD-100  
hzcheng 已提交
338 339
      if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
        // Key overlap with the block, must merge with the block
H
TD-100  
hzcheng 已提交
340 341 342

        rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
        if (rowsToWrite < 0) goto _err;
H
TD-100  
hzcheng 已提交
343
      } else { // Save as a super block in the middle
H
TD-100  
hzcheng 已提交
344
        rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1);
H
TD-100  
hzcheng 已提交
345 346
        ASSERT(rowsToWrite > 0);
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err;
H
TD-100  
hzcheng 已提交
347
        if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
H
hzcheng 已提交
348 349 350 351 352 353
      }
    }
  }

  return rowsToWrite;

H
TD-100  
hzcheng 已提交
354
_err:
H
hzcheng 已提交
355 356 357 358
  return -1;
}

/**
 * Move the table's old last block into the new last file, if a new last file
 * is open and the table still has a block in the old one.
 *
 * A last block made of several sub-blocks is loaded and rewritten as a single
 * super block; a single block is copied byte-for-byte and only its offset is
 * updated.
 *
 * @return 0 on success (or nothing to do), -1 on failure
 */
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
  SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  SCompBlock compBlock;
  if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
    if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;

    SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1;
    ASSERT(pCompBlock->last);

    if (pCompBlock->numOfSubBlocks > 1) {
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
      ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 &&
             pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock);
      if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
                               pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0)
        return -1;

      if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;

    } else {
      if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1;

      // Keep the new position in a local until the copy has succeeded, so a
      // failure does not leave the in-memory block pointing at a bad offset.
      int64_t newOffset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
      if (newOffset < 0) return -1;

      int64_t copied = tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len);
      if (copied < (int64_t)pCompBlock->len) return -1;

      pCompBlock->offset = newOffset;
    }

    pHelper->hasOldLastBlock = false;
  }

  return 0;
}

/**
 * Append the current table's SCompInfo to the new head file and record its
 * new offset in the table's SCompIdx entry.
 *
 * If the SCompInfo was never loaded for this table, the existing bytes are
 * copied from the old head file (nothing is done when the table has no data).
 * Otherwise the in-memory SCompInfo is stamped with delimiter/uid, its
 * checksum is appended and it is written out.
 *
 * @return 0 on success, -1 on failure
 */
int tsdbWriteCompInfo(SRWHelper *pHelper) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  int64_t   offset = 0;

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      // Position the source explicitly at the old SCompInfo instead of relying
      // on wherever the old head file's offset currently happens to be.
      if (lseek(pHelper->files.headF.fd, pIdx->offset, SEEK_SET) < 0) return -1;

      offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
      if (offset < 0) return -1;
      ASSERT(offset >= tsizeof(pHelper->pCompIdx));

      int64_t copied = tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len);
      if (copied < (int64_t)pIdx->len) return -1;

      // Only publish the new offset once the bytes are really there
      pIdx->offset = offset;
    }
  } else {
    pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
    pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
    pHelper->pCompInfo->checksum = 0;
    ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
    taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);

    offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
    if (offset < 0) return -1;
    ASSERT(offset >= tsizeof(pHelper->pCompIdx));

    int64_t written = twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len);
    if (written < (int64_t)pIdx->len) return -1;

    pIdx->offset = offset;
  }

  return 0;
}

/**
 * Write the SCompIdx table (with its checksum appended) into the new head
 * file, right after the file header.
 *
 * @return 0 on success, -1 on failure
 */
int tsdbWriteCompIdx(SRWHelper *pHelper) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
  if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;

  size_t tsize = tsizeof(pHelper->pCompIdx);
  ASSERT(tsize == sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM));
  taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsize);

  // Compare as signed values so that a negative (error) result from twrite()
  // is not converted to a large unsigned number and mistaken for success.
  int64_t written = twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsize);
  if (written < (int64_t)tsize) return -1;

  return 0;
}

/**
 * Load the SCompIdx table from the head file into the helper and verify its
 * checksum; optionally copy it out.
 *
 * @param pHelper helper with the file group set and opened
 * @param target  if non-NULL, receives a copy of the whole table (the caller
 *                must provide at least tsizeof(pHelper->pCompIdx) bytes)
 * @return 0 on success, -1 on I/O or checksum failure
 */
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
  ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);

  size_t tsize = tsizeof((void *)pHelper->pCompIdx);

  if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
    // If not load from file, just load it in object
    int fd = pHelper->files.headF.fd;

    if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;

    // Signed comparison: a negative tread() result must count as a failure
    int64_t nread = tread(fd, (void *)(pHelper->pCompIdx), tsize);
    if (nread < (int64_t)tsize) return -1;

    if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pCompIdx), tsize)) {
      // TODO: File is broken, try to deal with it
      return -1;
    }
  }
  helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);

  // Copy the memory for outside usage
  if (target) memcpy(target, pHelper->pCompIdx, tsize);

  return 0;
}

/**
 * Load the SCompInfo of the currently selected table from the head file (if
 * it has not been loaded yet) and verify its checksum; optionally copy it out.
 *
 * When the table's index entry has offset 0, nothing is read and only the
 * INFO_LOAD state is set.
 *
 * @param pHelper helper with a table set
 * @param target  if non-NULL, receives pIdx->len bytes of the loaded info
 * @return 0 on success, -1 on allocation, I/O or checksum failure
 */
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  int fd = pHelper->files.headF.fd;

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;

      // Grow through a temporary and check it: reading into a NULL buffer
      // after a failed allocation must not happen.
      // NOTE(review): assumes trealloc() leaves the old buffer valid on
      // failure, like realloc() - confirm.
      void *pNewInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
      if (pNewInfo == NULL) return -1;
      pHelper->pCompInfo = pNewInfo;

      int64_t nread = tread(fd, (void *)(pHelper->pCompInfo), pIdx->len);
      if (nread < (int64_t)pIdx->len) return -1;
      if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) return -1;
    }

    helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
  }

  if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);

  return 0;
}

H
TD-100  
hzcheng 已提交
477 478 479 480 481 482
/**
 * Load the SCompData header (column directory plus checksum) of a single
 * block into pHelper->pCompData; optionally copy it out.
 *
 * The block is read from the last file when pCompBlock->last is set,
 * otherwise from the data file.
 *
 * @param pHelper    helper with files open
 * @param pCompBlock a sub-block, or a super block with numOfSubBlocks == 1
 * @param target     if non-NULL, receives a copy of the loaded header
 * @return 0 on success, -1 on allocation, I/O or checksum failure
 */
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);
  int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;

  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;

  size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);

  // NOTE(review): assumes trealloc() leaves the old buffer valid on failure,
  // like realloc() - confirm.
  void *pNewData = trealloc((void *)pHelper->pCompData, tsize);
  if (pNewData == NULL) return -1;
  pHelper->pCompData = pNewData;

  // Signed comparison: a negative tread() result must count as a failure
  int64_t nread = tread(fd, (void *)pHelper->pCompData, tsize);
  if (nread < (int64_t)tsize) return -1;

  // The header carries a trailing checksum; verify it before trusting the
  // contents, as the whole-block loader does for the same bytes.
  if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompData, tsize)) return -1;

  ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);

  if (target) memcpy(target, pHelper->pCompData, tsize);

  return 0;
}

H
TD-100  
hzcheng 已提交
495 496 497 498 499 500 501 502 503 504
static int comparColIdCompCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
}

static int comparColIdDataCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
}

/**
 * Read the stored bytes of one column of a block into buf.
 *
 * @param fd         descriptor of the file holding the block
 * @param pCompBlock block the column belongs to
 * @param pCompCol   directory entry of the column (offset and length)
 * @param buf        destination, at least pCompCol->len bytes
 * @return 0 on success, -1 on I/O failure
 */
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
  // Column data starts after the SCompData header, the column directory AND
  // the header checksum. The header is read with the checksum included and the
  // whole-block loader addresses column data past it, so it is counted here too.
  size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
  if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1;

  int64_t nread = tread(fd, buf, pCompCol->len);
  if (nread < (int64_t)pCompCol->len) return -1;

  return 0;
}

/**
 * Load the requested columns of one block into pDataCols.
 *
 * The block's column directory is loaded first. Requested ids that the block
 * does not contain are skipped; ids it does contain must exist in pDataCols.
 * The stored bytes are read straight into the column buffer and the column
 * length is set to the stored length.
 *
 * @return 0 on success, -1 on failure
 */
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
                                       SDataCols *pDataCols) {
  if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;

  int fd = pHelper->files.dataF.fd;
  if (pCompBlock->last) fd = pHelper->files.lastF.fd;

  for (int idx = 0; idx < numOfColIds; idx++) {
    int16_t wantedId = colIds[idx];

    // Look the column up in the block's directory; absent columns are skipped
    SCompCol *pSrcCol = (SCompCol *)bsearch((void *)&wantedId, (void *)pHelper->pCompData->cols,
                                            pHelper->pCompData->numOfCols, sizeof(SCompCol), comparColIdCompCol);
    if (pSrcCol == NULL) continue;

    // The destination column must exist
    SDataCol *pDstCol = (SDataCol *)bsearch((void *)&wantedId, (void *)(pDataCols->cols), pDataCols->numOfCols,
                                            sizeof(SDataCol), comparColIdDataCol);
    ASSERT(pDstCol != NULL);

    pDstCol->len = pSrcCol->len;
    if (tsdbLoadSingleColumnData(fd, pCompBlock, pSrcCol, pDstCol->pData) < 0) return -1;
  }

  return 0;
}

/**
 * Load specific column data of a super block from file.
 *
 * The first (sub-)block is loaded into pDataCols; every further sub-block is
 * loaded into the helper's second buffer and merged into pDataCols.
 *
 * @param pHelper     helper with the table's SCompInfo loaded
 * @param pDataCols   destination columns
 * @param blkIdx      index of the super block in pCompInfo->blocks
 * @param colIds      column ids to load
 * @param numOfColIds number of entries in colIds
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
  SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;

  ASSERT(pCompBlock->numOfSubBlocks >= 1);  // Must be super block

  int numOfSubBlocks = pCompBlock->numOfSubBlocks;

  // For a multi-part super block, offset locates the sub-block array. It is
  // resolved against pCompInfo, the same base tsdbLoadBlockData() uses.
  SCompBlock *pStartBlock =
      (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);

  if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1;
  for (int i = 1; i < numOfSubBlocks; i++) {
    pStartBlock++;
    if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
    // A failed merge is an error, as in tsdbLoadBlockData()
    if (tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) return -1;
  }

  return 0;
}
H
TD-166  
hzcheng 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
/**
 * Verify the checksum of one stored column and decode it into pDataCol.
 *
 * @param pDataCol    destination column
 * @param content     stored column bytes, including the trailing checksum
 * @param len         length of content in bytes
 * @param comp        compression algorithm; 0 means stored uncompressed
 * @param numOfPoints number of rows stored in the column
 * @param maxPoints   row capacity of the destination column
 * @param buffer      scratch buffer handed to the decompression routine
 * @param bufferSize  size of the scratch buffer
 * @return 0 on success, -1 if the checksum does not match
 */
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints,
                                        int maxPoints, char *buffer, int bufferSize) {
  // Verify by checksum
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;

  bool isVarLen = (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR);

  // Decode the data
  if (comp) {
    // Need to decompress. Fixed-width columns are decoded to the start of the
    // column buffer; BINARY/NCHAR payload goes after the per-row offset area.
    // (Previously the fixed-width case passed NULL as the destination.)
    void *pStart = pDataCol->pData;
    if (isVarLen) {
      pStart = (char *)(pDataCol->pData) + sizeof(int32_t) * maxPoints;
    }
    // TODO: get rid of INT32_MAX here
    pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfPoints, pStart,
                                                                  INT32_MAX, comp, buffer, bufferSize);
    if (isVarLen) {
      pDataCol->len += (sizeof(int32_t) * maxPoints);
      dataColSetOffset(pDataCol, numOfPoints, maxPoints);
    }
  } else {
    // No need to decompress, just memcpy it
    switch (pDataCol->type) {
      case TSDB_DATA_TYPE_BINARY:
      case TSDB_DATA_TYPE_NCHAR:
        // Payload is placed after the per-row offset area
        pDataCol->len = sizeof(int32_t) * maxPoints;
        memcpy((char *)pDataCol->pData + pDataCol->len, content, len - sizeof(TSCKSUM));
        pDataCol->len += (len - sizeof(TSCKSUM));
        dataColSetOffset(pDataCol, numOfPoints, maxPoints);
        break;

      default:
        pDataCol->len = len - sizeof(TSCKSUM);
        memcpy(pDataCol->pData, content, pDataCol->len);
        break;
    }
  }
  return 0;
}

H
TD-100  
hzcheng 已提交
594 595 596 597 598 599
/**
 * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1)
 *
 * The whole block is read into pHelper->blockBuffer, the header checksum is
 * verified, and the stored columns are matched against pDataCols by column
 * id. Columns of pDataCols that are not stored in the block are filled with
 * NULLs; stored columns that pDataCols does not have are skipped.
 *
 * @return 0 on success, -1 on allocation, I/O, checksum or decode failure
 */
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);

  // Grow through a temporary so the current buffer is not lost on failure.
  // NOTE(review): assumes trealloc() leaves the old buffer valid on failure,
  // like realloc() - confirm.
  void *pNewBuffer = trealloc(pHelper->blockBuffer, pCompBlock->len);
  if (pNewBuffer == NULL) return -1;
  pHelper->blockBuffer = pNewBuffer;

  SCompData *pCompData = (SCompData *)pHelper->blockBuffer;

  int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err;
  if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err;
  ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);

  // Header = SCompData + column directory + checksum; column data follows it
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
  if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err;

  pDataCols->numOfPoints = pCompBlock->numOfPoints;

  // Recover the data: walk both column lists in parallel, compared by colId
  int ccol = 0;
  int dcol = 0;
  while (dcol < pDataCols->numOfCols) {
    SDataCol *pDataCol = &(pDataCols->cols[dcol]);
    if (ccol >= pCompData->numOfCols) {
      // Set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints);
      dcol++;
      continue;
    }

    SCompCol *pCompCol = &(pCompData->cols[ccol]);

    if (pCompCol->colId == pDataCol->colId) {
      if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
                                       pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints,
                                       pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
        goto _err;
      dcol++;
      ccol++;
    } else if (pCompCol->colId < pDataCol->colId) {
      // Stored column is not wanted by the destination: skip it
      ccol++;
    } else {
      // Set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints);
      dcol++;
    }
  }

  return 0;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
651
// Load the whole block data
H
TD-100  
hzcheng 已提交
652 653
// Read every part of a block and leave the combined rows in
// pHelper->pDataCols[0].
//
// If pCompBlock reports more than one sub-block, pCompBlock->offset is taken
// as a byte offset from pHelper->pCompInfo to the first sub-block descriptor.
// The first descriptor is loaded into pHelper->pDataCols[0]; every further
// one is loaded into pHelper->pDataCols[1] and merged into
// pHelper->pDataCols[0].
//
// `target` is accepted but not used yet.
// Returns 0 on success, -1 as soon as a load or merge step fails.
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) {
  const int   nSubBlocks = pCompBlock->numOfSubBlocks;
  SCompBlock *pCurBlock = pCompBlock;

  if (nSubBlocks > 1) {
    // The super block only points at its sub-block descriptors.
    pCurBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);
  }

  tdResetDataCols(pHelper->pDataCols[0]);
  if (tsdbLoadBlockDataImpl(pHelper, pCurBlock, pHelper->pDataCols[0]) < 0) return -1;

  int nLoaded = 1;
  while (nLoaded < nSubBlocks) {
    tdResetDataCols(pHelper->pDataCols[1]);
    pCurBlock++;

    if (tsdbLoadBlockDataImpl(pHelper, pCurBlock, pHelper->pDataCols[1]) < 0) return -1;
    if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) {
      return -1;
    }

    nLoaded++;
  }

  // TODO: hand the result over to `target` when one is supplied

  return 0;
}

// Tell whether the current last file is large enough that a new last file
// should be created.
//
// Returns true when the file behind pHelper->files.lastF.fd is larger than
// 32 * 1024 + TSDB_FILE_HEAD_SIZE bytes, false otherwise. When fstat() fails
// the size is unknown and `st` holds no valid data, so the function reports
// false instead of comparing an uninitialized value.
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
  ASSERT(pHelper->files.lastF.fd > 0);

  struct stat st;
  if (fstat(pHelper->files.lastF.fd, &st) < 0) return false;

  return st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE;
}

// Serialize the first rowsToWrite rows of pDataCols into pHelper->blockBuffer
// and append the result to pFile as one block.
//
// Buffer layout as built here: an SCompData header with one SCompCol entry
// per column that is not entirely NULL, covered by a checksum, followed by
// the column payloads back to back, each with its own checksum appended.
// Columns whose first rowsToWrite values are all NULL are not stored.
//
// On success *pCompBlock is filled with the block's file offset, length, row
// count, column count, key range, compression setting and the last/super
// flags, and 0 is returned. Returns -1 if seeking to the end of the file or
// writing the buffer fails.
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
                                bool isLast, bool isSuperBlock) {
  ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints &&
         rowsToWrite <= pHelper->config.maxRowsPerFileBlock);

  SCompData *pCompData = (SCompData *)(pHelper->blockBuffer);

  // The block is appended, so its offset is the current end of the file.
  int64_t offset = lseek(pFile->fd, 0, SEEK_END);
  if (offset < 0) return -1;

  // Pass 1: register every column that holds at least one non-NULL value.
  int nColsNotAllNull = 0;
  for (int i = 0; i < pDataCols->numOfCols; i++) {
    SDataCol *pSrcCol = &(pDataCols->cols[i]);
    if (isNEleNull(pSrcCol, rowsToWrite)) continue;

    SCompCol *pColEntry = &(pCompData->cols[nColsNotAllNull]);
    pColEntry->colId = pSrcCol->colId;
    pColEntry->type = pSrcCol->type;
    nColsNotAllNull++;
  }

  ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols);

  // Pass 2: encode the payload of each registered column right after the header.
  int32_t headSize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);
  int32_t blockSize = headSize;  // bytes of blockBuffer used so far
  int32_t payloadOffset = 0;     // offset of the next payload, relative to the end of the header
  int     nEncoded = 0;

  for (int i = 0; i < pDataCols->numOfCols && nEncoded < nColsNotAllNull; i++) {
    SDataCol *pSrcCol = &(pDataCols->cols[i]);
    SCompCol *pColEntry = &(pCompData->cols[nEncoded]);

    // Columns skipped in pass 1 have no entry; move on until the ids line up.
    if (pSrcCol->colId != pColEntry->colId) continue;

    void *pDest = (void *)((char *)pCompData + blockSize);
    pColEntry->offset = payloadOffset;

    void   *pStart = NULL;
    int32_t tlen = 0;
    dataColGetNEleStartAndLen(pSrcCol, rowsToWrite, &pStart, &tlen, pDataCols->maxPoints);

    if (pHelper->config.compress) {
      // Compress straight into the block buffer.
      pColEntry->len = (*(tDataTypeDesc[pSrcCol->type].compFunc))(
          (char *)pStart, tlen, rowsToWrite, pDest, tsizeof(pHelper->blockBuffer) - blockSize, pHelper->config.compress,
          pHelper->compBuffer, tsizeof(pHelper->compBuffer));
    } else {
      pColEntry->len = tlen;
      memcpy(pDest, pStart, pColEntry->len);
    }

    // The stored length includes the per-column checksum.
    pColEntry->len += sizeof(TSCKSUM);
    taosCalcChecksumAppend(0, (uint8_t *)pDest, pColEntry->len);

    payloadOffset += pColEntry->len;
    blockSize += pColEntry->len;
    nEncoded++;
  }

  pCompData->delimiter = TSDB_FILE_DELIMITER;
  pCompData->uid = pHelper->tableInfo.uid;
  pCompData->numOfCols = nColsNotAllNull;

  taosCalcChecksumAppend(0, (uint8_t *)pCompData, headSize);

  // Write header and payloads in one go.
  if (twrite(pFile->fd, (void *)pCompData, blockSize) < blockSize) return -1;

  // Describe the freshly written block for the caller.
  pCompBlock->last = isLast;
  pCompBlock->offset = offset;
  pCompBlock->algorithm = pHelper->config.compress;
  pCompBlock->numOfPoints = rowsToWrite;
  pCompBlock->sversion = pHelper->tableInfo.sversion;
  pCompBlock->len = (int32_t)blockSize;
  if (isSuperBlock) {
    pCompBlock->numOfSubBlocks = 1;
  } else {
    pCompBlock->numOfSubBlocks = 0;
  }
  pCompBlock->numOfCols = nColsNotAllNull;
  pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
  pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);

  return 0;
}

// Comparator placing a key relative to a block's key range.
//
// arg1 points to a TSKEY, arg2 to an SCompBlock. Returns -1 when the key is
// below the block's keyFirst, 1 when it is above the block's keyLast, and 0
// when it lies inside [keyFirst, keyLast].
static int compareKeyBlock(const void *arg1, const void *arg2) {
  const TSKEY       target = *(const TSKEY *)arg1;
  const SCompBlock *pCandidate = (const SCompBlock *)arg2;

  if (target < pCandidate->keyFirst) return -1;
  if (target > pCandidate->keyLast) return 1;
  return 0;
}

// static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) {
//   return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2));
// }

// Merge the data with a block in file
// Merge leading rows of pDataCols into the block at index blkIdx of the
// table currently selected in pHelper.
//
// Two situations are handled:
//   1. The first key of pDataCols is greater than the block's keyLast: the
//      rows are appended. They are either written as an extra sub-block to
//      the last file, or the block is loaded, merged with the new rows and
//      rewritten as a single super block.
//   2. The first key falls inside the block's key range: the overlapping rows
//      are either written as an extra sub-block, or the block is loaded and
//      merged row by row with the new data into one or more new super blocks
//      in the data file.
//
// Returns the number of rows of pDataCols that were consumed, or -1 on error.
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
  // TODO: set pHelper->hasOldBlock
  int        rowsWritten = 0;
  SCompBlock compBlock = {0};

  ASSERT(pDataCols->numOfPoints > 0);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx < pIdx->numOfBlocks);

  // NOTE: blockAtIdx() is re-evaluated on every use instead of being cached,
  // because the helpers called below may reallocate pHelper->pCompInfo.
  ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
  ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);

  if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) {  // Merge with the last block by append
    ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock &&
           blkIdx == pIdx->numOfBlocks - 1);
    int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5;  // TODO: make an interface

    rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints);

    // A sub-block is appended to the existing last file, so this path is only
    // valid while no new last file has been opened (nLastF.fd < 0). This is
    // the same test the overlapping case below uses.
    if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        (blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
        (pHelper->files.nLastF.fd < 0)) {
      if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
        goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints);
      // Merge
      if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
      // Write: a block with enough rows goes to the data file, a small one
      // stays a "last" block (in the new last file if there is one).
      SFile *pWFile = NULL;
      bool   isLast = false;
      if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) {
        pWFile = &(pHelper->files.dataF);
      } else {
        isLast = true;
        pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
      }
      if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfPoints, &compBlock,
                               isLast, true) < 0)
        goto _err;
      if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
    }

    ASSERT(pHelper->hasOldLastBlock);
    pHelper->hasOldLastBlock = false;
  } else {
    // Key must overlap with the block
    ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);

    // Largest key that may be merged here without reaching the next block.
    TSKEY keyLimit =
        (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;

    // rows1: number of rows that must be merged into this block
    int rows1 =
        tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
    // rows2: maximum number of additional rows this block can take
    int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints;
    // rows3: number of rows between the start of this block and the next block
    int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);

    ASSERT(rows3 >= rows1);

    if ((rows2 >= rows1) &&
        ((blockAtIdx(pHelper, blkIdx)->last) ||
         ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) &&
          (pHelper->files.nLastF.fd < 0)))) {
      // The overlapping rows fit: store them as an additional sub-block.
      rowsWritten = rows1;
      bool   isLast = false;
      SFile *pFile = NULL;

      if (blockAtIdx(pHelper, blkIdx)->last) {
        isLast = true;
        pFile = &(pHelper->files.lastF);
      } else {
        pFile = &(pHelper->files.dataF);
      }

      if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {  // Load-Merge-Write
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;

      rowsWritten = rows3;

      int iter1 = 0;  // iter over pHelper->pDataCols[0]
      int iter2 = 0;  // iter over pDataCols
      int round = 0;
      while (true) {
        if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break;
        tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2,
                           pHelper->config.maxRowsPerFileBlock * 4 / 5);
        ASSERT(pHelper->pDataCols[1]->numOfPoints > 0);
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
                                 pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0)
          goto _err;

        // The first merged block replaces the original one; every further
        // block is inserted as a new super block. Both steps can fail, so
        // their results are checked.
        if (round == 0) {
          if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        } else {
          if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        }
        round++;
        blkIdx++;
        // TODO: the blkIdx here is not correct
      }
    }
  }

  return rowsWritten;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
960 961 962 963 964 965 966 967 968
static int compTSKEY(const void *key1, const void *key2) {
  if (*(TSKEY *)key1 > *(TSKEY *)key2) {
    return 1;
  } else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
    return 0;
  } else {
    return -1;
  }
}
H
hzcheng 已提交
969

H
TD-100  
hzcheng 已提交
970
// Make sure the buffer behind pHelper->pCompInfo is larger than esize bytes.
//
// When the current buffer is not larger than esize it is reallocated to
// esize plus room for 16 more SCompBlock entries. The buffer may move, so
// pointers into pHelper->pCompInfo obtained before this call must be
// recomputed afterwards.
//
// Returns 0 on success and -1 if the reallocation fails. On failure
// pHelper->pCompInfo is left untouched so that the existing buffer is neither
// leaked nor lost (assumes trealloc() keeps the old allocation valid on
// failure, like realloc() -- TODO confirm).
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
  if (tsizeof((void *)pHelper->pCompInfo) > esize) return 0;

  size_t tsize = esize + sizeof(SCompBlock) * 16;
  void  *pNew = trealloc(pHelper->pCompInfo, tsize);
  if (pNew == NULL) return -1;

  pHelper->pCompInfo = (SCompInfo *)pNew;
  return 0;
}

// Insert *pCompBlock as a new super block at position blkIdx of the current
// table's block array.
//
// Everything stored from slot blkIdx to the end of the info area is shifted
// up by one SCompBlock, and the sub-block offset of every existing block that
// owns sub-blocks is increased by the same amount. Afterwards the table
// index (numOfBlocks, len, maxKey, hasLast) is refreshed.
//
// Returns 0 on success, -1 if the info buffer cannot be enlarged.
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  // Make room for one more block.
  // NOTE(review): the initial length uses sizeof(SCompData) while all other
  // offsets in this function use sizeof(SCompInfo) -- confirm this is intended.
  if (pIdx->len == 0) pIdx->len = sizeof(SCompData) + sizeof(TSCKSUM);
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) return -1;

  // The sub-block area is about to move up by one slot.
  for (int i = 0; i < pIdx->numOfBlocks; i++) {
    SCompBlock *pOther = pHelper->pCompInfo->blocks + i;
    if (pOther->numOfSubBlocks > 1) pOther->offset += sizeof(SCompBlock);
  }

  // Open a gap at slot blkIdx.
  int bytesToShift = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx);
  if (bytesToShift > 0) {
    char  *pBase = (char *)pHelper->pCompInfo;
    size_t srcOff = sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx;
    size_t dstOff = sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1);

    ASSERT(dstOff < tsizeof(pHelper->pCompInfo));
    ASSERT(dstOff + bytesToShift <= tsizeof(pHelper->pCompInfo));
    memmove((void *)(pBase + dstOff), (void *)(pBase + srcOff), bytesToShift);
  }
  pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;

  pIdx->numOfBlocks++;
  pIdx->len += sizeof(SCompBlock);
  ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));

  // The index summary always mirrors the final block.
  SCompBlock *pTail = &(pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1]);
  pIdx->maxKey = pTail->keyLast;
  pIdx->hasLast = pTail->last;

  if (pIdx->numOfBlocks > 1) {
    ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
1023 1024 1025 1026
// Attach *pCompBlock as an additional sub-block of the super block at index
// blkIdx of the current table.
//
// If the super block already owns sub-blocks, the new descriptor is placed
// right after them. Otherwise two descriptors are created: a copy of the
// super block itself (marked with numOfSubBlocks = 0) and the new block.
// In both cases the data stored behind the insertion point is shifted up and
// the sub-block offsets of later blocks are adjusted accordingly. The super
// block's row count and key range are extended, and the table index (len,
// maxKey, hasLast) is refreshed.
//
// rowsAdded is the number of rows the new sub-block contributes.
// Returns 0 on success, -1 if the info buffer cannot be enlarged.
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) {
  ASSERT(pCompBlock->numOfSubBlocks == 0);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
  ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);

  // A block without sub-blocks needs two new descriptors, otherwise one.
  size_t spaceNeeded =
      (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock);
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;

  // The buffer may have been reallocated; recompute the pointer into it.
  pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;

  // Add the sub-block
  if (pSCompBlock->numOfSubBlocks > 1) {
    // Open a one-descriptor gap right behind this block's sub-blocks.
    size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
    if (tsize > 0) {
      memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)),
              (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);

      for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
        SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
        if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
      }
    }

    *(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock;

    pSCompBlock->numOfSubBlocks++;
    ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
    pSCompBlock->len += sizeof(SCompBlock);
    pSCompBlock->numOfPoints += rowsAdded;
    pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst);
    pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast);
    pIdx->len += sizeof(SCompBlock);
  } else {  // Need to create two sub-blocks
    // The new pair goes in front of the sub-blocks of the next block that has
    // any. The insertion point must be the START of that block's sub-block
    // region: everything from the insertion point on is moved up, and the
    // offsets of all later blocks (including that one) are bumped below.
    void *ptr = NULL;
    for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
      SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
      if (pTCompBlock->numOfSubBlocks > 1) {
        ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset);
        break;
      }
    }

    // No later block has sub-blocks: insert just before the trailing checksum.
    if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM));

    size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
    if (tsize > 0) {
      memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize);
      for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
        SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
        if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2);
      }
    }

    // First sub-block: the former super block itself, marked as a sub-block.
    ((SCompBlock *)ptr)[0] = *pSCompBlock;
    ((SCompBlock *)ptr)[0].numOfSubBlocks = 0;

    // Second sub-block: the newly written data.
    ((SCompBlock *)ptr)[1] = *pCompBlock;

    pSCompBlock->numOfSubBlocks = 2;
    pSCompBlock->numOfPoints += rowsAdded;
    pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
    pSCompBlock->len = sizeof(SCompBlock) * 2;
    pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst);
    pSCompBlock->keyLast = MAX(((SCompBlock *)ptr)[0].keyLast, ((SCompBlock *)ptr)[1].keyLast);

    pIdx->len += (sizeof(SCompBlock) * 2);
  }

  pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
  pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;

  return 0;

_err:
  return -1;
}

// Replace the super block at index blkIdx of the current table with
// *pCompBlock.
//
// If the block being replaced owns sub-blocks, their descriptors are removed
// first: the data stored behind them is moved down, the sub-block offsets of
// later blocks are reduced, and the index length shrinks accordingly. The
// table index (maxKey, hasLast) is refreshed at the end.
//
// Always returns 0.
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
  ASSERT(pSCompBlock->numOfSubBlocks >= 1);

  // Drop the sub-block descriptors owned by the block being replaced.
  if (pSCompBlock->numOfSubBlocks > 1) {
    char  *pBase = (char *)(pHelper->pCompInfo);
    size_t tailBytes = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
    if (tailBytes > 0) {
      memmove((void *)(pBase + pSCompBlock->offset), (void *)(pBase + pSCompBlock->offset + pSCompBlock->len),
              tailBytes);
    }

    // Bytes released by the removed descriptors.
    const size_t freedBytes = sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks;

    for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
      SCompBlock *pLater = pHelper->pCompInfo->blocks + i;
      if (pLater->numOfSubBlocks > 1) pLater->offset -= freedBytes;
    }

    pIdx->len -= freedBytes;
  }

  *pSCompBlock = *pCompBlock;

  // The index summary always mirrors the final block.
  SCompBlock *pTail = &(pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1]);
  pIdx->maxKey = pTail->keyLast;
  pIdx->hasLast = pTail->last;

  return 0;
}

// Get the number of rows in range [minKey, maxKey]
// Count the rows of pDataCols whose key lies in [minKey, maxKey].
//
// The keys are read from pDataCols->cols[0].pData as an array of TSKEY and
// searched with taosbsearch(): once for the first key >= minKey (TD_GE) and
// once for the last key <= maxKey (TD_LE). Returns 0 when pDataCols is empty,
// when the range does not intersect the stored keys, or when no key falls
// between the two search results.
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
  if (pDataCols->numOfPoints == 0) return 0;

  ASSERT(minKey <= maxKey);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);
  TSKEY keyLast = dataColsKeyLast(pDataCols);
  ASSERT(keyFirst <= keyLast);

  // Nothing to count unless the range touches [keyFirst, keyLast].
  if (!(minKey <= keyLast && maxKey >= keyFirst)) return 0;

  TSKEY *pLower = (TSKEY *)taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints,
                                       sizeof(TSKEY), compTSKEY, TD_GE);
  ASSERT(pLower != NULL);

  TSKEY *pUpper = (TSKEY *)taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints,
                                       sizeof(TSKEY), compTSKEY, TD_LE);
  ASSERT(pUpper != NULL);

  // The range falls into a gap between two adjacent keys.
  if (pUpper - pLower < 0) return 0;

  return (pUpper - pLower) + 1;
}