tsdbRWHelper.c 51.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15 16

#include "os.h"
H
TD-100  
hzcheng 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tchecksum.h"
H
TD-185  
Hongze Cheng 已提交
19
#include "tcoding.h"
H
TD-353  
Hongze Cheng 已提交
20 21
#include "tscompression.h"
#include "tsdbMain.h"
H
hzcheng 已提交
22

H
TD-353  
Hongze Cheng 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// Forward declarations of the file-local (static) helpers used by the public
// functions defined below.
static bool  tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int   tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
                                  SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
static int   compareKeyBlock(const void *arg1, const void *arg2);
static int   tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int   compTSKEY(const void *key1, const void *key2);
static int   tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
static int   tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int   tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int   tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int   tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
static void  tsdbResetHelperFileImpl(SRWHelper *pHelper);
static int   tsdbInitHelperFile(SRWHelper *pHelper);
static void  tsdbDestroyHelperFile(SRWHelper *pHelper);
static void  tsdbResetHelperTableImpl(SRWHelper *pHelper);
static void  tsdbResetHelperTable(SRWHelper *pHelper);
static void  tsdbInitHelperTable(SRWHelper *pHelper);
static void  tsdbDestroyHelperTable(SRWHelper *pHelper);
static void  tsdbResetHelperBlockImpl(SRWHelper *pHelper);
static void  tsdbResetHelperBlock(SRWHelper *pHelper);
static int   tsdbInitHelperBlock(SRWHelper *pHelper);
// NOTE(review): tsdbDestroyHelper() calls this before any definition is visible;
// the signature is assumed to mirror the other tsdbDestroyHelperXxx helpers - confirm
// against the definition further down the file.
static void  tsdbDestroyHelperBlock(SRWHelper *pHelper);
static int   tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int   comparColIdCompCol(const void *arg1, const void *arg2);
static int   comparColIdDataCol(const void *arg1, const void *arg2);
static int   tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf);
static int   tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
                                         SDataCols *pDataCols);
static int   tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
                                          int maxPoints, char *buffer, int bufferSize);
static int   tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);

H
TD-353  
Hongze Cheng 已提交
56
// ---------------------- INTERFACE FUNCTIONS ----------------------
H
TD-100  
hzcheng 已提交
57 58 59 60 61 62 63 64
/**
 * Initialize pHelper as a read helper bound to pRepo.
 *
 * Thin wrapper: forwards to tsdbInitHelper() with type TSDB_READ_HELPER.
 *
 * @return the value returned by tsdbInitHelper()
 */
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  int code = tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER);
  return code;
}

/**
 * Initialize pHelper as a write helper bound to pRepo.
 *
 * Thin wrapper: forwards to tsdbInitHelper() with type TSDB_WRITE_HELPER.
 *
 * @return the value returned by tsdbInitHelper()
 */
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  int code = tsdbInitHelper(pHelper, pRepo, TSDB_WRITE_HELPER);
  return code;
}

H
hzcheng 已提交
65
/**
 * Release everything owned by the helper and zero the structure.
 *
 * Frees the two scratch buffers, tears down the file, table and block parts
 * (in that order), then clears the whole object with memset. A NULL helper is
 * accepted and ignored.
 */
void tsdbDestroyHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  // Scratch buffers first
  tzfree(pHelper->pBuffer);
  tzfree(pHelper->compBuffer);

  // Then each sub-part of the helper
  tsdbDestroyHelperFile(pHelper);
  tsdbDestroyHelperTable(pHelper);
  tsdbDestroyHelperBlock(pHelper);

  memset((void *)pHelper, 0, sizeof(*pHelper));
}

H
TD-100  
hzcheng 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89
/**
 * Bring the helper back to TSDB_HELPER_CLEAR_STATE so it can be bound to
 * another file group.
 *
 * The block part and the table part are reset first; then any open files are
 * closed through tsdbCloseHelperFile() with hasError == false and the file
 * part is reset. A NULL helper is accepted and ignored.
 */
void tsdbResetHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  tsdbResetHelperBlockImpl(pHelper);  // block part
  tsdbResetHelperTableImpl(pHelper);  // table part

  // file part: close whatever is open, then clear the bookkeeping
  tsdbCloseHelperFile(pHelper, false);
  tsdbResetHelperFileImpl(pHelper);

  pHelper->state = TSDB_HELPER_CLEAR_STATE;
}

H
TD-100  
hzcheng 已提交
92 93
/**
 * Bind the helper to a file group, open its files and load the SCompIdx part.
 *
 * The helper is reset first. The head/data/last SFile entries are copied from
 * pGroup. For a write helper the names of the new head (".h") and new last
 * (".l") files are derived from the directory of the current head file; the
 * data and last files are opened read-write, the new head file is created and
 * the first TSDB_FILE_HEAD_SIZE bytes of the old head are copied into it. A
 * new last file is created the same way only when tsdbShouldCreateNewLast()
 * says so. For a read helper the data and last files are opened read-only.
 *
 * Files already opened are NOT closed here when a later step fails.
 *
 * @param pHelper helper to set up (must not be NULL)
 * @param pGroup  file group to bind (must not be NULL)
 * @return the result of tsdbLoadCompIdx() on success, -1 on failure
 */
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
  ASSERT(pHelper != NULL && pGroup != NULL);

  // Clear the helper object
  tsdbResetHelper(pHelper);

  ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);

  // Set the files
  pHelper->files.fid = pGroup->fileId;
  pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
  pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
  pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
  if (helperType(pHelper) == TSDB_WRITE_HELPER) {
    // dirname() may modify its argument, so work on a private copy of the name
    char *fnameDup = strdup(pHelper->files.headF.fname);
    if (fnameDup == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
    char *dataDir = dirname(fnameDup);

    tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname);
    tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname);
    free((void *)fnameDup);
  }

  // Open the files
  if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
  if (helperType(pHelper) == TSDB_WRITE_HELPER) {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;

    // Create and open .h
    if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err;
    if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
      tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    // Create and open .l file if should
    if (tsdbShouldCreateNewLast(pHelper)) {
      if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
      // The braces matter: only a short copy is an error. Without them the
      // function bailed out every time a new last file was created.
      if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
        tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                  TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
        terrno = TAOS_SYSTEM_ERROR(errno);
        goto _err;
      }
    }
  } else {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
  }

  helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);

  return tsdbLoadCompIdx(pHelper, NULL);

_err:
  return -1;
}

int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
  if (pHelper->files.headF.fd > 0) {
H
Hongze Cheng 已提交
158
    fsync(pHelper->files.headF.fd);
H
hzcheng 已提交
159 160 161 162
    close(pHelper->files.headF.fd);
    pHelper->files.headF.fd = -1;
  }
  if (pHelper->files.dataF.fd > 0) {
H
Hongze Cheng 已提交
163
    if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
H
Hongze Cheng 已提交
164
    fsync(pHelper->files.dataF.fd);
H
hzcheng 已提交
165 166 167 168
    close(pHelper->files.dataF.fd);
    pHelper->files.dataF.fd = -1;
  }
  if (pHelper->files.lastF.fd > 0) {
H
Hongze Cheng 已提交
169
    fsync(pHelper->files.lastF.fd);
H
hzcheng 已提交
170 171 172 173
    close(pHelper->files.lastF.fd);
    pHelper->files.lastF.fd = -1;
  }
  if (pHelper->files.nHeadF.fd > 0) {
H
Hongze Cheng 已提交
174
    if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
H
Hongze Cheng 已提交
175
    fsync(pHelper->files.nHeadF.fd);
H
hzcheng 已提交
176 177
    close(pHelper->files.nHeadF.fd);
    pHelper->files.nHeadF.fd = -1;
H
TD-100  
hzcheng 已提交
178 179 180 181 182 183
    if (hasError) {
      remove(pHelper->files.nHeadF.fname);
    } else {
      rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname);
      pHelper->files.headF.info = pHelper->files.nHeadF.info;
    }
H
hzcheng 已提交
184
  }
H
TD-353  
Hongze Cheng 已提交
185

H
hzcheng 已提交
186
  if (pHelper->files.nLastF.fd > 0) {
H
Hongze Cheng 已提交
187
    if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
H
Hongze Cheng 已提交
188
    fsync(pHelper->files.nLastF.fd);
H
hzcheng 已提交
189 190
    close(pHelper->files.nLastF.fd);
    pHelper->files.nLastF.fd = -1;
H
TD-100  
hzcheng 已提交
191 192 193 194 195 196
    if (hasError) {
      remove(pHelper->files.nLastF.fname);
    } else {
      rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname);
      pHelper->files.lastF.info = pHelper->files.nLastF.info;
    }
H
hzcheng 已提交
197 198 199 200
  }
  return 0;
}

H
TD-100  
hzcheng 已提交
201
/**
 * Point the helper at a table of the currently opened file group.
 *
 * Requires the files to be set/opened and the SCompIdx part loaded. State left
 * by a previous table is cleared, the table id/uid/schema version are
 * recorded, both scratch SDataCols are initialised with the table schema, and
 * hasOldLastBlock is raised when the table's index entry has data
 * (offset > 0) and is flagged hasLast.
 *
 * pRepo is not used by this function.
 */
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));

  // Drop whatever the previous table left behind
  tsdbResetHelperTable(pHelper);
  ASSERT(helperHasState(pHelper, (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)));

  STSchema *pTableSchema = tsdbGetTableSchema(pTable);

  pHelper->tableInfo.tid = pTable->tableId.tid;
  pHelper->tableInfo.uid = pTable->tableId.uid;
  pHelper->tableInfo.sversion = schemaVersion(pTableSchema);

  for (int slot = 0; slot < 2; slot++) {
    tdInitDataCols(pHelper->pDataCols[slot], pTableSchema);
  }

  SCompIdx *pTableIdx = &(pHelper->pCompIdx[pTable->tableId.tid]);
  if (pTableIdx->offset > 0) {
    if (pTableIdx->hasLast) pHelper->hasOldLastBlock = true;
  }

  helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
  ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
}

H
TD-100  
hzcheng 已提交
225 226
/**
 * Write part of of points from pDataCols to file
H
TD-353  
Hongze Cheng 已提交
227
 *
H
TD-100  
hzcheng 已提交
228 229 230
 * @return: number of points written to file successfully
 *          -1 for failure
 */
H
hzcheng 已提交
231 232
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
H
Haojun Liao 已提交
233
  ASSERT(pDataCols->numOfRows > 0);
H
TD-100  
hzcheng 已提交
234

H
hzcheng 已提交
235 236 237 238
  SCompBlock compBlock;
  int        rowsToWrite = 0;
  TSKEY      keyFirst = dataColsKeyFirst(pDataCols);

H
TD-100  
hzcheng 已提交
239
  ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
H
TD-100  
hzcheng 已提交
240
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;  // for change purpose
H
hzcheng 已提交
241 242

  // Load the SCompInfo part if neccessary
H
TD-100  
hzcheng 已提交
243
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
H
TD-100  
hzcheng 已提交
244
  if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
H
hzcheng 已提交
245

H
TD-100  
hzcheng 已提交
246
  if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) {  // Just append as a super block
H
TD-100  
hzcheng 已提交
247
    ASSERT(pHelper->hasOldLastBlock == false);
H
Haojun Liao 已提交
248
    rowsToWrite = pDataCols->numOfRows;
H
hzcheng 已提交
249 250 251
    SFile *pWFile = NULL;
    bool   isLast = false;

H
TD-100  
hzcheng 已提交
252
    if (rowsToWrite >= pHelper->config.minRowsPerFileBlock) {
H
hzcheng 已提交
253 254 255
      pWFile = &(pHelper->files.dataF);
    } else {
      isLast = true;
H
TD-100  
hzcheng 已提交
256
      pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
H
hzcheng 已提交
257 258
    }

H
TD-100  
hzcheng 已提交
259
    if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
H
hzcheng 已提交
260

H
hzcheng 已提交
261
    if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
H
TD-100  
hzcheng 已提交
262
  } else {  // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
H
TD-353  
Hongze Cheng 已提交
263 264
    SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), pIdx->numOfBlocks,
                                         sizeof(SCompBlock), compareKeyBlock, TD_GE);
H
TD-100  
hzcheng 已提交
265

H
hzcheng 已提交
266
    int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
H
TD-100  
hzcheng 已提交
267 268

    if (pCompBlock == NULL) {  // No key overlap, must has last block, just merge with the last block
H
TD-166  
hzcheng 已提交
269
      ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
H
TD-100  
hzcheng 已提交
270 271 272 273
      rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
      if (rowsToWrite < 0) goto _err;
    } else {  // Has key overlap

H
TD-100  
hzcheng 已提交
274 275
      if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
        // Key overlap with the block, must merge with the block
H
TD-100  
hzcheng 已提交
276 277 278

        rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
        if (rowsToWrite < 0) goto _err;
H
TD-353  
Hongze Cheng 已提交
279 280
      } else {  // Save as a super block in the middle
        rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst - 1);
H
TD-100  
hzcheng 已提交
281
        ASSERT(rowsToWrite > 0);
H
TD-353  
Hongze Cheng 已提交
282 283
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0)
          goto _err;
H
TD-100  
hzcheng 已提交
284
        if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
H
hzcheng 已提交
285 286 287 288 289 290
      }
    }
  }

  return rowsToWrite;

H
TD-100  
hzcheng 已提交
291
_err:
H
hzcheng 已提交
292 293 294 295
  return -1;
}

/**
 * Carry the table's old last block over to the new last file when needed.
 *
 * Nothing is done unless a new last file is open and the table still has an
 * old last block. When the last block has several sub-blocks it is loaded,
 * rewritten to the new last file as one super block and the index entry is
 * updated; otherwise its bytes are copied from the old last file to the end of
 * the new one and its offset is updated. hasOldLastBlock is cleared on
 * success.
 *
 * @return 0 on success (including "nothing to do"), -1 on failure
 */
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
  SCompIdx  *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  SCompBlock rewritten;

  // Guard: no new last file or no old last block -> nothing to move
  if (!(pHelper->files.nLastF.fd > 0)) return 0;
  if (!(pHelper->hasOldLastBlock)) return 0;

  if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;

  SCompBlock *pLastBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1;
  ASSERT(pLastBlock->last);

  if (pLastBlock->numOfSubBlocks > 1) {
    // Several sub-blocks: load, then rewrite as a single super block
    if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
    ASSERT(pHelper->pDataCols[0]->numOfRows > 0 &&
           pHelper->pDataCols[0]->numOfRows < pHelper->config.minRowsPerFileBlock);

    int writeCode = tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
                                         pHelper->pDataCols[0]->numOfRows, &rewritten, true, true);
    if (writeCode < 0) return -1;

    if (tsdbUpdateSuperBlock(pHelper, &rewritten, pIdx->numOfBlocks - 1) < 0) return -1;
  } else {
    // Single block: raw copy from the old last file to the end of the new one
    if (lseek(pHelper->files.lastF.fd, pLastBlock->offset, SEEK_SET) < 0) return -1;
    pLastBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
    if (pLastBlock->offset < 0) return -1;

    if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pLastBlock->len) < pLastBlock->len) {
      return -1;
    }
  }

  pHelper->hasOldLastBlock = false;
  return 0;
}

/**
 * Write the current table's SCompInfo part to the end of the new head file and
 * record its new offset in the table's SCompIdx entry.
 *
 * If the SCompInfo part was never loaded for this table and the table has data
 * (offset > 0), the part is copied verbatim from the old head file. Otherwise
 * the in-memory SCompInfo is stamped (delimiter, uid, checksum) and written.
 *
 * @return 0 on success, -1 on failure (terrno is set for lseek failures)
 */
int tsdbWriteCompInfo(SRWHelper *pHelper) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  off_t     newOffset = 0;

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      // Keep the lseek result in an off_t so a negative (error) value is
      // detected whatever the type of pIdx->offset is.
      newOffset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
      if (newOffset < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      ASSERT(newOffset >= TSDB_FILE_HEAD_SIZE);

      // Position the old head file on this table's part before copying, so
      // the copy does not depend on where a previous read left the file.
      if (lseek(pHelper->files.headF.fd, pIdx->offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      pIdx->offset = newOffset;

      if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1;
    }
  } else {
    pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
    pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
    pHelper->pCompInfo->checksum = 0;
    ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
    taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);

    newOffset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
    pIdx->uid = pHelper->tableInfo.uid;
    if (newOffset < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    pIdx->offset = newOffset;
    ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);

    if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
  }

  return 0;
}

/**
 * Encode the SCompIdx entries of all tables that have data (offset > 0) and
 * append them, followed by a checksum, to the end of the new head file.
 *
 * Each entry is written as the variable-length encoded table index followed by
 * the encoded SCompIdx. The position and length of the written part are
 * recorded in the new head file's info (offset / len).
 *
 * @return 0 on success, -1 on failure
 */
int tsdbWriteCompIdx(SRWHelper *pHelper) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
  off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
  if (offset < 0) return -1;

  SFile *pFile = &(pHelper->files.nHeadF);
  pFile->info.offset = offset;

  // TODO: change the implementation of pHelper->pBuffer
  void *buf = pHelper->pBuffer;
  for (uint32_t i = 0; i < pHelper->config.maxTables; i++) {
    SCompIdx *pCompIdx = pHelper->pCompIdx + i;
    if (pCompIdx->offset > 0) {
      // Remember how far we are as a distance: the buffer may move when grown
      int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
      if (tsizeof(pHelper->pBuffer) - drift < 128) {
        // Grow through a temporary so a failed allocation neither loses the
        // current buffer nor leads to writing through a NULL pointer.
        void *pGrown = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
        if (pGrown == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          return -1;
        }
        pHelper->pBuffer = pGrown;
      }
      buf = POINTER_SHIFT(pHelper->pBuffer, drift);
      buf = taosEncodeVariantU32(buf, i);
      buf = tsdbEncodeSCompIdx(buf, pCompIdx);
    }
  }

  // Total size = encoded entries + trailing checksum
  int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
  taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);

  if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1;
  pFile->info.len = tsize;
  return 0;
}

/**
 * Load the SCompIdx part of the head file into pHelper->pCompIdx.
 *
 * Must be called right after the files are set and opened. The in-memory index
 * array is zeroed; if the head file records an index part (info.offset > 0) it
 * is read, checksum-verified and decoded entry by entry, and the head file is
 * then repositioned to TSDB_FILE_HEAD_SIZE. The helper gains the
 * TSDB_HELPER_IDX_LOAD state.
 *
 * @param pHelper helper with opened files
 * @param target  optional buffer that receives a copy of the whole index array
 * @return 0 on success, -1 on failure with terrno set
 */
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
  ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);

  if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
    // If not load from file, just load it in object
    SFile *pFile = &(pHelper->files.headF);
    int    fd = pFile->fd;

    memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
    if (pFile->info.offset > 0) {
      ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);

      if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
        tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, strerror(errno));
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      // Resize through a temporary so the existing buffer is not lost when
      // the allocation fails (assumes trealloc follows realloc semantics).
      void *pResized = trealloc(pHelper->pBuffer, pFile->info.len);
      if (pResized == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      pHelper->pBuffer = pResized;

      // Compare as signed 64-bit values so a negative return from tread is
      // never hidden by a conversion to the unsigned length type.
      if ((int64_t)tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < (int64_t)pFile->info.len) {
        tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
                  pFile->fname, strerror(errno));
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
        tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, pFile->info.len);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }

      // Decode it: a sequence of (table index, SCompIdx) pairs up to the checksum
      void *ptr = pHelper->pBuffer;
      while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
        uint32_t tid = 0;
        if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) {
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }
        ASSERT(tid > 0 && tid < pHelper->config.maxTables);

        if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) {
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }

        ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM));
      }

      if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
    }
  }
  helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);

  // Copy the memory for outside usage
  if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx));

  return 0;
}

/**
 * Load the SCompInfo part of the currently set table from the head file.
 *
 * Does nothing file-wise when the part is already loaded or when the table has
 * no data in this file group (offset == 0). Otherwise the part is read at the
 * offset recorded in the table's SCompIdx entry and its checksum is verified.
 * The helper gains the TSDB_HELPER_INFO_LOAD state.
 *
 * @param pHelper helper with a table set
 * @param target  optional buffer that receives pIdx->len bytes of the loaded part
 * @return 0 on success, -1 on failure with terrno set
 */
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  int fd = pHelper->files.headF.fd;

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      // Never read into an unchecked allocation; keep the old buffer on failure
      void *pResized = trealloc((void *)pHelper->pCompInfo, pIdx->len);
      if (pResized == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      pHelper->pCompInfo = pResized;

      // Signed 64-bit comparison so a negative tread result is always caught
      if ((int64_t)tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int64_t)pIdx->len) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }
    }

    helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
  }

  if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
473
int tsdbloadcompdata(srwhelper *phelper, scompblock *pcompblock, void *target) {
H
TD-100  
hzcheng 已提交
474 475 476 477 478
  ASSERT(pCompBlock->numOfSubBlocks <= 1);
  int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;

  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;

H
TD-100  
hzcheng 已提交
479 480 481
  size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
  pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize);
  if (pHelper->pCompData == NULL) return -1;
H
TD-100  
hzcheng 已提交
482 483 484 485 486 487
  if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1;

  ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);

  if (target) memcpy(target, pHelper->pCompData, tsize);

H
hzcheng 已提交
488 489 490
  return 0;
}

H
Hongze Cheng 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
/**
 * Fill per-column statistics from the SCompData currently held by the helper.
 *
 * pStatis[0..numOfCols) is walked in parallel with the block's column
 * descriptors, matching on colId (a two-pointer merge, so both sequences are
 * expected in ascending colId order - TODO confirm with callers). A requested
 * column that is absent from the block gets numOfNull = -1; a matching column
 * gets sum/max/min/maxIndex/minIndex/numOfNull copied over.
 */
void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) {
  SCompData *pBlockMeta = pHelper->pCompData;
  int        want = 0;  // index into pStatis
  int        have = 0;  // index into the block's column descriptors

  while (want < numOfCols) {
    SDataStatis *pOut = &pStatis[want];

    // Block columns exhausted: every remaining requested column is missing
    if (have >= pBlockMeta->numOfCols) {
      pOut->numOfNull = -1;
      want++;
      continue;
    }

    SCompCol *pCol = &(pBlockMeta->cols[have]);

    if (pOut->colId > pCol->colId) {
      // Block column not requested, skip it
      have++;
    } else if (pOut->colId < pCol->colId) {
      // Requested column not stored in this block
      pOut->numOfNull = -1;
      want++;
    } else {
      pOut->sum = pCol->sum;
      pOut->max = pCol->max;
      pOut->min = pCol->min;
      pOut->maxIndex = pCol->maxIndex;
      pOut->minIndex = pCol->minIndex;
      pOut->numOfNull = pCol->numOfNull;
      want++;
      have++;
    }
  }
}

H
TD-100  
hzcheng 已提交
519 520 521
/**
 * Load the selected columns of the super block at index blkIdx into pDataCols.
 *
 * For a block with several sub-blocks, the first sub-block is loaded directly
 * into pDataCols; each following one is loaded into pHelper->pDataCols[1] and
 * merged into pDataCols.
 *
 * @param pHelper     helper with the table's SCompInfo loaded
 * @param pDataCols   destination columns
 * @param blkIdx      index of the super block in pCompInfo->blocks
 * @param colIds      ids of the columns to load
 * @param numOfColIds number of entries in colIds
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
  SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;

  ASSERT(pCompBlock->numOfSubBlocks >= 1);  // Must be super block

  int numOfSubBlocks = pCompBlock->numOfSubBlocks;
  // The sub-block offset is taken relative to the start of SCompInfo, the same
  // base tsdbLoadBlockData() uses for this field.
  SCompBlock *pStartBlock =
      (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);

  if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1;
  for (int i = 1; i < numOfSubBlocks; i++) {
    pStartBlock++;
    if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
    // A failed merge must not be reported as success
    if (tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) return -1;
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
538 539
/**
 * Load all columns of a block into pHelper->pDataCols[0].
 *
 * When the block consists of several sub-blocks, the first one is located at
 * pCompBlock->offset bytes from the start of pHelper->pCompInfo; each
 * sub-block after the first is loaded into pHelper->pDataCols[1] and merged
 * into pHelper->pDataCols[0].
 *
 * @param pHelper    helper with the table's SCompInfo loaded
 * @param pCompBlock block (super block) to load
 * @param target     currently unused
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) {
  int         subCount = pCompBlock->numOfSubBlocks;
  SCompBlock *pCur = pCompBlock;

  if (subCount > 1) {
    pCur = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);
  }

  // First (or only) block goes straight into slot 0
  tdResetDataCols(pHelper->pDataCols[0]);
  if (tsdbLoadBlockDataImpl(pHelper, pCur, pHelper->pDataCols[0]) < 0) return -1;

  // Remaining sub-blocks are staged in slot 1 and merged into slot 0
  int remaining = subCount - 1;
  while (remaining > 0) {
    tdResetDataCols(pHelper->pDataCols[1]);
    pCur++;
    if (tsdbLoadBlockDataImpl(pHelper, pCur, pHelper->pDataCols[1]) < 0) return -1;
    if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) {
      return -1;
    }
    remaining--;
  }

  // if (target) TODO

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
561 562 563 564 565 566 567 568 569
/**
 * Rewrite the fixed-size header at the start of a file.
 *
 * The header is a zero-filled buffer of TSDB_FILE_HEAD_SIZE bytes holding the
 * version followed by the encoded file info, with a checksum appended over the
 * whole buffer.
 *
 * @param pFile   open file whose header is rewritten
 * @param version version number stored at the start of the header
 * @return 0 on success, -1 on failure with terrno set
 */
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
  char head[TSDB_FILE_HEAD_SIZE] = {0};

  // version first, file info right after it
  void *pInfoStart = taosEncodeFixedU32((void *)head, version);
  tsdbEncodeSFileInfo(pInfoStart, &(pFile->info));

  taosCalcChecksumAppend(0, (uint8_t *)head, TSDB_FILE_HEAD_SIZE);

  off_t pos = lseek(pFile->fd, 0, SEEK_SET);
  if (pos < 0) {
    tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (twrite(pFile->fd, (void *)head, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
    tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

/**
 * Serialize a STsdbFileInfo into buf.
 *
 * Field order: offset (u32), len (u32), size (u64), tombSize (u64),
 * totalBlocks (u32), totalSubBlocks (u32).
 *
 * @return the pointer returned by the last encode call
 */
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
  void *cursor = buf;

  cursor = taosEncodeFixedU32(cursor, pInfo->offset);
  cursor = taosEncodeFixedU32(cursor, pInfo->len);
  cursor = taosEncodeFixedU64(cursor, pInfo->size);
  cursor = taosEncodeFixedU64(cursor, pInfo->tombSize);
  cursor = taosEncodeFixedU32(cursor, pInfo->totalBlocks);
  cursor = taosEncodeFixedU32(cursor, pInfo->totalSubBlocks);

  return cursor;
}

/**
 * Deserialize a STsdbFileInfo from buf, in the same field order used by
 * tsdbEncodeSFileInfo(): offset, len, size, tombSize, totalBlocks,
 * totalSubBlocks.
 *
 * @return the pointer returned by the last decode call
 */
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
  void *cursor = buf;

  cursor = taosDecodeFixedU32(cursor, &(pInfo->offset));
  cursor = taosDecodeFixedU32(cursor, &(pInfo->len));
  cursor = taosDecodeFixedU64(cursor, &(pInfo->size));
  cursor = taosDecodeFixedU64(cursor, &(pInfo->tombSize));
  cursor = taosDecodeFixedU32(cursor, &(pInfo->totalBlocks));
  cursor = taosDecodeFixedU32(cursor, &(pInfo->totalSubBlocks));

  return cursor;
}

// ---------------------- INTERNAL FUNCTIONS ----------------------
/**
 * Decide whether a new last file should be created for this commit.
 *
 * A new file is wanted when the current last file is larger than 32 KB plus
 * the file header. If the size cannot be determined, the existing last file
 * is kept (returns false) instead of deciding on an uninitialized stat buffer.
 *
 * @param pHelper helper whose last file is already open
 * @return true when a new last file should be created
 */
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
  ASSERT(pHelper->files.lastF.fd > 0);

  struct stat st;
  if (fstat(pHelper->files.lastF.fd, &st) < 0) {
    tsdbError("vgId:%d failed to fstat file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
              strerror(errno));
    return false;
  }

  return st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE;
}

/*
 * Append the first `rowsToWrite` rows of `pDataCols` to the end of `pFile`
 * as one block, and describe the written block in `pCompBlock`.
 *
 * The block is assembled in pHelper->pBuffer as:
 *   SCompData header | one SCompCol per column that is not all-NULL | checksum |
 *   column payloads, each followed by its own checksum
 *
 * isLast        stored in pCompBlock->last; when set, rowsToWrite must be
 *               below config.minRowsPerFileBlock (asserted).
 * isSuperBlock  selects pCompBlock->numOfSubBlocks: 1 when set, 0 otherwise.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
                                SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
  ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
  ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);

  SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
  int64_t    offset = 0;

  // The block is appended, so its file offset is the current end of file.
  offset = lseek(pFile->fd, 0, SEEK_END);
  if (offset < 0) {
    tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Pass 1: build the column directory, skipping columns that are entirely NULL.
  int nColsNotAllNull = 0;
  for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
    SDataCol *pDataCol = pDataCols->cols + ncol;
    SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;

    if (isNEleNull(pDataCol, rowsToWrite)) {  // all data to commit are NULL, just ignore it
      continue;
    }

    memset(pCompCol, 0, sizeof(*pCompCol));

    pCompCol->colId = pDataCol->colId;
    pCompCol->type = pDataCol->type;
    // Statistics are computed for every column except the first one (ncol == 0).
    if (tDataTypeDesc[pDataCol->type].getStatisFunc && ncol != 0) {
      (*tDataTypeDesc[pDataCol->type].getStatisFunc)(
          (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
          &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
    }
    nColsNotAllNull++;
  }

  ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols);

  // Pass 2: compress the data if necessary and lay the payloads out after the directory.
  int     tcol = 0;
  int32_t toffset = 0;  // payload offset relative to the end of the directory
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);
  int32_t lsize = tsize;  // running total size of the block
  for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
    if (tcol >= nColsNotAllNull) break;

    SDataCol *pDataCol = pDataCols->cols + ncol;
    SCompCol *pCompCol = pCompData->cols + tcol;

    // Columns skipped in pass 1 have no directory entry; skip them here too.
    if (pDataCol->colId != pCompCol->colId) continue;
    void *tptr = (void *)((char *)pCompData + lsize);

    pCompCol->offset = toffset;

    int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);

    if (pHelper->config.compress) {
      if (pHelper->config.compress == TWO_STAGE_COMP) {
        // NOTE(review): the result overwrites compBuffer directly; if trealloc
        // keeps the old block alive on failure, that block is lost here.
        pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
        if (pHelper->compBuffer == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _err;
        }
      }

      pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
          (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pHelper->config.compress,
          pHelper->compBuffer, tsizeof(pHelper->compBuffer));
    } else {
      pCompCol->len = tlen;
      memcpy(tptr, pDataCol->pData, pCompCol->len);
    }

    // Add checksum
    pCompCol->len += sizeof(TSCKSUM);
    taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len);

    toffset += pCompCol->len;
    lsize += pCompCol->len;
    tcol++;
  }

  pCompData->delimiter = TSDB_FILE_DELIMITER;
  pCompData->uid = pHelper->tableInfo.uid;
  pCompData->numOfCols = nColsNotAllNull;

  // Checksum over the header plus column directory only.
  taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);

  // Write the whole block to file
  if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
    // The file name argument was missing here, leaving the final %s without a match.
    tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Update pCompBlock member variables
  pCompBlock->last = isLast;
  pCompBlock->offset = offset;
  pCompBlock->algorithm = pHelper->config.compress;
  pCompBlock->numOfRows = rowsToWrite;
  pCompBlock->sversion = pHelper->tableInfo.sversion;
  pCompBlock->len = (int32_t)lsize;
  pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
  pCompBlock->numOfCols = nColsNotAllNull;
  pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
  pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);

  tsdbTrace("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
            " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
            REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, pCompBlock->offset,
            pCompBlock->numOfRows, pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, pCompBlock->keyLast);

  return 0;

_err:
  return -1;
}

static int compareKeyBlock(const void *arg1, const void *arg2) {
  TSKEY       key = *(TSKEY *)arg1;
  SCompBlock *pBlock = (SCompBlock *)arg2;

  if (key < pBlock->keyFirst) {
    return -1;
  } else if (key > pBlock->keyLast) {
    return 1;
  }

  return 0;
}

/*
 * Merge rows from `pDataCols` into the existing super block at index `blkIdx`
 * of the current table.
 *
 * Two situations are handled:
 *   1. All new keys are greater than the block's keyLast: the rows are
 *      appended, either as a new sub-block in the last file or by loading the
 *      block, merging in memory and rewriting it.
 *   2. The new keys overlap the block: either the overlapping rows are added
 *      as a sub-block, or the block is loaded, merged with the new rows and
 *      rewritten as one or more super blocks in the data file.
 *
 * Returns the number of rows consumed from `pDataCols`, or -1 on failure.
 */
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
  // TODO: set pHelper->hasOldBlock
  int        rowsWritten = 0;
  SCompBlock compBlock = {0};

  ASSERT(pDataCols->numOfRows > 0);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx < pIdx->numOfBlocks);

  // The block is re-fetched through blockAtIdx() on every use rather than
  // cached in a local pointer, as the original code does.
  ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
  ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);

  if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) {  // Merge with the last block by append
    ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock &&
           blkIdx == pIdx->numOfBlocks - 1);
    int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5;  // TODO: make a interface

    rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
    if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
        (pHelper->files.nLastF.fd) < 0) {
      // Cheap path: append the new rows as a sub-block in the last file.
      if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
        goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
      // Merge
      if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
      // Write: a block with at least minRowsPerFileBlock rows goes to the data
      // file; a smaller one stays a "last" block.
      SFile *pWFile = NULL;
      bool   isLast = false;
      if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.minRowsPerFileBlock) {
        pWFile = &(pHelper->files.dataF);
      } else {
        isLast = true;
        pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
      }
      if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock,
                               isLast, true) < 0)
        goto _err;
      if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
    }

    ASSERT(pHelper->hasOldLastBlock);
    pHelper->hasOldLastBlock = false;
  } else {
    // Key must overlap with the block
    ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);

    // Largest key that may still be merged here without reaching the next block.
    TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;

    // rows1: number of rows must merge in this block
    int rows1 =
        tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
    // rows2: max number of rows the block can have more
    int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
    // rows3: number of rows between this block and the next block
    int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);

    ASSERT(rows3 >= rows1);

    if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        ((!blockAtIdx(pHelper, blkIdx)->last) ||
         ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
          (pHelper->files.nLastF.fd < 0)))) {
      // Cheap path: write the overlapping rows as a sub-block in the same file
      // the super block lives in.
      rowsWritten = rows1;
      bool   isLast = false;
      SFile *pFile = NULL;

      if (blockAtIdx(pHelper, blkIdx)->last) {
        isLast = true;
        pFile = &(pHelper->files.lastF);
      } else {
        pFile = &(pHelper->files.dataF);
      }

      if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {  // Load-Merge-Write
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;

      rowsWritten = rows3;

      int iter1 = 0;  // iter over pHelper->pDataCols[0]
      int iter2 = 0;  // iter over pDataCols
      int round = 0;
      while (true) {
        if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
        tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
                           pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5);
        ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
                                 pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
          goto _err;
        // The first merged chunk replaces the old super block; later chunks
        // are inserted as new super blocks. Failures are no longer ignored.
        if (round == 0) {
          if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        } else {
          if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        }
        round++;
        blkIdx++;
      }
    }
  }

  return rowsWritten;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
866 867 868 869 870 871 872 873 874
static int compTSKEY(const void *key1, const void *key2) {
  if (*(TSKEY *)key1 > *(TSKEY *)key2) {
    return 1;
  } else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
    return 0;
  } else {
    return -1;
  }
}
H
hzcheng 已提交
875

H
TD-100  
hzcheng 已提交
876 877 878
/*
 * Make sure pHelper->pCompInfo can hold more than `esize` bytes.
 *
 * When the current buffer size (as reported by tsizeof) is not larger than
 * `esize`, the buffer is grown to `esize` plus room for 16 extra SCompBlock
 * entries.
 *
 * Returns 0 on success, -1 on allocation failure with terrno set.
 */
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
  if (tsizeof((void *)pHelper->pCompInfo) <= esize) {
    size_t tsize = esize + sizeof(SCompBlock) * 16;
    // NOTE(review): the result overwrites pCompInfo directly; if trealloc
    // keeps the old block alive on failure, that block is lost here.
    pHelper->pCompInfo = (SCompInfo *)trealloc(pHelper->pCompInfo, tsize);
    if (pHelper->pCompInfo == NULL) {
      // Report the cause like the other allocation failures in this file do.
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
  }

  return 0;
}

/*
 * Insert `pCompBlock` as a new super block at position `blkIdx` in the
 * current table's block array (pHelper->pCompInfo->blocks).
 *
 * Everything from position `blkIdx` to the end of the index is shifted up by
 * one SCompBlock, and the stored offsets of blocks that own sub-blocks are
 * bumped accordingly. pIdx->numOfBlocks, len, maxKey and hasLast are updated.
 *
 * Returns 0 on success, -1 when the index buffer cannot be grown.
 */
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  // Adjust memory if no more room.
  // An empty index consists of the SCompInfo header plus its checksum. Every
  // other offset below is computed from sizeof(SCompInfo), so the baseline
  // must use the same type (it used sizeof(SCompData) before).
  if (pIdx->len == 0) pIdx->len = sizeof(SCompInfo) + sizeof(TSCKSUM);
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err;

  // Change the offset: sub-block data sits after the super block array, which
  // is about to grow by one entry.
  for (int i = 0; i < pIdx->numOfBlocks; i++) {
    SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
    if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
  }

  // Memmove if needed
  int tsize = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx);
  if (tsize > 0) {
    ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo));
    ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo));
    memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)),
            (void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize);
  }
  pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;

  pIdx->numOfBlocks++;
  pIdx->len += sizeof(SCompBlock);
  ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));
  // The table-level summary always mirrors the final block in the array.
  pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
  pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;

  if (pIdx->numOfBlocks > 1) {
    ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
  }

  tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
            blkIdx);

  return 0;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
931 932 933 934
/*
 * Attach `pCompBlock` (which must have numOfSubBlocks == 0) as a sub-block of
 * the super block at index `blkIdx`, and add `rowsAdded` to that super
 * block's row count.
 *
 * If the super block already owns sub-blocks, one more SCompBlock is appended
 * to its sub-block list. Otherwise a two-entry list is created: a copy of the
 * super block itself (marked numOfSubBlocks = 0) followed by the new block.
 * In both cases the data behind the insertion point is shifted and the
 * offsets of later super blocks that own sub-blocks are adjusted.
 *
 * Returns 0 on success, -1 when the index buffer cannot be grown.
 */
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) {
  ASSERT(pCompBlock->numOfSubBlocks == 0);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pSuper = pHelper->pCompInfo->blocks + blkIdx;
  ASSERT(pSuper->numOfSubBlocks >= 1 && pSuper->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);

  // A block without sub-blocks needs room for two entries, otherwise for one.
  size_t spaceNeeded = pIdx->len + sizeof(SCompBlock);
  if (pSuper->numOfSubBlocks == 1) spaceNeeded += sizeof(SCompBlock);
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;

  // The buffer may have been reallocated, so look the super block up again.
  pSuper = pHelper->pCompInfo->blocks + blkIdx;

  if (pSuper->numOfSubBlocks > 1) {
    // Append to the existing sub-block list.
    char  *pListEnd = (char *)(pHelper->pCompInfo) + pSuper->offset + pSuper->len;
    size_t tailLen = pIdx->len - (pSuper->offset + pSuper->len);

    if (tailLen > 0) {
      memmove((void *)(pListEnd + sizeof(SCompBlock)), (void *)pListEnd, tailLen);

      for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
        SCompBlock *pLater = &pHelper->pCompInfo->blocks[i];
        if (pLater->numOfSubBlocks > 1) pLater->offset += sizeof(SCompBlock);
      }
    }

    *(SCompBlock *)pListEnd = *pCompBlock;

    pSuper->numOfSubBlocks++;
    ASSERT(pSuper->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
    pSuper->len += sizeof(SCompBlock);
    pSuper->numOfRows += rowsAdded;
    pSuper->keyFirst = MIN(pSuper->keyFirst, pCompBlock->keyFirst);
    pSuper->keyLast = MAX(pSuper->keyLast, pCompBlock->keyLast);
    pIdx->len += sizeof(SCompBlock);
  } else {
    // Need to create two sub-blocks. The new list goes in front of the first
    // sub-block list owned by a later super block, or, when there is none, in
    // front of the trailing checksum.
    void *ptr = NULL;
    for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
      SCompBlock *pLater = pHelper->pCompInfo->blocks + i;
      if (pLater->numOfSubBlocks > 1) {
        ptr = POINTER_SHIFT(pHelper->pCompInfo, pLater->offset);
        break;
      }
    }

    if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM));

    size_t tailLen = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
    if (tailLen > 0) {
      memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tailLen);
      for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
        SCompBlock *pLater = pHelper->pCompInfo->blocks + i;
        if (pLater->numOfSubBlocks > 1) pLater->offset += (sizeof(SCompBlock) * 2);
      }
    }

    SCompBlock *pPair = (SCompBlock *)ptr;

    pPair[0] = *pSuper;
    pPair[0].numOfSubBlocks = 0;

    pPair[1] = *pCompBlock;

    pSuper->numOfSubBlocks = 2;
    pSuper->numOfRows += rowsAdded;
    pSuper->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
    pSuper->len = sizeof(SCompBlock) * 2;
    pSuper->keyFirst = MIN(pPair[0].keyFirst, pPair[1].keyFirst);
    pSuper->keyLast = MAX(pPair[0].keyLast, pPair[1].keyLast);

    pIdx->len += (sizeof(SCompBlock) * 2);
  }

  // The table-level summary always mirrors the final block in the array.
  pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
  pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;

  tsdbTrace("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx);

  return 0;

_err:
  return -1;
}

/*
 * Replace the super block at index `blkIdx` with `pCompBlock` (which must
 * have numOfSubBlocks == 1).
 *
 * If the old super block owned sub-blocks, its sub-block list is removed
 * first: the data behind the list is moved down over it, the offsets of later
 * super blocks that own sub-blocks are reduced, and pIdx->len shrinks by the
 * size of the removed list.
 *
 * Always returns 0.
 */
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pTarget = pHelper->pCompInfo->blocks + blkIdx;

  ASSERT(pTarget->numOfSubBlocks >= 1);

  // Delete the sub blocks it has
  if (pTarget->numOfSubBlocks > 1) {
    char  *pListStart = (char *)(pHelper->pCompInfo) + pTarget->offset;
    size_t tailLen = pIdx->len - (pTarget->offset + pTarget->len);
    if (tailLen > 0) {
      memmove((void *)pListStart, (void *)(pListStart + pTarget->len), tailLen);
    }

    // Bytes freed by dropping the whole sub-block list.
    size_t freed = sizeof(SCompBlock) * pTarget->numOfSubBlocks;

    for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
      SCompBlock *pLater = &pHelper->pCompInfo->blocks[i];
      if (pLater->numOfSubBlocks > 1) pLater->offset -= freed;
    }

    pIdx->len -= freed;
  }

  *pTarget = *pCompBlock;

  // The table-level summary always mirrors the final block in the array.
  pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
  pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;

  tsdbTrace("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
            blkIdx);

  return 0;
}

// Get the number of rows in range [minKey, maxKey]
H
TD-100  
hzcheng 已提交
1054
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
H
Haojun Liao 已提交
1055
  if (pDataCols->numOfRows == 0) return 0;
H
TD-100  
hzcheng 已提交
1056 1057 1058 1059 1060 1061 1062 1063

  ASSERT(minKey <= maxKey);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);
  TSKEY keyLast = dataColsKeyLast(pDataCols);
  ASSERT(keyFirst <= keyLast);

  if (minKey > keyLast || maxKey < keyFirst) return 0;

H
Haojun Liao 已提交
1064
  void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
H
TD-100  
hzcheng 已提交
1065 1066 1067
                           compTSKEY, TD_GE);
  ASSERT(ptr1 != NULL);

H
Haojun Liao 已提交
1068
  void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
H
TD-100  
hzcheng 已提交
1069 1070 1071 1072 1073
                           compTSKEY, TD_LE);
  ASSERT(ptr2 != NULL);

  if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;

H
TD-100  
hzcheng 已提交
1074
  return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1;
H
TD-185  
Hongze Cheng 已提交
1075 1076
}

H
TD-353  
Hongze Cheng 已提交
1077 1078 1079
/*
 * Put the helper's file set back into its "no file open" state: the whole
 * `files` structure is zeroed, the file id is set to -1, and every member
 * file (headF, dataF, lastF, nHeadF, nLastF) gets fd = -1.
 *
 * NOTE(review): the memset runs before the tfree calls, so each fname has
 * already been cleared by the time tfree sees it and presumably nothing is
 * released here -- confirm that this ordering is intentional.
 */
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
  memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
  pHelper->files.fid = -1;

  // Same two steps for every member file, in the original order.
#define TSDB_RESET_ONE_FILE(f) \
  do {                         \
    tfree((f).fname);          \
    (f).fd = -1;               \
  } while (0)

  TSDB_RESET_ONE_FILE(pHelper->files.headF);
  TSDB_RESET_ONE_FILE(pHelper->files.dataF);
  TSDB_RESET_ONE_FILE(pHelper->files.lastF);
  TSDB_RESET_ONE_FILE(pHelper->files.nHeadF);
  TSDB_RESET_ONE_FILE(pHelper->files.nLastF);

#undef TSDB_RESET_ONE_FILE
}
H
TD-185  
Hongze Cheng 已提交
1091

H
TD-353  
Hongze Cheng 已提交
1092
/*
 * Initialize the file part of the helper: allocate the SCompIdx array (one
 * entry per table allowed by the repo config, plus room for a checksum) and
 * reset the file set.
 *
 * Returns 0 on success, -1 on allocation failure with terrno set.
 */
static int tsdbInitHelperFile(SRWHelper *pHelper) {
  size_t idxBytes = sizeof(SCompIdx) * pHelper->pRepo->config.maxTables + sizeof(TSCKSUM);

  pHelper->pCompIdx = (SCompIdx *)tmalloc(idxBytes);
  if (pHelper->pCompIdx != NULL) {
    tsdbResetHelperFileImpl(pHelper);
    return 0;
  }

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
1105 1106
/*
 * Tear down the file part of the helper: close the helper's files (passing
 * `false` as the second argument of tsdbCloseHelperFile), reset the file set,
 * then release the SCompIdx array.
 */
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
  // Close first so the reset below does not discard descriptors still in use.
  tsdbCloseHelperFile(pHelper, false);

  tsdbResetHelperFileImpl(pHelper);

  tzfree(pHelper->pCompIdx);
}
H
TD-185  
Hongze Cheng 已提交
1110

H
TD-353  
Hongze Cheng 已提交
1111 1112 1113 1114 1115
// ---------- Operations on Helper Table part
/*
 * Clear the per-table state of the helper: the hasOldLastBlock flag is
 * dropped and tableInfo is zeroed.
 */
static void tsdbResetHelperTableImpl(SRWHelper *pHelper) {
  pHelper->hasOldLastBlock = false;
  memset((void *)&pHelper->tableInfo, 0, sizeof(SHelperTable));
}
H
TD-185  
Hongze Cheng 已提交
1116

H
TD-353  
Hongze Cheng 已提交
1117 1118 1119 1120
/*
 * Reset everything tied to the currently selected table: the block part, the
 * table part, and the TABLE_SET / INFO_LOAD state bits.
 */
static void tsdbResetHelperTable(SRWHelper *pHelper) {
  // Block state first, then the table state.
  tsdbResetHelperBlock(pHelper);
  tsdbResetHelperTableImpl(pHelper);

  helperClearState(pHelper, (TSDB_HELPER_TABLE_SET | TSDB_HELPER_INFO_LOAD));
}

H
TD-353  
Hongze Cheng 已提交
1123
/* Initialize the table part of the helper; currently the same as a reset. */
static void tsdbInitHelperTable(SRWHelper *pHelper) {
  tsdbResetHelperTableImpl(pHelper);
}
H
TD-185  
Hongze Cheng 已提交
1124

H
TD-353  
Hongze Cheng 已提交
1125
/* Release the table part of the helper, i.e. the pCompInfo buffer. */
static void tsdbDestroyHelperTable(SRWHelper *pHelper) {
  tzfree((void *)pHelper->pCompInfo);
}
H
TD-185  
Hongze Cheng 已提交
1126

H
TD-353  
Hongze Cheng 已提交
1127 1128 1129 1130 1131
// ---------- Operations on Helper Block part
/* Reset both of the helper's SDataCols buffers (pDataCols[0] and [1]). */
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
  for (int i = 0; i < 2; i++) {
    tdResetDataCols(pHelper->pDataCols[i]);
  }
}
H
TD-185  
Hongze Cheng 已提交
1132

H
TD-353  
Hongze Cheng 已提交
1133 1134 1135 1136 1137 1138
/*
 * Reset the block part of the helper. No helper state bit is cleared here
 * at present.
 */
static void tsdbResetHelperBlock(SRWHelper *pHelper) {
  tsdbResetHelperBlockImpl(pHelper);
}

/*
 * Initialize the block part of the helper: create the two SDataCols buffers,
 * dimensioned from the repo's in-memory table (maxRowBytes, maxCols) and the
 * configured maxRowsPerFileBlock, then reset them.
 *
 * Returns 0 on success, -1 with terrno set when either buffer is NULL.
 */
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
  STsdbRepo *pRepo = helperRepo(pHelper);

  // Both allocations are attempted before the result is checked.
  for (int i = 0; i < 2; i++) {
    pHelper->pDataCols[i] =
        tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
  }

  if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  tsdbResetHelperBlockImpl(pHelper);
  return 0;
}

/*
 * Release the block part of the helper: the pCompData buffer and both
 * SDataCols buffers.
 */
static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
  tzfree(pHelper->pCompData);

  for (int i = 0; i < 2; i++) {
    tdFreeDataCols(pHelper->pDataCols[i]);
  }
}

/*
 * Initialize a read/write helper of the given `type` for `pRepo`.
 *
 * The helper is zeroed, its type/repo/state are set, then the file, table and
 * block parts are initialized and the working buffer (pBuffer) is allocated.
 * On any failure the helper is destroyed again before returning.
 *
 * Returns 0 on success, -1 on failure.
 */
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
  STsdbCfg *pCfg = &pRepo->config;

  memset((void *)pHelper, 0, sizeof(*pHelper));

  helperType(pHelper) = type;
  helperRepo(pHelper) = pRepo;
  helperState(pHelper) = TSDB_HELPER_CLEAR_STATE;

  // Init file part
  if (tsdbInitHelperFile(pHelper) < 0) goto _err;

  // Init table part
  tsdbInitHelperTable(pHelper);

  // Init block part
  if (tsdbInitHelperBlock(pHelper) < 0) goto _err;

  // Working buffer: header, then per-column directory entry + checksum +
  // compression slack, then the row data, then one more checksum.
  size_t headBytes = sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pRepo->imem->maxCols;
  size_t bufBytes = headBytes + pRepo->imem->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM);

  pHelper->pBuffer = tmalloc(bufBytes);
  if (pHelper->pBuffer == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  return 0;

_err:
  tsdbDestroyHelper(pHelper);
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
1193 1194 1195
static int comparColIdCompCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
}
H
TD-185  
Hongze Cheng 已提交
1196

H
TD-353  
Hongze Cheng 已提交
1197 1198 1199
static int comparColIdDataCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
}
H
TD-185  
Hongze Cheng 已提交
1200

H
TD-353  
Hongze Cheng 已提交
1201 1202 1203 1204
/*
 * Read the stored bytes of one column (pCompCol->len bytes, which include the
 * column's trailing checksum) of block `pCompBlock` from `fd` into `buf`.
 *
 * Returns 0 on success, -1 on seek/read failure with terrno set.
 */
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
  // Column payloads start after the SCompData header, the column directory
  // AND the checksum that tsdbWriteBlockToFile appends to them. The checksum
  // was missing here, so the read started sizeof(TSCKSUM) bytes too early.
  size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);

  if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (tread(fd, buf, pCompCol->len) < pCompCol->len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
1209 1210 1211 1212
/*
 * Load the columns listed in `colIds` (numOfColIds entries) of one block into
 * `pDataCols`.
 *
 * The block's column directory is loaded first. Each requested id is then
 * looked up in the directory; ids the block does not contain are skipped.
 * For ids that are present, the matching SDataCol must exist in `pDataCols`
 * (asserted), its len is set to the stored length and the stored bytes are
 * read into its pData.
 *
 * Returns 0 on success, -1 on failure.
 */
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
                                       SDataCols *pDataCols) {
  if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;

  // "Last" blocks live in the last file, all others in the data file.
  int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;

  for (int i = 0; i < numOfColIds; i++) {
    int16_t colId = colIds[i];

    SCompCol *pCompCol = (SCompCol *)bsearch((void *)&colId, (void *)pHelper->pCompData->cols,
                                             pHelper->pCompData->numOfCols, sizeof(SCompCol), comparColIdCompCol);
    if (pCompCol == NULL) continue;  // column not stored in this block

    SDataCol *pDataCol = (SDataCol *)bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols,
                                             sizeof(SDataCol), comparColIdDataCol);
    ASSERT(pDataCol != NULL);

    pDataCol->len = pCompCol->len;
    if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1;
  }

  return 0;
}

/*
 * Verify the checksum of one column's stored image and decode it into pDataCol.
 *
 * pDataCol   - destination column; its pData buffer is filled and its len is set
 * content    - stored column image, len bytes long (checksum included)
 * len        - size of content in bytes
 * comp       - 0 when the column is stored uncompressed; otherwise the value is
 *              handed to the type-specific decompression function
 * numOfRows  - number of rows held by the column
 * maxPoints  - not referenced by this function
 * buffer     - scratch buffer passed through to the decompression function
 * bufferSize - size of buffer in bytes
 *
 * Returns 0 on success, -1 when the checksum check fails.
 */
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
                                        int maxPoints, char *buffer, int bufferSize) {
  // Refuse data whose checksum does not verify
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;

  if (!comp) {
    // Stored as-is: copy the first len - sizeof(TSCKSUM) bytes
    pDataCol->len = len - sizeof(TSCKSUM);
    memcpy(pDataCol->pData, content, pDataCol->len);
  } else {
    // Stored compressed: let the decompressor registered for this data type expand it
    pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(
        content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
  }

  // BINARY and NCHAR columns additionally get dataColSetOffset() applied, on both paths
  if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
    dataColSetOffset(pDataCol, numOfRows);
  }

  return 0;
}

/*
 * Read one block from disk and decode its columns into pDataCols.
 *
 * pHelper    - read/write helper; pHelper->pBuffer receives the raw block image and
 *              pHelper->compBuffer is grown as scratch space for two-stage compression
 * pCompBlock - descriptor of the block to load; must have at most one sub-block
 *              (asserted)
 * pDataCols  - destination; numOfRows is taken from the block and every column is
 *              either decoded from the block or filled with NULL values
 *
 * Returns 0 on success and -1 on failure. terrno is set here for seek/read errors,
 * a corrupted block header and allocation failure; when decoding a column fails
 * this function returns -1 without setting terrno itself.
 */
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);

  // The whole block image is read into pBuffer, so it has to be large enough
  ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);

  SCompData *pCompData = (SCompData *)pHelper->pBuffer;

  // Pick the file the block lives in
  SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);

  int fd = pFile->fd;
  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
    tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);

  // Block header = SCompData + SCompCol array + checksum; column payloads follow it
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
  if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
    tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
              pFile->fname, pCompBlock->offset, pCompBlock->len);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    goto _err;
  }

  pDataCols->numOfRows = pCompBlock->numOfRows;

  // Recover the data.
  // ccol walks the columns stored in the block, dcol the columns requested by the
  // caller; the walk assumes both arrays are ordered by ascending colId.
  int ccol = 0;
  int dcol = 0;
  while (dcol < pDataCols->numOfCols) {
    SDataCol *pDataCol = &(pDataCols->cols[dcol]);
    if (ccol >= pCompData->numOfCols) {
      // No stored columns left: set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
      dcol++;
      continue;
    }

    SCompCol *pCompCol = &(pCompData->cols[ccol]);

    if (pCompCol->colId == pDataCol->colId) {
      if (pCompBlock->algorithm == TWO_STAGE_COMP) {
        // Size the scratch buffer for this column before decompressing
        int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
        if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
          zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
        }

        // Go through a temporary so a failed resize does not overwrite the only
        // pointer to the existing buffer.
        // NOTE(review): assumes trealloc() follows realloc() semantics and leaves the
        // old buffer valid on failure - confirm.
        void *tbuf = trealloc(pHelper->compBuffer, zsize);
        if (tbuf == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _err;
        }
        pHelper->compBuffer = tbuf;
      }
      if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
                                       pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
                                       pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
        goto _err;
      dcol++;
      ccol++;
    } else if (pCompCol->colId < pDataCol->colId) {
      // Stored column was not requested: skip it
      ccol++;
    } else {
      // Requested column is not stored in this block: set it as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
      dcol++;
    }
  }

  return 0;

_err:
  return -1;
}

/*
 * Serialize an SCompIdx into buf.
 *
 * Fields are written in this order: len, offset, hasLast, numOfBlocks, uid, maxKey.
 * len, offset and numOfBlocks use the variant U32 encoder, hasLast the fixed U8
 * encoder, uid and maxKey the fixed U64 encoder. tsdbDecodeSCompIdx() reads the
 * fields back in the same order.
 *
 * Returns the pointer handed back by the last encoder call.
 */
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
  void *pCur = buf;

  pCur = taosEncodeVariantU32(pCur, pIdx->len);
  pCur = taosEncodeVariantU32(pCur, pIdx->offset);
  pCur = taosEncodeFixedU8(pCur, pIdx->hasLast);
  pCur = taosEncodeVariantU32(pCur, pIdx->numOfBlocks);
  pCur = taosEncodeFixedU64(pCur, pIdx->uid);
  pCur = taosEncodeFixedU64(pCur, pIdx->maxKey);

  return pCur;
}

/*
 * Deserialize an SCompIdx from buf.
 *
 * Fields are read in this order: len, offset, hasLast, numOfBlocks, uid, maxKey.
 * hasLast, numOfBlocks, uid and maxKey are decoded into temporaries and then
 * assigned to the corresponding pIdx members.
 *
 * Returns the pointer handed back by the last decoder call, or NULL as soon as
 * any decoder returns NULL. On a NULL return the fields decoded before the
 * failure have already been stored in pIdx.
 */
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
  void    *pCur = buf;
  uint8_t  lastFlag = 0;
  uint32_t nBlocks = 0;
  uint64_t raw = 0;

  pCur = taosDecodeVariantU32(pCur, &(pIdx->len));
  if (pCur == NULL) return NULL;

  pCur = taosDecodeVariantU32(pCur, &(pIdx->offset));
  if (pCur == NULL) return NULL;

  pCur = taosDecodeFixedU8(pCur, &lastFlag);
  if (pCur == NULL) return NULL;
  pIdx->hasLast = lastFlag;

  pCur = taosDecodeVariantU32(pCur, &nBlocks);
  if (pCur == NULL) return NULL;
  pIdx->numOfBlocks = nBlocks;

  pCur = taosDecodeFixedU64(pCur, &raw);
  if (pCur == NULL) return NULL;
  pIdx->uid = (int64_t)raw;

  pCur = taosDecodeFixedU64(pCur, &raw);
  if (pCur == NULL) return NULL;
  pIdx->maxKey = (TSKEY)raw;

  return pCur;
}