tsdbRWHelper.c 48.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15 16

#include "os.h"
H
TD-100  
hzcheng 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tchecksum.h"
H
TD-185  
Hongze Cheng 已提交
19
#include "tcoding.h"
H
TD-353  
Hongze Cheng 已提交
20 21
#include "tscompression.h"
#include "tsdbMain.h"
H
hzcheng 已提交
22

H
TD-353  
Hongze Cheng 已提交
23
// ---------------------- EXPORTED FUNCTIONS ----------------------
H
TD-100  
hzcheng 已提交
24 25 26 27 28 29 30 31
/**
 * Initialize `pHelper` as a read helper for `pRepo`.
 *
 * Thin wrapper: forwards to tsdbInitHelper() with TSDB_READ_HELPER and
 * returns that call's result unchanged.
 */
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  return tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER);
}

/**
 * Initialize `pHelper` as a write helper for `pRepo`.
 *
 * Thin wrapper: forwards to tsdbInitHelper() with TSDB_WRITE_HELPER and
 * returns that call's result unchanged.
 */
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  return tsdbInitHelper(pHelper, pRepo, TSDB_WRITE_HELPER);
}

H
hzcheng 已提交
32
/**
 * Release everything owned by the helper and zero the structure.
 *
 * A NULL helper is accepted and ignored.
 */
void tsdbDestroyHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  // Scratch buffers first, then the file/table/block sub-objects
  tzfree(pHelper->pBuffer);
  tzfree(pHelper->compBuffer);

  tsdbDestroyHelperFile(pHelper);
  tsdbDestroyHelperTable(pHelper);
  tsdbDestroyHelperBlock(pHelper);

  // Leave the object in an all-zero state
  memset((void *)pHelper, 0, sizeof(*pHelper));
}

H
TD-100  
hzcheng 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56
/**
 * Bring the helper back to TSDB_HELPER_CLEAR_STATE.
 *
 * The block part and the table part are reset, the helper's files are closed
 * through tsdbCloseHelperFile(pHelper, false), and the file part is reset.
 * A NULL helper is accepted and ignored.
 */
void tsdbResetHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  // Block part, then table part
  tsdbResetHelperBlockImpl(pHelper);
  tsdbResetHelperTableImpl(pHelper);

  // File part: close whatever is open before clearing the bookkeeping
  tsdbCloseHelperFile(pHelper, false);
  tsdbResetHelperFileImpl(pHelper);

  pHelper->state = TSDB_HELPER_CLEAR_STATE;
}

H
TD-100  
hzcheng 已提交
59 60
/**
 * Bind the helper to the files of `pGroup` and open them.
 *
 * The helper is reset first. The head file is always opened read-only.
 *  - Write helper: the data and last files are opened read-write; a new head
 *    file (".h") is created and seeded with the first TSDB_FILE_HEAD_SIZE
 *    bytes of the current head file; when tsdbShouldCreateNewLast() returns
 *    true a new last file (".l") is created and seeded the same way from the
 *    current last file.
 *  - Read helper: the data and last files are opened read-only.
 * Finally the SCompIdx part is loaded through tsdbLoadCompIdx().
 *
 * @param pHelper helper to set up, must not be NULL
 * @param pGroup  file group to work on, must not be NULL
 * @return the result of tsdbLoadCompIdx() on success, -1 on failure. Files
 *         that were opened before a failure are NOT closed by this function.
 */
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
  ASSERT(pHelper != NULL && pGroup != NULL);

  // Clear the helper object
  tsdbResetHelper(pHelper);

  ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);

  bool isWriter = (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);

  // Set the files
  pHelper->files.fid = pGroup->fileId;
  pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
  pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
  pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
  if (isWriter) {
    // dirname() may modify its argument, so work on a private copy
    char *fnameDup = strdup(pHelper->files.headF.fname);
    if (fnameDup == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
    char *dataDir = dirname(fnameDup);

    tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname);
    tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname);
    free((void *)fnameDup);
  }

  // Open the files
  if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
  if (isWriter) {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;

    // Create and open .h
    if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err;
    if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
      tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    // Create and open .l file if should
    if (tsdbShouldCreateNewLast(pHelper)) {
      if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
      // The braces matter: only a short copy is an error
      if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
        tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                  TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
        terrno = TAOS_SYSTEM_ERROR(errno);
        goto _err;
      }
    }
  } else {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
  }

  helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);

  return tsdbLoadCompIdx(pHelper, NULL);

_err:
  return -1;
}

/**
 * Sync and close every file the helper currently has open (fd > 0).
 *
 * When `hasError` is false the file header of the data file and of the new
 * head/last files is rewritten via tsdbUpdateFileHeader() before closing, and
 * the new head/last files are renamed over the current ones (their `info` is
 * copied across as well). When `hasError` is true the new head/last files are
 * removed instead.
 *
 * @return always 0; results of the individual system calls are not inspected
 */
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
  SFile *pHead = &(pHelper->files.headF);
  SFile *pData = &(pHelper->files.dataF);
  SFile *pLast = &(pHelper->files.lastF);
  SFile *pNewHead = &(pHelper->files.nHeadF);
  SFile *pNewLast = &(pHelper->files.nLastF);

  // Current head file
  if (pHead->fd > 0) {
    fsync(pHead->fd);
    close(pHead->fd);
    pHead->fd = -1;
  }

  // Data file: refresh its header unless the operation failed
  if (pData->fd > 0) {
    if (!hasError) tsdbUpdateFileHeader(pData, 0);
    fsync(pData->fd);
    close(pData->fd);
    pData->fd = -1;
  }

  // Current last file
  if (pLast->fd > 0) {
    fsync(pLast->fd);
    close(pLast->fd);
    pLast->fd = -1;
  }

  // New head file: promote it on success, discard it on error
  if (pNewHead->fd > 0) {
    if (!hasError) tsdbUpdateFileHeader(pNewHead, 0);
    fsync(pNewHead->fd);
    close(pNewHead->fd);
    pNewHead->fd = -1;

    if (!hasError) {
      rename(pNewHead->fname, pHead->fname);
      pHead->info = pNewHead->info;
    } else {
      remove(pNewHead->fname);
    }
  }

  // New last file: same treatment as the new head file
  if (pNewLast->fd > 0) {
    if (!hasError) tsdbUpdateFileHeader(pNewLast, 0);
    fsync(pNewLast->fd);
    close(pNewLast->fd);
    pNewLast->fd = -1;

    if (!hasError) {
      rename(pNewLast->fname, pLast->fname);
      pLast->info = pNewLast->info;
    } else {
      remove(pNewLast->fname);
    }
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
168
/**
 * Point the helper at `pTable`.
 *
 * Requires the files to be set/opened and the SCompIdx part to be loaded.
 * State left over from a previous table is cleared, the table id/uid and
 * schema version are recorded, both working SDataCols are initialised with
 * the table schema, and `hasOldLastBlock` is raised when the table's SCompIdx
 * has a non-zero offset and its `hasLast` flag set.
 *
 * `pRepo` is not used by this function.
 */
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));

  // Clear members and state used by previous table
  tsdbResetHelperTable(pHelper);
  ASSERT(helperHasState(pHelper, (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)));

  // Identity of the table
  pHelper->tableInfo.tid = pTable->tableId.tid;
  pHelper->tableInfo.uid = pTable->tableId.uid;

  // Schema-dependent parts
  STSchema *pSchema = tsdbGetTableSchema(pTable);
  pHelper->tableInfo.sversion = schemaVersion(pSchema);
  for (int i = 0; i < 2; i++) {
    tdInitDataCols(pHelper->pDataCols[i], pSchema);
  }

  // Remember whether this table already owns a block in the last file
  SCompIdx *pIdx = &(pHelper->pCompIdx[pTable->tableId.tid]);
  if (pIdx->hasLast && pIdx->offset > 0) {
    pHelper->hasOldLastBlock = true;
  }

  helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
  ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
}

H
TD-100  
hzcheng 已提交
192 193
/**
 * Write part of of points from pDataCols to file
H
TD-353  
Hongze Cheng 已提交
194
 *
H
TD-100  
hzcheng 已提交
195 196 197
 * @return: number of points written to file successfully
 *          -1 for failure
 */
H
hzcheng 已提交
198 199
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
H
Haojun Liao 已提交
200
  ASSERT(pDataCols->numOfRows > 0);
H
TD-100  
hzcheng 已提交
201

H
hzcheng 已提交
202 203 204 205
  SCompBlock compBlock;
  int        rowsToWrite = 0;
  TSKEY      keyFirst = dataColsKeyFirst(pDataCols);

H
TD-100  
hzcheng 已提交
206
  ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
H
TD-100  
hzcheng 已提交
207
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;  // for change purpose
H
hzcheng 已提交
208 209

  // Load the SCompInfo part if neccessary
H
TD-100  
hzcheng 已提交
210
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
H
TD-100  
hzcheng 已提交
211
  if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
H
hzcheng 已提交
212

H
TD-100  
hzcheng 已提交
213
  if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) {  // Just append as a super block
H
TD-100  
hzcheng 已提交
214
    ASSERT(pHelper->hasOldLastBlock == false);
H
Haojun Liao 已提交
215
    rowsToWrite = pDataCols->numOfRows;
H
hzcheng 已提交
216 217 218
    SFile *pWFile = NULL;
    bool   isLast = false;

H
TD-100  
hzcheng 已提交
219
    if (rowsToWrite >= pHelper->config.minRowsPerFileBlock) {
H
hzcheng 已提交
220 221 222
      pWFile = &(pHelper->files.dataF);
    } else {
      isLast = true;
H
TD-100  
hzcheng 已提交
223
      pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
H
hzcheng 已提交
224 225
    }

H
TD-100  
hzcheng 已提交
226
    if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
H
hzcheng 已提交
227

H
hzcheng 已提交
228
    if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
H
TD-100  
hzcheng 已提交
229
  } else {  // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
H
TD-353  
Hongze Cheng 已提交
230 231
    SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), pIdx->numOfBlocks,
                                         sizeof(SCompBlock), compareKeyBlock, TD_GE);
H
TD-100  
hzcheng 已提交
232

H
hzcheng 已提交
233
    int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
H
TD-100  
hzcheng 已提交
234 235

    if (pCompBlock == NULL) {  // No key overlap, must has last block, just merge with the last block
H
TD-166  
hzcheng 已提交
236
      ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
H
TD-100  
hzcheng 已提交
237 238 239 240
      rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
      if (rowsToWrite < 0) goto _err;
    } else {  // Has key overlap

H
TD-100  
hzcheng 已提交
241 242
      if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
        // Key overlap with the block, must merge with the block
H
TD-100  
hzcheng 已提交
243 244 245

        rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
        if (rowsToWrite < 0) goto _err;
H
TD-353  
Hongze Cheng 已提交
246 247
      } else {  // Save as a super block in the middle
        rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst - 1);
H
TD-100  
hzcheng 已提交
248
        ASSERT(rowsToWrite > 0);
H
TD-353  
Hongze Cheng 已提交
249 250
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0)
          goto _err;
H
TD-100  
hzcheng 已提交
251
        if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
H
hzcheng 已提交
252 253 254 255 256 257
      }
    }
  }

  return rowsToWrite;

H
TD-100  
hzcheng 已提交
258
_err:
H
hzcheng 已提交
259 260 261 262
  return -1;
}

/**
 * Carry the table's old last block over into the new last file.
 *
 * Only acts when a new last file is open (nLastF.fd > 0) and the current
 * table still has a block in the old last file (`hasOldLastBlock`).
 *  - If that block consists of several sub-blocks, its data is loaded and
 *    written to the new last file as one block, and the super block entry is
 *    updated.
 *  - Otherwise the raw block bytes are copied from the old last file to the
 *    end of the new last file and the block's offset is updated in place.
 *
 * @return 0 on success (or when nothing had to be done), -1 on failure
 */
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  // Zero-initialized (as in tsdbMergeDataWithBlock) so members that
  // tsdbWriteBlockToFile() does not assign never carry stack garbage
  SCompBlock compBlock = {0};

  if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
    if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;

    // The last block of a table is always its final block entry
    SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1;
    ASSERT(pCompBlock->last);

    if (pCompBlock->numOfSubBlocks > 1) {
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
      ASSERT(pHelper->pDataCols[0]->numOfRows > 0 &&
             pHelper->pDataCols[0]->numOfRows < pHelper->config.minRowsPerFileBlock);
      if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
                               pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
        return -1;

      if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;

    } else {
      // Position the source first: tsendfile() is called without an explicit
      // offset (NULL) and so copies from the current position of lastF
      if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
      pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
      if (pCompBlock->offset < 0) return -1;

      if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len)
        return -1;
    }

    pHelper->hasOldLastBlock = false;
  }

  return 0;
}

/**
 * Write the current table's SCompInfo part to the end of the new head file
 * and record its new offset in the table's SCompIdx.
 *
 *  - SCompInfo not loaded (TSDB_HELPER_INFO_LOAD not set): if the table has
 *    an SCompInfo in the old head file (offset > 0), its `len` bytes are
 *    copied verbatim from the old head file to the new one.
 *  - SCompInfo loaded: delimiter, uid and checksum are refreshed on the
 *    in-memory copy, which is then written out.
 *
 * @return 0 on success, -1 on failure (terrno is set for system call errors)
 */
int tsdbWriteCompInfo(SRWHelper *pHelper) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  off_t     offset = 0;  // lseek() result is checked here before it is stored in pIdx

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      // tsendfile() is called with a NULL offset, i.e. it copies from the
      // current position of the source; put the old head file at the start of
      // this table's SCompInfo instead of relying on where it happens to be
      if (lseek(pHelper->files.headF.fd, pIdx->offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
      if (offset < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      ASSERT(offset >= TSDB_FILE_HEAD_SIZE);
      pIdx->offset = offset;

      if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
    }
  } else {
    pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
    pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
    pHelper->pCompInfo->checksum = 0;
    ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
    taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);

    offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
    if (offset < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    ASSERT(offset >= TSDB_FILE_HEAD_SIZE);
    pIdx->offset = offset;
    pIdx->uid = pHelper->tableInfo.uid;

    if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  return 0;
}

/**
 * Serialize the SCompIdx table and append it to the new head file.
 *
 * Every entry with offset > 0 is encoded into pHelper->pBuffer as the table
 * index (variable-length u32) followed by the encoded SCompIdx. A checksum is
 * appended, the buffer is written at the end of the new head file, and the
 * position/length of the written region are stored in nHeadF.info.
 *
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbWriteCompIdx(SRWHelper *pHelper) {
  ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
  off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  SFile *pFile = &(pHelper->files.nHeadF);
  pFile->info.offset = offset;

  // TODO: change the implementation of pHelper->pBuffer
  void *buf = pHelper->pBuffer;
  for (uint32_t i = 0; i < pHelper->config.maxTables; i++) {
    SCompIdx *pCompIdx = pHelper->pCompIdx + i;
    if (pCompIdx->offset > 0) {
      // Keep at least 128 bytes of head room; the buffer may move when it
      // grows, so the write cursor is re-derived from its distance to the base
      int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
      if (tsizeof(pHelper->pBuffer) - drift < 128) {
        void *tbuf = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
        if (tbuf == NULL) {
          // pHelper->pBuffer is left untouched so it can still be released
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          return -1;
        }
        pHelper->pBuffer = tbuf;
      }
      buf = POINTER_SHIFT(pHelper->pBuffer, drift);
      buf = taosEncodeVariantU32(buf, i);
      buf = tsdbEncodeSCompIdx(buf, pCompIdx);
    }
  }

  int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
  taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);

  if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  pFile->info.len = tsize;
  return 0;
}

/**
 * Load the SCompIdx table of the current head file into pHelper->pCompIdx.
 *
 * The in-memory table is zeroed first. If the head file records an SCompIdx
 * region (info.offset > 0), that region is read into pHelper->pBuffer, its
 * checksum is verified, and each (table index, SCompIdx) pair is decoded into
 * the table. The head file is then positioned at TSDB_FILE_HEAD_SIZE.
 *
 * @param pHelper helper in state TSDB_HELPER_FILE_SET_AND_OPEN
 * @param target  optional destination that receives a copy of the whole
 *                in-memory SCompIdx table; may be NULL
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
  ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);

  if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
    // If not load from file, just load it in object
    SFile *pFile = &(pHelper->files.headF);
    int    fd = pFile->fd;

    memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
    if (pFile->info.offset > 0) {
      ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);

      // The region must at least hold its checksum, otherwise the unsigned
      // "len - sizeof(TSCKSUM)" bound used below would wrap around
      if (pFile->info.len < sizeof(TSCKSUM)) {
        tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, pFile->info.len);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }

      if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
        tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, strerror(errno));
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      // Grow through a temporary so the old buffer is not lost on failure
      void *tbuf = trealloc(pHelper->pBuffer, pFile->info.len);
      if (tbuf == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      pHelper->pBuffer = tbuf;

      // Compare in a signed type so that an error return is never mistaken
      // for a large unsigned byte count
      int64_t nread = tread(fd, (void *)(pHelper->pBuffer), pFile->info.len);
      if (nread < (int64_t)pFile->info.len) {
        tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
                  pFile->fname, strerror(errno));
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
        tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, pFile->info.len);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }

      // Decode it
      void *ptr = pHelper->pBuffer;
      while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
        uint32_t tid = 0;
        if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) {
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }
        ASSERT(tid > 0 && tid < pHelper->config.maxTables);

        if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) {
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }

        ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM));
      }

      if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
    }
  }
  helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);

  // Copy the memory for outside usage
  if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx));

  return 0;
}

/**
 * Load the SCompInfo part of the current table from the head file.
 *
 * Nothing is read when the part is already loaded (TSDB_HELPER_INFO_LOAD) or
 * when the table has no SCompInfo in the file (SCompIdx offset == 0). The
 * loaded bytes are checksum-verified.
 *
 * @param pHelper helper with a table set
 * @param target  optional destination that receives `len` bytes (as recorded
 *                in the table's SCompIdx) of the loaded SCompInfo; may be NULL
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  int fd = pHelper->files.headF.fd;

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      // Grow through a temporary: never read into a NULL buffer and never
      // lose the old buffer when the allocation fails
      void *tbuf = trealloc((void *)pHelper->pCompInfo, pIdx->len);
      if (tbuf == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      pHelper->pCompInfo = tbuf;

      // Compare in a signed type so that an error return is never mistaken
      // for a large unsigned byte count
      int64_t nread = tread(fd, (void *)(pHelper->pCompInfo), pIdx->len);
      if (nread < (int64_t)pIdx->len) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }
    }

    helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
  }

  if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);

  return 0;
}

H
TD-100  
hzcheng 已提交
440 441 442 443 444 445
/**
 * Load the SCompData header (SCompData + one SCompCol per column + checksum)
 * of a single block into pHelper->pCompData.
 *
 * The header is read from the last file when `pCompBlock->last` is set and
 * from the data file otherwise.
 *
 * @param pHelper    helper with its files open
 * @param pCompBlock block to load the header for; must not be a super block
 *                   with several sub-blocks (numOfSubBlocks <= 1)
 * @param target     optional destination that receives a copy of the header;
 *                   may be NULL
 * @return 0 on success, -1 on failure
 */
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);
  int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;

  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;

  size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
  pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize);
  if (pHelper->pCompData == NULL) return -1;

  // tsize is unsigned: comparing the raw return value against it would turn
  // an error return of -1 into a huge value and hide the failure
  int64_t nread = tread(fd, (void *)pHelper->pCompData, tsize);
  if (nread < 0 || (size_t)nread < tsize) return -1;

  ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);

  if (target) memcpy(target, pHelper->pCompData, tsize);

  return 0;
}

H
Hongze Cheng 已提交
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
/**
 * Fill `pStatis` from the column statistics held in pHelper->pCompData.
 *
 * Both `pStatis` (by its pre-set colId) and the SCompData columns are walked
 * in step, the way two lists sorted by ascending colId would be merged.
 * A requested column that has a matching SCompCol receives its sum, max, min,
 * maxIndex, minIndex and numOfNull; a requested column without a match gets
 * numOfNull = -1 and its other members are left untouched.
 *
 * @param pHelper   helper whose pCompData has been loaded
 * @param pStatis   array of `numOfCols` entries with colId already set
 * @param numOfCols number of entries in `pStatis`
 */
void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) {
  SCompData *pCompData = pHelper->pCompData;
  int        iStat = 0;  // cursor into pStatis
  int        iComp = 0;  // cursor into pCompData->cols

  while (iStat < numOfCols) {
    SDataStatis *pOut = pStatis + iStat;

    // Stored columns exhausted, or the requested column is not stored
    if (iComp >= pCompData->numOfCols || pOut->colId < pCompData->cols[iComp].colId) {
      pOut->numOfNull = -1;
      iStat++;
      continue;
    }

    SCompCol *pCol = pCompData->cols + iComp;

    // Stored column nobody asked for: skip it
    if (pOut->colId != pCol->colId) {
      iComp++;
      continue;
    }

    pOut->sum = pCol->sum;
    pOut->max = pCol->max;
    pOut->min = pCol->min;
    pOut->maxIndex = pCol->maxIndex;
    pOut->minIndex = pCol->minIndex;
    pOut->numOfNull = pCol->numOfNull;
    iStat++;
    iComp++;
  }
}

H
TD-100  
hzcheng 已提交
486 487 488
/**
 * Load the selected columns of the super block at index `blkIdx` into
 * `pDataCols`.
 *
 * When the super block consists of several sub-blocks, the first one is
 * loaded straight into `pDataCols`; every further sub-block is loaded into
 * pHelper->pDataCols[1] and merged into `pDataCols`.
 *
 * @param pHelper     helper whose SCompInfo part is loaded
 * @param pDataCols   destination columns
 * @param blkIdx      index into pHelper->pCompInfo->blocks
 * @param colIds      ids of the columns to load
 * @param numOfColIds number of entries in `colIds`
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
  SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;

  ASSERT(pCompBlock->numOfSubBlocks >= 1);  // Must be super block

  int numOfSubBlocks = pCompBlock->numOfSubBlocks;
  // For a multi-part super block `offset` locates its first sub-block. It is
  // resolved against the start of SCompInfo, the same base tsdbLoadBlockData()
  // uses, so both loaders agree on where the sub-blocks live.
  SCompBlock *pStartBlock =
      (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);

  if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1;
  for (int i = 1; i < numOfSubBlocks; i++) {
    pStartBlock++;
    if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
    // A failed merge must not be reported as success
    if (tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) return -1;
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
505 506
/**
 * Load all data of a block into pHelper->pDataCols[0].
 *
 * For a block made of several sub-blocks, `offset` is taken relative to the
 * start of pHelper->pCompInfo to find the first sub-block; each following
 * sub-block is loaded into pHelper->pDataCols[1] and merged into
 * pHelper->pDataCols[0].
 *
 * @param pHelper    helper whose SCompInfo part is loaded
 * @param pCompBlock block (or super block) to load
 * @param target     currently unused
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) {
  int         nSub = pCompBlock->numOfSubBlocks;
  SCompBlock *pPart = pCompBlock;

  if (nSub > 1) {
    pPart = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);
  }

  // First (or only) part goes straight into the primary column set
  tdResetDataCols(pHelper->pDataCols[0]);
  if (tsdbLoadBlockDataImpl(pHelper, pPart, pHelper->pDataCols[0]) < 0) return -1;

  // Remaining parts are staged in the secondary set and merged in
  int remaining = nSub - 1;
  while (remaining > 0) {
    pPart++;
    tdResetDataCols(pHelper->pDataCols[1]);
    if (tsdbLoadBlockDataImpl(pHelper, pPart, pHelper->pDataCols[1]) < 0) return -1;
    if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) {
      return -1;
    }
    remaining--;
  }

  // if (target) TODO

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
528 529 530 531 532 533 534 535 536
/**
 * Rewrite the fixed-size header at the start of `pFile`.
 *
 * The header is TSDB_FILE_HEAD_SIZE bytes: `version` as a fixed u32, followed
 * by the encoded pFile->info, zero padding, and a checksum appended over the
 * whole header.
 *
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
  char header[TSDB_FILE_HEAD_SIZE] = {0};

  // version first, file info right behind it
  tsdbEncodeSFileInfo(taosEncodeFixedU32((void *)header, version), &(pFile->info));

  taosCalcChecksumAppend(0, (uint8_t *)header, TSDB_FILE_HEAD_SIZE);

  if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
    tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (twrite(pFile->fd, (void *)header, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
    tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

/**
 * Encode `pInfo` into `buf` as fixed-width fields, in this order:
 * offset (u32), len (u32), size (u64), tombSize (u64), totalBlocks (u32),
 * totalSubBlocks (u32).
 *
 * @return the pointer returned by the last encode call
 */
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
  void *cursor = buf;

  cursor = taosEncodeFixedU32(cursor, pInfo->offset);
  cursor = taosEncodeFixedU32(cursor, pInfo->len);
  cursor = taosEncodeFixedU64(cursor, pInfo->size);
  cursor = taosEncodeFixedU64(cursor, pInfo->tombSize);
  cursor = taosEncodeFixedU32(cursor, pInfo->totalBlocks);
  cursor = taosEncodeFixedU32(cursor, pInfo->totalSubBlocks);

  return cursor;
}

/**
 * Decode an STsdbFileInfo from `buf`; the field order mirrors
 * tsdbEncodeSFileInfo(): offset (u32), len (u32), size (u64), tombSize (u64),
 * totalBlocks (u32), totalSubBlocks (u32).
 *
 * @return the pointer returned by the last decode call
 */
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
  void *cursor = buf;

  cursor = taosDecodeFixedU32(cursor, &(pInfo->offset));
  cursor = taosDecodeFixedU32(cursor, &(pInfo->len));
  cursor = taosDecodeFixedU64(cursor, &(pInfo->size));
  cursor = taosDecodeFixedU64(cursor, &(pInfo->tombSize));
  cursor = taosDecodeFixedU32(cursor, &(pInfo->totalBlocks));
  cursor = taosDecodeFixedU32(cursor, &(pInfo->totalSubBlocks));

  return cursor;
}

// ---------------------- INTERNAL FUNCTIONS ----------------------
/**
 * Decide whether a new last file should be created for this commit.
 *
 * @return true when the current last file is larger than
 *         32 KiB + TSDB_FILE_HEAD_SIZE; false otherwise, including when the
 *         file size cannot be determined
 */
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
  ASSERT(pHelper->files.lastF.fd > 0);
  struct stat st;
  // Without a successful fstat() `st` is indeterminate and must not be read
  if (fstat(pHelper->files.lastF.fd, &st) < 0) {
    tsdbError("vgId:%d failed to fstat file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
              strerror(errno));
    return false;
  }
  return st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE;
}

/**
 * Append the first `rowsToWrite` rows of `pDataCols` to `pFile` as one block
 * and describe the written block in `pCompBlock`.
 *
 * The block is assembled in pHelper->pBuffer:
 *   SCompData | SCompCol[n] | checksum | col 0 data + checksum | col 1 ...
 * where n is the number of columns that are not entirely NULL within the
 * written rows; all-NULL columns are left out. Column data is compressed with
 * the type's compFunc when pHelper->config.compress is non-zero and copied
 * verbatim otherwise. Each SCompCol records the offset of its data relative
 * to the start of the column data area and its length including checksum.
 *
 * @param pHelper      write helper
 * @param pFile        file to append to
 * @param pDataCols    source rows
 * @param rowsToWrite  number of leading rows to write (> 0, at most
 *                     numOfRows and maxRowsPerFileBlock)
 * @param pCompBlock   receives the description of the written block
 * @param isLast       the block goes to a last file (requires fewer than
 *                     minRowsPerFileBlock rows)
 * @param isSuperBlock numOfSubBlocks is set to 1 when true and 0 when false
 * @return 0 on success, -1 on failure (terrno is set)
 */
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
                                SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
  ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
  ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);

  SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
  int64_t    offset = 0;

  offset = lseek(pFile->fd, 0, SEEK_END);
  if (offset < 0) {
    tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Pass 1: build one SCompCol (id, type, statistics) per column to be stored
  int nColsNotAllNull = 0;
  for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
    SDataCol *pDataCol = pDataCols->cols + ncol;
    SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;

    if (isNEleNull(pDataCol, rowsToWrite)) {  // all data to commit are NULL, just ignore it
      continue;
    }

    memset(pCompCol, 0, sizeof(*pCompCol));

    pCompCol->colId = pDataCol->colId;
    pCompCol->type = pDataCol->type;
    // No statistics for column 0; it is passed to getStatisFunc as the key column
    if (tDataTypeDesc[pDataCol->type].getStatisFunc && ncol != 0) {
      (*tDataTypeDesc[pDataCol->type].getStatisFunc)(
          (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
          &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
    }
    nColsNotAllNull++;
  }

  ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols);

  // Pass 2: place (and compress if necessary) the data of every stored column
  int     tcol = 0;
  int32_t toffset = 0;  // running offset inside the column data area
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);  // header size
  int32_t lsize = tsize;  // bytes of the block assembled so far
  for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
    if (tcol >= nColsNotAllNull) break;

    SDataCol *pDataCol = pDataCols->cols + ncol;
    SCompCol *pCompCol = pCompData->cols + tcol;

    // Columns skipped in pass 1 have no SCompCol
    if (pDataCol->colId != pCompCol->colId) continue;
    void *tptr = (void *)((char *)pCompData + lsize);

    pCompCol->offset = toffset;

    int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);

    if (pHelper->config.compress) {
      if (pHelper->config.compress == TWO_STAGE_COMP) {
        pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
        if (pHelper->compBuffer == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _err;
        }
      }

      pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
          (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pHelper->config.compress,
          pHelper->compBuffer, tsizeof(pHelper->compBuffer));
    } else {
      pCompCol->len = tlen;
      memcpy(tptr, pDataCol->pData, pCompCol->len);
    }

    // Add checksum
    pCompCol->len += sizeof(TSCKSUM);
    taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len);

    toffset += pCompCol->len;
    lsize += pCompCol->len;
    tcol++;
  }

  pCompData->delimiter = TSDB_FILE_DELIMITER;
  pCompData->uid = pHelper->tableInfo.uid;
  pCompData->numOfCols = nColsNotAllNull;

  taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);

  // Write the whole block to file
  if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
    tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Update pCompBlock member variables
  pCompBlock->last = isLast;
  pCompBlock->offset = offset;
  pCompBlock->algorithm = pHelper->config.compress;
  pCompBlock->numOfRows = rowsToWrite;
  pCompBlock->sversion = pHelper->tableInfo.sversion;
  pCompBlock->len = (int32_t)lsize;
  pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
  pCompBlock->numOfCols = nColsNotAllNull;
  pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
  pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);

  tsdbTrace("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
            " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
            REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, pCompBlock->offset,
            pCompBlock->numOfRows, pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, pCompBlock->keyLast);

  return 0;

_err:
  return -1;
}

static int compareKeyBlock(const void *arg1, const void *arg2) {
  TSKEY       key = *(TSKEY *)arg1;
  SCompBlock *pBlock = (SCompBlock *)arg2;

  if (key < pBlock->keyFirst) {
    return -1;
  } else if (key > pBlock->keyLast) {
    return 1;
  }

  return 0;
}

// Merge rows of pDataCols into the existing super block at index blkIdx of the current table.
//
// Two situations are handled:
//   1. The first incoming key is greater than the block's keyLast: the rows are appended to the
//      (last) block, either as a sub-block or by loading, merging and rewriting the block.
//   2. The incoming keys overlap the block: the overlapping rows are either added as a sub-block
//      or the block is loaded, merged with the rows and rewritten as one or more super blocks.
//
// blockAtIdx() is evaluated again at every use instead of being cached in a local; the helpers
// called here may grow pHelper->pCompInfo (see tsdbAdjustInfoSizeIfNeeded).
//
// Returns the number of rows of pDataCols that were consumed, or -1 on failure.
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
  // TODO: set pHelper->hasOldBlock
  int        rowsWritten = 0;
  SCompBlock compBlock = {0};

  ASSERT(pDataCols->numOfRows > 0);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx < pIdx->numOfBlocks);

  ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
  ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);

  if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) {  // Merge with the last block by append
    ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock &&
           blkIdx == pIdx->numOfBlocks - 1);
    int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5;  // TODO: make a interface

    rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
    if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
        (pHelper->files.nLastF.fd) < 0) {
      // Still a small block and no new last file is open: append the rows as a sub-block
      if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
        goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
      // Merge
      if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
      // Write
      SFile *pWFile = NULL;
      bool   isLast = false;
      if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.minRowsPerFileBlock) {
        pWFile = &(pHelper->files.dataF);
      } else {
        isLast = true;
        pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
      }
      if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock,
                               isLast, true) < 0)
        goto _err;
      if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
    }

    ASSERT(pHelper->hasOldLastBlock);
    pHelper->hasOldLastBlock = false;
  } else {
    // Key must overlap with the block
    ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);

    // Largest key that may be merged here without touching the next block
    TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;

    // rows1: number of rows must merge in this block
    int rows1 =
        tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
    // rows2: max number of rows the block can have more
    int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
    // rows3: number of rows between this block and the next block
    int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);

    ASSERT(rows3 >= rows1);

    if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        ((!blockAtIdx(pHelper, blkIdx)->last) ||
         ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
          (pHelper->files.nLastF.fd < 0)))) {
      // The overlapping rows fit: write them as a sub-block next to the existing data
      rowsWritten = rows1;
      bool   isLast = false;
      SFile *pFile = NULL;

      if (blockAtIdx(pHelper, blkIdx)->last) {
        isLast = true;
        pFile = &(pHelper->files.lastF);
      } else {
        pFile = &(pHelper->files.dataF);
      }

      if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {  // Load-Merge-Write
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;

      rowsWritten = rows3;

      int iter1 = 0;  // iter over pHelper->pDataCols[0]
      int iter2 = 0;  // iter over pDataCols
      int round = 0;
      while (true) {
        if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
        tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
                           pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5);
        ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
                                 pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
          goto _err;

        // The first merged block replaces the old super block; later ones are inserted after it.
        // Both helpers report failure through their return value, so do not ignore it.
        if (round == 0) {
          if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        } else {
          if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        }
        round++;
        blkIdx++;
      }
    }
  }

  return rowsWritten;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
833 834 835 836 837 838 839 840 841
static int compTSKEY(const void *key1, const void *key2) {
  if (*(TSKEY *)key1 > *(TSKEY *)key2) {
    return 1;
  } else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
    return 0;
  } else {
    return -1;
  }
}
H
hzcheng 已提交
842

H
TD-100  
hzcheng 已提交
843 844 845
// Grow pHelper->pCompInfo when its current allocation is not larger than esize bytes.
// The new size is esize plus room for 16 additional SCompBlock entries.
//
// Returns 0 on success. On allocation failure returns -1 with terrno set to
// TSDB_CODE_TDB_OUT_OF_MEMORY, and pHelper->pCompInfo is left pointing at the old buffer.
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
  if (tsizeof((void *)pHelper->pCompInfo) <= esize) {
    size_t tsize = esize + sizeof(SCompBlock) * 16;

    // Go through a temporary so the existing buffer is not lost if the reallocation fails.
    // NOTE(review): assumes trealloc() follows realloc() semantics and leaves the original
    // block valid on failure - confirm against its implementation.
    void *pNew = trealloc((void *)pHelper->pCompInfo, tsize);
    if (pNew == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
    pHelper->pCompInfo = (SCompInfo *)pNew;
  }

  return 0;
}

// Insert *pCompBlock as a new super block at position blkIdx of the current table's block list.
// Entries at and after blkIdx are shifted up by one slot, and the stored offset of every block
// that owns sub-blocks is advanced by sizeof(SCompBlock).
// Returns 0 on success, -1 if the info buffer could not be grown.
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  // Make sure there is room for one more entry
  if (pIdx->len == 0) pIdx->len = sizeof(SCompData) + sizeof(TSCKSUM);
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) return -1;

  // Sub-block areas move up by one entry, so fix the offsets that point at them
  for (int i = 0; i < pIdx->numOfBlocks; i++) {
    SCompBlock *pBlk = &pHelper->pCompInfo->blocks[i];
    if (pBlk->numOfSubBlocks > 1) pBlk->offset += sizeof(SCompBlock);
  }

  // Open a slot at blkIdx by moving everything from there on one entry up
  size_t slotOff = sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx;
  int    tsize = pIdx->len - slotOff;
  if (tsize > 0) {
    ASSERT(slotOff + sizeof(SCompBlock) < tsizeof(pHelper->pCompInfo));
    ASSERT(slotOff + sizeof(SCompBlock) + tsize <= tsizeof(pHelper->pCompInfo));
    memmove((void *)((char *)pHelper->pCompInfo + slotOff + sizeof(SCompBlock)),
            (void *)((char *)pHelper->pCompInfo + slotOff), tsize);
  }
  pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;

  pIdx->numOfBlocks++;
  pIdx->len += sizeof(SCompBlock);
  ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));

  // Refresh the summary fields from the (possibly new) last block
  SCompBlock *pTail = &pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1];
  pIdx->maxKey = pTail->keyLast;
  pIdx->hasLast = pTail->last;

  if (pIdx->numOfBlocks > 1) {
    ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
  }

  tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
            blkIdx);

  return 0;
}

H
TD-100  
hzcheng 已提交
898 899 900 901
// Attach *pCompBlock (which must have numOfSubBlocks == 0) as a sub-block of the super block at
// index blkIdx, and add rowsAdded to that super block's row count.
//
// If the super block already owns sub-blocks, the new one is appended to its sub-block area.
// Otherwise a two-entry sub-block area is created holding a copy of the old super block and the
// new block. In both cases the data behind the insertion point is moved up and the offsets of
// later blocks that own sub-blocks are adjusted.
//
// Returns 0 on success, -1 if the info buffer could not be grown.
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) {
  ASSERT(pCompBlock->numOfSubBlocks == 0);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pSuper = pHelper->pCompInfo->blocks + blkIdx;
  ASSERT(pSuper->numOfSubBlocks >= 1 && pSuper->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);

  // A block without sub-blocks needs two new entries, otherwise one is enough
  size_t spaceNeeded = pIdx->len + sizeof(SCompBlock);
  if (pSuper->numOfSubBlocks == 1) spaceNeeded = pIdx->len + sizeof(SCompBlock) * 2;
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) return -1;

  // The buffer may have been reallocated above, so look everything up again
  char *base = (char *)(pHelper->pCompInfo);
  pSuper = pHelper->pCompInfo->blocks + blkIdx;

  if (pSuper->numOfSubBlocks > 1) {
    // Append to the existing sub-block area
    size_t tailLen = pIdx->len - (pSuper->offset + pSuper->len);
    if (tailLen > 0) {
      memmove((void *)(base + pSuper->offset + pSuper->len + sizeof(SCompBlock)),
              (void *)(base + pSuper->offset + pSuper->len), tailLen);

      for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
        SCompBlock *pLater = &pHelper->pCompInfo->blocks[i];
        if (pLater->numOfSubBlocks > 1) pLater->offset += sizeof(SCompBlock);
      }
    }

    *(SCompBlock *)(base + pSuper->offset + pSuper->len) = *pCompBlock;

    pSuper->numOfSubBlocks++;
    ASSERT(pSuper->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
    pSuper->len += sizeof(SCompBlock);
    pSuper->numOfRows += rowsAdded;
    pSuper->keyFirst = MIN(pSuper->keyFirst, pCompBlock->keyFirst);
    pSuper->keyLast = MAX(pSuper->keyLast, pCompBlock->keyLast);
    pIdx->len += sizeof(SCompBlock);
  } else {  // Need to create two sub-blocks
    // Insert in front of the sub-block area of the next block that has one ...
    void *ptr = NULL;
    for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
      SCompBlock *pLater = pHelper->pCompInfo->blocks + i;
      if (pLater->numOfSubBlocks > 1) {
        ptr = POINTER_SHIFT(pHelper->pCompInfo, pLater->offset);
        break;
      }
    }

    // ... or, if there is none, right in front of the trailing checksum
    if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM));

    size_t tailLen = pIdx->len - ((char *)ptr - base);
    if (tailLen > 0) {
      memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tailLen);
      for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
        SCompBlock *pLater = pHelper->pCompInfo->blocks + i;
        if (pLater->numOfSubBlocks > 1) pLater->offset += (sizeof(SCompBlock) * 2);
      }
    }

    // First sub-block is the old super block, second one is the new block
    SCompBlock *pSubs = (SCompBlock *)ptr;
    pSubs[0] = *pSuper;
    pSubs[0].numOfSubBlocks = 0;

    pSubs[1] = *pCompBlock;

    pSuper->numOfSubBlocks = 2;
    pSuper->numOfRows += rowsAdded;
    pSuper->offset = ((char *)ptr) - base;
    pSuper->len = sizeof(SCompBlock) * 2;
    pSuper->keyFirst = MIN(pSubs[0].keyFirst, pSubs[1].keyFirst);
    pSuper->keyLast = MAX(pSubs[0].keyLast, pSubs[1].keyLast);

    pIdx->len += (sizeof(SCompBlock) * 2);
  }

  // Refresh the summary fields from the last block
  pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
  pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;

  tsdbTrace("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx);

  return 0;
}

// Replace the super block at index blkIdx with *pCompBlock.
// If the old block owned sub-blocks, their area is removed from the info buffer first and the
// offsets of later blocks that own sub-blocks are moved down accordingly.
// Always returns 0.
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pOld = pHelper->pCompInfo->blocks + blkIdx;

  ASSERT(pOld->numOfSubBlocks >= 1);

  // Delete the sub blocks it has
  if (pOld->numOfSubBlocks > 1) {
    char  *base = (char *)(pHelper->pCompInfo);
    size_t tailLen = pIdx->len - (pOld->offset + pOld->len);
    if (tailLen > 0) {
      memmove((void *)(base + pOld->offset), (void *)(base + pOld->offset + pOld->len), tailLen);
    }

    size_t removed = sizeof(SCompBlock) * pOld->numOfSubBlocks;
    for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
      SCompBlock *pLater = &pHelper->pCompInfo->blocks[i];
      if (pLater->numOfSubBlocks > 1) pLater->offset -= removed;
    }

    pIdx->len -= removed;
  }

  *pOld = *pCompBlock;

  // Refresh the summary fields from the last block
  SCompBlock *pTail = &pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1];
  pIdx->maxKey = pTail->keyLast;
  pIdx->hasLast = pTail->last;

  tsdbTrace("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
            blkIdx);

  return 0;
}

// Get the number of rows in range [minKey, maxKey]
H
TD-100  
hzcheng 已提交
1021
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
H
Haojun Liao 已提交
1022
  if (pDataCols->numOfRows == 0) return 0;
H
TD-100  
hzcheng 已提交
1023 1024 1025 1026 1027 1028 1029 1030

  ASSERT(minKey <= maxKey);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);
  TSKEY keyLast = dataColsKeyLast(pDataCols);
  ASSERT(keyFirst <= keyLast);

  if (minKey > keyLast || maxKey < keyFirst) return 0;

H
Haojun Liao 已提交
1031
  void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
H
TD-100  
hzcheng 已提交
1032 1033 1034
                           compTSKEY, TD_GE);
  ASSERT(ptr1 != NULL);

H
Haojun Liao 已提交
1035
  void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
H
TD-100  
hzcheng 已提交
1036 1037 1038 1039 1040
                           compTSKEY, TD_LE);
  ASSERT(ptr2 != NULL);

  if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;

H
TD-100  
hzcheng 已提交
1041
  return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1;
H
TD-185  
Hongze Cheng 已提交
1042 1043
}

H
TD-353  
Hongze Cheng 已提交
1044 1045 1046
// Put the file part of the helper into its "no file group open" state: the whole files struct
// is zeroed, then fid and all five file descriptors are set to -1.
//
// NOTE(review): the tfree() calls run after the memset, so every fname they see has already
// been zeroed; they look like no-ops. Who owns the fname strings is not visible here, so the
// original order is kept - confirm before moving the frees ahead of the memset.
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
  memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
  pHelper->files.fid = -1;

  tfree(pHelper->files.headF.fname);
  tfree(pHelper->files.dataF.fname);
  tfree(pHelper->files.lastF.fname);
  tfree(pHelper->files.nHeadF.fname);
  tfree(pHelper->files.nLastF.fname);

  pHelper->files.headF.fd = -1;
  pHelper->files.dataF.fd = -1;
  pHelper->files.lastF.fd = -1;
  pHelper->files.nHeadF.fd = -1;
  pHelper->files.nLastF.fd = -1;
}
H
TD-185  
Hongze Cheng 已提交
1058

H
TD-353  
Hongze Cheng 已提交
1059
// Allocate the SCompIdx array (one entry per possible table plus a trailing checksum) and reset
// the file part of the helper.
// Returns 0 on success, or -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if the allocation fails.
static int tsdbInitHelperFile(SRWHelper *pHelper) {
  size_t idxSize = sizeof(SCompIdx) * pHelper->pRepo->config.maxTables + sizeof(TSCKSUM);

  pHelper->pCompIdx = (SCompIdx *)tmalloc(idxSize);
  if (pHelper->pCompIdx != NULL) {
    tsdbResetHelperFileImpl(pHelper);
    return 0;
  }

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
1072 1073
// Tear down the file part of the helper: close the helper files, reset the file state and
// release the SCompIdx array.
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
  // Close first, then wipe the bookkeeping
  tsdbCloseHelperFile(pHelper, false);
  tsdbResetHelperFileImpl(pHelper);

  // Index array allocated by tsdbInitHelperFile()
  tzfree(pHelper->pCompIdx);
}
H
TD-185  
Hongze Cheng 已提交
1077

H
TD-353  
Hongze Cheng 已提交
1078 1079 1080 1081 1082
// ---------- Operations on Helper Table part

// Clear the per-table state of the helper: tableInfo is zeroed and the "has old last block"
// flag is dropped.
static void tsdbResetHelperTableImpl(SRWHelper *pHelper) {
  pHelper->hasOldLastBlock = false;
  memset((void *)&pHelper->tableInfo, 0, sizeof(SHelperTable));
}
H
TD-185  
Hongze Cheng 已提交
1083

H
TD-353  
Hongze Cheng 已提交
1084 1085 1086 1087
// Reset the block part and the table part of the helper, then clear the TABLE_SET and
// INFO_LOAD state flags.
static void tsdbResetHelperTable(SRWHelper *pHelper) {
  // Block state belongs to the table being left, so it goes first
  tsdbResetHelperBlock(pHelper);
  tsdbResetHelperTableImpl(pHelper);

  helperClearState(pHelper, (TSDB_HELPER_TABLE_SET | TSDB_HELPER_INFO_LOAD));
}

H
TD-353  
Hongze Cheng 已提交
1090
// Initialise the table part of the helper; this is the same as resetting it.
static void tsdbInitHelperTable(SRWHelper *pHelper) {
  tsdbResetHelperTableImpl(pHelper);
}
H
TD-185  
Hongze Cheng 已提交
1091

H
TD-353  
Hongze Cheng 已提交
1092
// Release the table part of the helper, i.e. the pCompInfo buffer.
static void tsdbDestroyHelperTable(SRWHelper *pHelper) {
  tzfree((void *)pHelper->pCompInfo);
}
H
TD-185  
Hongze Cheng 已提交
1093

H
TD-353  
Hongze Cheng 已提交
1094 1095 1096 1097 1098
// ---------- Operations on Helper Block part

// Reset both SDataCols buffers of the helper.
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
  for (int i = 0; i < 2; i++) {
    tdResetDataCols(pHelper->pDataCols[i]);
  }
}
H
TD-185  
Hongze Cheng 已提交
1099

H
TD-353  
Hongze Cheng 已提交
1100 1101 1102 1103 1104 1105
// Reset the block part of the helper. No helper state flag is cleared here.
static void tsdbResetHelperBlock(SRWHelper *pHelper) {
  tsdbResetHelperBlockImpl(pHelper);
}

// Allocate the two SDataCols buffers of the helper, sized from the repo's in-memory table limits
// (maxRowBytes, maxCols) and the configured maxRowsPerFileBlock, then reset them.
// Returns 0 on success, or -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if either allocation
// fails; buffers allocated so far are left in the helper in that case.
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
  STsdbRepo *pRepo = helperRepo(pHelper);

  for (int i = 0; i < 2; i++) {
    pHelper->pDataCols[i] =
        tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
  }

  if (pHelper->pDataCols[0] != NULL && pHelper->pDataCols[1] != NULL) {
    tsdbResetHelperBlockImpl(pHelper);
    return 0;
  }

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return -1;
}

// Release the block part of the helper: the pCompData buffer and both SDataCols buffers.
static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
  tzfree(pHelper->pCompData);
  for (int i = 0; i < 2; i++) {
    tdFreeDataCols(pHelper->pDataCols[i]);
  }
}

// Common initialiser behind tsdbInitReadHelper() / tsdbInitWriteHelper().
// Zeroes the helper, records its type and repo, initialises the file, table and block parts and
// allocates the I/O buffer pBuffer.
// Returns 0 on success. On failure the helper is destroyed again and -1 is returned.
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
  memset((void *)pHelper, 0, sizeof(*pHelper));

  helperType(pHelper) = type;
  helperRepo(pHelper) = pRepo;
  helperState(pHelper) = TSDB_HELPER_CLEAR_STATE;

  // File part, table part, block part - in this order
  if (tsdbInitHelperFile(pHelper) < 0) goto _err;
  tsdbInitHelperTable(pHelper);
  if (tsdbInitHelperBlock(pHelper) < 0) goto _err;

  {
    // Block header + per-column header/checksum/overflow + raw row data + trailing checksum
    size_t bufSize = sizeof(SCompData) +
                     (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pRepo->imem->maxCols +
                     pRepo->imem->maxRowBytes * pRepo->config.maxRowsPerFileBlock + sizeof(TSCKSUM);
    pHelper->pBuffer = tmalloc(bufSize);
  }
  if (pHelper->pBuffer == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  return 0;

_err:
  tsdbDestroyHelper(pHelper);
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
1160 1161 1162
static int comparColIdCompCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
}
H
TD-185  
Hongze Cheng 已提交
1163

H
TD-353  
Hongze Cheng 已提交
1164 1165 1166
static int comparColIdDataCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
}
H
TD-185  
Hongze Cheng 已提交
1167

H
TD-353  
Hongze Cheng 已提交
1168 1169 1170 1171
// Read the stored bytes of one column of a block from fd into buf.
// The column data starts at the block offset, plus the SCompData header and the SCompCol array,
// plus the column's own offset; pCompCol->len bytes are read.
// Returns 0 on success, -1 if the seek fails or fewer bytes than expected are read.
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
  size_t hdrSize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;

  if (lseek(fd, pCompBlock->offset + hdrSize + pCompCol->offset, SEEK_SET) < 0 ||
      tread(fd, buf, pCompCol->len) < pCompCol->len) {
    return -1;
  }

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
1176 1177 1178 1179
// Load the stored bytes of the requested columns of one block into pDataCols.
// For every id in colIds that exists in the block's column list, the matching SDataCol gets its
// len set and its pData filled from the file. Ids that the block does not contain are skipped.
// Returns 0 on success, -1 if the block's column info or any column cannot be read.
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
                                       SDataCols *pDataCols) {
  if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;

  // Blocks flagged as "last" live in the last file, all others in the data file
  int fd = pHelper->files.dataF.fd;
  if (pCompBlock->last) fd = pHelper->files.lastF.fd;

  for (int i = 0; i < numOfColIds; i++) {
    int16_t colId = colIds[i];

    SCompCol *pCompCol = (SCompCol *)bsearch((void *)&colId, (void *)pHelper->pCompData->cols,
                                             pHelper->pCompData->numOfCols, sizeof(SCompCol), comparColIdCompCol);
    if (pCompCol == NULL) continue;  // column not stored in this block

    SDataCol *pDataCol = (SDataCol *)bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols,
                                             sizeof(SDataCol), comparColIdDataCol);
    ASSERT(pDataCol != NULL);

    pDataCol->len = pCompCol->len;
    if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1;
  }

  return 0;
}

// Verify and decode one column's stored bytes into pDataCol.
// content/len describe the stored bytes including their trailing checksum. When comp is
// non-zero the payload is decompressed with the type's decompFunc (buffer/bufferSize are handed
// to it), otherwise it is copied as is. For BINARY and NCHAR columns the offsets are set
// afterwards.
// Returns 0 on success, -1 if the checksum does not match.
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
                                        int maxPoints, char *buffer, int bufferSize) {
  // Verify by checksum
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;

  if (!comp) {
    // Stored uncompressed: the payload is everything but the checksum
    pDataCol->len = len - sizeof(TSCKSUM);
    memcpy(pDataCol->pData, content, pDataCol->len);
  } else {
    // Stored compressed: let the type-specific routine expand it
    pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(
        content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
  }

  if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
    dataColSetOffset(pDataCol, numOfRows);
  }

  return 0;
}

// Read one block (without sub-blocks) from its file into pHelper->pBuffer, verify the header
// checksum and decode its columns into pDataCols.
//
// Columns are matched by colId while walking both column lists in order: a column of pDataCols
// that the block does not contain is filled with NULL values, and a block column that
// pDataCols does not have is skipped.
//
// Returns 0 on success, -1 on failure with terrno set.
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);

  ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);

  SCompData *pCompData = (SCompData *)pHelper->pBuffer;

  // Blocks flagged as "last" live in the last file, all others in the data file
  SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);

  int fd = pFile->fd;
  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
    tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Verify the header (SCompData + column descriptors + checksum) before trusting its fields
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
  if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
    tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
              pFile->fname, pCompBlock->offset, pCompBlock->len);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    goto _err;
  }
  ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);

  pDataCols->numOfRows = pCompBlock->numOfRows;

  // Recover the data
  int ccol = 0;  // index into the block's columns
  int dcol = 0;  // index into pDataCols' columns
  while (dcol < pDataCols->numOfCols) {
    SDataCol *pDataCol = &(pDataCols->cols[dcol]);
    if (ccol >= pCompData->numOfCols) {
      // Set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
      dcol++;
      continue;
    }

    SCompCol *pCompCol = &(pCompData->cols[ccol]);

    if (pCompCol->colId == pDataCol->colId) {
      if (pCompBlock->algorithm == TWO_STAGE_COMP) {
        // Two-stage compression needs a scratch buffer big enough for this column
        int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
        if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
          zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
        }

        // Go through a temporary so the existing buffer is not lost if the reallocation fails.
        // NOTE(review): assumes trealloc() follows realloc() semantics and leaves the original
        // block valid on failure - confirm against its implementation.
        void *pNewBuf = trealloc(pHelper->compBuffer, zsize);
        if (pNewBuf == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _err;
        }
        pHelper->compBuffer = pNewBuf;
      }
      if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
                                       pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
                                       pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
        // The decode helper only fails when the column checksum does not match
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        goto _err;
      }
      dcol++;
      ccol++;
    } else if (pCompCol->colId < pDataCol->colId) {
      // Block column not requested by pDataCols: skip it
      ccol++;
    } else {
      // Set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
      dcol++;
    }
  }

  return 0;

_err:
  return -1;
}
H
TD-353  
Hongze Cheng 已提交
1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335

// Serialise *pIdx into buf, field by field, in the order:
// len, offset, hasLast, numOfBlocks, uid, maxKey.
// Returns the value handed back by the last encoder call.
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
  void *cursor = buf;

  cursor = taosEncodeVariantU32(cursor, pIdx->len);
  cursor = taosEncodeVariantU32(cursor, pIdx->offset);
  cursor = taosEncodeFixedU8(cursor, pIdx->hasLast);
  cursor = taosEncodeVariantU32(cursor, pIdx->numOfBlocks);
  cursor = taosEncodeFixedU64(cursor, pIdx->uid);
  cursor = taosEncodeFixedU64(cursor, pIdx->maxKey);

  return cursor;
}

// Deserialise an SCompIdx from buf, reading the fields in the order written by
// tsdbEncodeSCompIdx(): len, offset, hasLast, numOfBlocks, uid, maxKey.
// hasLast, numOfBlocks, uid and maxKey are decoded into temporaries and then assigned.
// Returns the value handed back by the last decoder call, or NULL as soon as one returns NULL
// (fields decoded before that point stay written in *pIdx).
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
  uint8_t  hasLast = 0;
  uint32_t numOfBlocks = 0;
  uint64_t value = 0;

  buf = taosDecodeVariantU32(buf, &(pIdx->len));
  if (buf == NULL) return NULL;

  buf = taosDecodeVariantU32(buf, &(pIdx->offset));
  if (buf == NULL) return NULL;

  buf = taosDecodeFixedU8(buf, &(hasLast));
  if (buf == NULL) return NULL;
  pIdx->hasLast = hasLast;

  buf = taosDecodeVariantU32(buf, &(numOfBlocks));
  if (buf == NULL) return NULL;
  pIdx->numOfBlocks = numOfBlocks;

  buf = taosDecodeFixedU64(buf, &value);
  if (buf == NULL) return NULL;
  pIdx->uid = (int64_t)value;

  buf = taosDecodeFixedU64(buf, &value);
  if (buf == NULL) return NULL;
  pIdx->maxKey = (TSKEY)value;

  return buf;
}