tsdbRWHelper.c 49.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
slguan 已提交
15 16

#include "os.h"
H
TD-100  
hzcheng 已提交
17
#include "talgo.h"
H
TD-353  
Hongze Cheng 已提交
18
#include "tchecksum.h"
H
TD-185  
Hongze Cheng 已提交
19
#include "tcoding.h"
H
TD-353  
Hongze Cheng 已提交
20 21
#include "tscompression.h"
#include "tsdbMain.h"
H
hzcheng 已提交
22

H
TD-353  
Hongze Cheng 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
// Forward declarations of the file-local (static) helpers used by the functions below.
static bool  tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int   tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
                                  SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
static int   compareKeyBlock(const void *arg1, const void *arg2);
static int   tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int   compTSKEY(const void *key1, const void *key2);
static int   tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
static int   tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int   tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int   tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int   tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
static void  tsdbResetHelperFileImpl(SRWHelper *pHelper);
static int   tsdbInitHelperFile(SRWHelper *pHelper);
static void  tsdbDestroyHelperFile(SRWHelper *pHelper);
static void  tsdbResetHelperTableImpl(SRWHelper *pHelper);
static void  tsdbResetHelperTable(SRWHelper *pHelper);
static void  tsdbInitHelperTable(SRWHelper *pHelper);
static void  tsdbDestroyHelperTable(SRWHelper *pHelper);
static void  tsdbResetHelperBlockImpl(SRWHelper *pHelper);
static void  tsdbResetHelperBlock(SRWHelper *pHelper);
static int   tsdbInitHelperBlock(SRWHelper *pHelper);
static int   tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int   comparColIdCompCol(const void *arg1, const void *arg2);
static int   comparColIdDataCol(const void *arg1, const void *arg2);
static int   tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf);
static int   tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
                                         SDataCols *pDataCols);
static int   tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
                                          int maxPoints, char *buffer, int bufferSize);
static int   tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
H
TD-353  
Hongze Cheng 已提交
55
static void  tsdbDestroyHelperBlock(SRWHelper *pHelper);
H
TD-353  
Hongze Cheng 已提交
56

H
TD-353  
Hongze Cheng 已提交
57
// ---------------------- EXTERNAL FUNCTIONS ----------------------
H
TD-100  
hzcheng 已提交
58 59 60 61 62 63 64 65
/**
 * Initialize pHelper as a read helper bound to pRepo.
 * Returns whatever tsdbInitHelper() returns.
 */
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  const tsdb_rw_helper_t helperKind = TSDB_READ_HELPER;
  return tsdbInitHelper(pHelper, pRepo, helperKind);
}

/**
 * Initialize pHelper as a write helper bound to pRepo.
 * Returns whatever tsdbInitHelper() returns.
 */
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
  const tsdb_rw_helper_t helperKind = TSDB_WRITE_HELPER;
  return tsdbInitHelper(pHelper, pRepo, helperKind);
}

H
hzcheng 已提交
66
/**
 * Release everything owned by pHelper: the two scratch buffers and the
 * file, table and block parts. The structure is zeroed afterwards.
 * A NULL helper is ignored.
 */
void tsdbDestroyHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  // scratch buffers
  tzfree(pHelper->pBuffer);
  tzfree(pHelper->compBuffer);

  // per-part teardown
  tsdbDestroyHelperFile(pHelper);
  tsdbDestroyHelperTable(pHelper);
  tsdbDestroyHelperBlock(pHelper);

  memset(pHelper, 0, sizeof(*pHelper));
}

H
TD-100  
hzcheng 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90
/**
 * Bring pHelper back to TSDB_HELPER_CLEAR_STATE.
 *
 * The block and table parts are reset. Any open files are then closed with
 * hasError == false. That means pending new head/last files are renamed over
 * the old ones (see tsdbCloseHelperFile()). Finally the file part is reset.
 * A NULL helper is ignored.
 */
void tsdbResetHelper(SRWHelper *pHelper) {
  if (pHelper == NULL) return;

  tsdbResetHelperBlockImpl(pHelper);  // block part
  tsdbResetHelperTableImpl(pHelper);  // table part

  // file part: close (and finalize) first, then clear the bookkeeping
  tsdbCloseHelperFile(pHelper, false);
  tsdbResetHelperFileImpl(pHelper);

  pHelper->state = TSDB_HELPER_CLEAR_STATE;
}

H
TD-100  
hzcheng 已提交
93 94
/**
 * Bind pHelper to the file group pGroup and open the group's files.
 *
 * The helper is reset first (tsdbResetHelper()).
 *
 * A read helper opens the head, data and last files read-only.
 *
 * A write helper:
 *   - opens the head file read-only and the data/last files read-write;
 *   - creates the new head file (.h) and copies the first TSDB_FILE_HEAD_SIZE
 *     bytes of the old head file into it;
 *   - when tsdbShouldCreateNewLast() says so, creates a new last file (.l)
 *     and seeds it the same way from the old last file.
 *
 * Finally the SCompIdx part of the head file is loaded.
 *
 * @return the result of tsdbLoadCompIdx() once all files are open, -1 if
 *         opening or seeding a file fails
 */
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
  ASSERT(pHelper != NULL && pGroup != NULL);

  // Clear the helper object
  tsdbResetHelper(pHelper);

  ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);

  // Set the files
  pHelper->files.fid = pGroup->fileId;
  tsdbCpySFile(&pHelper->files.headF, &pGroup->files[TSDB_FILE_TYPE_HEAD]);
  tsdbCpySFile(&pHelper->files.dataF, &pGroup->files[TSDB_FILE_TYPE_DATA]);
  tsdbCpySFile(&pHelper->files.lastF, &pGroup->files[TSDB_FILE_TYPE_LAST]);
  if (helperType(pHelper) == TSDB_WRITE_HELPER) {
    pHelper->files.nHeadF.fname = tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD);
    pHelper->files.nLastF.fname = tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST);
  }

  // Open the files
  if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
  if (helperType(pHelper) == TSDB_WRITE_HELPER) {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;

    // Create and open .h, seeding it with the file header of the old head file
    if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err;
    if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
      // Capture the system error before the logger gets a chance to touch errno
      terrno = TAOS_SYSTEM_ERROR(errno);
      tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
      goto _err;
    }

    // Create and open .l file if should, seeding it with the file header of the old last file
    if (tsdbShouldCreateNewLast(pHelper)) {
      if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
      // Only a short copy is an error: the whole error path must stay inside this if
      if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) <
          TSDB_FILE_HEAD_SIZE) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                  TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
        goto _err;
      }
    }
  } else {
    if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
    if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
  }

  helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);

  return tsdbLoadCompIdx(pHelper, NULL);

_err:
  return -1;
}

/**
 * Close every file of pHelper that is currently open (fd > 0).
 *
 * All files are fsync'ed before they are closed. The file header is rewritten
 * first (tsdbUpdateFileHeader()) for the data file and the new head/last files,
 * unless hasError is set.
 *
 * The new head (.h) and new last (.l) files are then finalized:
 *   - hasError:  the new file is removed;
 *   - otherwise: the new file is renamed over the old head/last file, and the
 *     old SFile's info is replaced by the new one's.
 *
 * @return 0 on success; -1 if a new file could not be renamed over the old
 *         one. In that case terrno is set, and the old SFile's info is left
 *         untouched because it still describes the file that is on disk.
 *         The remaining files are still closed.
 */
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
  int code = 0;

  if (pHelper->files.headF.fd > 0) {
    fsync(pHelper->files.headF.fd);
    close(pHelper->files.headF.fd);
    pHelper->files.headF.fd = -1;
  }
  if (pHelper->files.dataF.fd > 0) {
    if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
    fsync(pHelper->files.dataF.fd);
    close(pHelper->files.dataF.fd);
    pHelper->files.dataF.fd = -1;
  }
  if (pHelper->files.lastF.fd > 0) {
    fsync(pHelper->files.lastF.fd);
    close(pHelper->files.lastF.fd);
    pHelper->files.lastF.fd = -1;
  }

  if (pHelper->files.nHeadF.fd > 0) {
    if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
    fsync(pHelper->files.nHeadF.fd);
    close(pHelper->files.nHeadF.fd);
    pHelper->files.nHeadF.fd = -1;
    if (hasError) {
      remove(pHelper->files.nHeadF.fname);
    } else if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tsdbError("vgId:%d failed to rename file %s to %s since %s", REPO_ID(pHelper->pRepo),
                pHelper->files.nHeadF.fname, pHelper->files.headF.fname, strerror(errno));
      code = -1;
    } else {
      pHelper->files.headF.info = pHelper->files.nHeadF.info;
    }
  }

  if (pHelper->files.nLastF.fd > 0) {
    if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
    fsync(pHelper->files.nLastF.fd);
    close(pHelper->files.nLastF.fd);
    pHelper->files.nLastF.fd = -1;
    if (hasError) {
      remove(pHelper->files.nLastF.fname);
    } else if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tsdbError("vgId:%d failed to rename file %s to %s since %s", REPO_ID(pHelper->pRepo),
                pHelper->files.nLastF.fname, pHelper->files.lastF.fname, strerror(errno));
      code = -1;
    } else {
      pHelper->files.lastF.info = pHelper->files.nLastF.info;
    }
  }

  return code;
}

H
TD-100  
hzcheng 已提交
195
/**
 * Make pTable the current table of pHelper.
 *
 * The table-level state left by the previous table is reset. Then the
 * function:
 *   - records the table's tid, uid and schema version;
 *   - initializes both pHelper->pDataCols buffers with the table schema;
 *   - sets hasOldLastBlock when the table's SCompIdx entry says it has an
 *     SCompInfo (offset > 0) with a last block.
 *
 * The file must already be set/opened and its SCompIdx part loaded.
 * pRepo is not used here.
 */
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));

  // Forget everything about the previously selected table
  tsdbResetHelperTable(pHelper);
  ASSERT(helperHasState(pHelper, (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)));

  pHelper->tableInfo.tid = pTable->tableId.tid;
  pHelper->tableInfo.uid = pTable->tableId.uid;

  STSchema *pSchema = tsdbGetTableSchema(pTable);
  pHelper->tableInfo.sversion = schemaVersion(pSchema);

  for (int k = 0; k < 2; k++) {
    tdInitDataCols(pHelper->pDataCols[k], pSchema);
  }

  const SCompIdx *pTableIdx = &(pHelper->pCompIdx[pTable->tableId.tid]);
  if (pTableIdx->offset > 0 && pTableIdx->hasLast) pHelper->hasOldLastBlock = true;

  helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
  ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1));
}

H
TD-100  
hzcheng 已提交
219 220
/**
 * Write part of of points from pDataCols to file
H
TD-353  
Hongze Cheng 已提交
221
 *
H
TD-100  
hzcheng 已提交
222 223 224
 * @return: number of points written to file successfully
 *          -1 for failure
 */
H
hzcheng 已提交
225
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
H
TD-353  
Hongze Cheng 已提交
226
  ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
H
Haojun Liao 已提交
227
  ASSERT(pDataCols->numOfRows > 0);
H
TD-100  
hzcheng 已提交
228

H
hzcheng 已提交
229 230 231 232
  SCompBlock compBlock;
  int        rowsToWrite = 0;
  TSKEY      keyFirst = dataColsKeyFirst(pDataCols);

H
TD-353  
Hongze Cheng 已提交
233 234
  STsdbCfg *pCfg = &pHelper->pRepo->config;

H
TD-100  
hzcheng 已提交
235
  ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
H
TD-100  
hzcheng 已提交
236
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;  // for change purpose
H
hzcheng 已提交
237 238

  // Load the SCompInfo part if neccessary
H
TD-100  
hzcheng 已提交
239
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
H
TD-100  
hzcheng 已提交
240
  if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
H
hzcheng 已提交
241

H
TD-100  
hzcheng 已提交
242
  if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) {  // Just append as a super block
H
TD-100  
hzcheng 已提交
243
    ASSERT(pHelper->hasOldLastBlock == false);
H
Haojun Liao 已提交
244
    rowsToWrite = pDataCols->numOfRows;
H
hzcheng 已提交
245 246 247
    SFile *pWFile = NULL;
    bool   isLast = false;

H
TD-353  
Hongze Cheng 已提交
248
    if (rowsToWrite >= pCfg->minRowsPerFileBlock) {
H
hzcheng 已提交
249 250 251
      pWFile = &(pHelper->files.dataF);
    } else {
      isLast = true;
H
TD-100  
hzcheng 已提交
252
      pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
H
hzcheng 已提交
253 254
    }

H
TD-100  
hzcheng 已提交
255
    if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
H
hzcheng 已提交
256

H
hzcheng 已提交
257
    if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
H
TD-100  
hzcheng 已提交
258
  } else {  // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
H
TD-353  
Hongze Cheng 已提交
259 260
    SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), pIdx->numOfBlocks,
                                         sizeof(SCompBlock), compareKeyBlock, TD_GE);
H
TD-100  
hzcheng 已提交
261

H
hzcheng 已提交
262
    int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
H
TD-100  
hzcheng 已提交
263 264

    if (pCompBlock == NULL) {  // No key overlap, must has last block, just merge with the last block
H
TD-166  
hzcheng 已提交
265
      ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
H
TD-100  
hzcheng 已提交
266 267 268 269
      rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
      if (rowsToWrite < 0) goto _err;
    } else {  // Has key overlap

H
TD-100  
hzcheng 已提交
270 271
      if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
        // Key overlap with the block, must merge with the block
H
TD-100  
hzcheng 已提交
272 273 274

        rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
        if (rowsToWrite < 0) goto _err;
H
TD-353  
Hongze Cheng 已提交
275 276
      } else {  // Save as a super block in the middle
        rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst - 1);
H
TD-100  
hzcheng 已提交
277
        ASSERT(rowsToWrite > 0);
H
TD-353  
Hongze Cheng 已提交
278 279
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0)
          goto _err;
H
TD-100  
hzcheng 已提交
280
        if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
H
hzcheng 已提交
281 282 283 284 285 286
      }
    }
  }

  return rowsToWrite;

H
TD-100  
hzcheng 已提交
287
_err:
H
hzcheng 已提交
288 289 290 291
  return -1;
}

/**
 * Move the current table's old last block into the new last file.
 *
 * This only happens for a write helper when a new last file (.l) is open and
 * the table still has a block in the old last file (hasOldLastBlock).
 *
 * - Super block with sub-blocks: the block is loaded (merging the sub-blocks)
 *   and rewritten to the new last file as one super block.
 * - Single block: its bytes are copied verbatim to the end of the new last
 *   file, and its offset is repointed only after the copy succeeded.
 *
 * @return 0 on success or when nothing needs moving, -1 on failure
 *         (terrno is set for system errors detected here)
 */
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
  STsdbCfg *pCfg = &pHelper->pRepo->config;

  ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  if ((pHelper->files.nLastF.fd <= 0) || !pHelper->hasOldLastBlock) return 0;

  if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;

  SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1;
  ASSERT(pCompBlock->last);

  if (pCompBlock->numOfSubBlocks > 1) {
    // Merge the super block with its sub-blocks and rewrite them as a single super block
    SCompBlock compBlock;
    if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
    ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
    if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
                             pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
      return -1;

    if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
  } else {
    // Copy the block's bytes as-is from the old last file to the end of the new one
    if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
                strerror(errno));
      return -1;
    }

    off_t newOffset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
    if (newOffset < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname,
                strerror(errno));
      return -1;
    }

    if ((int64_t)tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) <
        (int64_t)pCompBlock->len) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tsdbError("vgId:%d failed to sendfile from file %s to %s since %s", REPO_ID(pHelper->pRepo),
                pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
      return -1;
    }

    // Repoint the block only once its bytes really are in the new last file
    pCompBlock->offset = newOffset;
  }

  pHelper->hasOldLastBlock = false;
  return 0;
}

/**
 * Append the SCompInfo part of the current table to the new head file and
 * update the table's SCompIdx entry.
 *
 * - SCompInfo not loaded into memory (TSDB_HELPER_INFO_LOAD unset): if the
 *   table has one in the old head file (offset > 0), its pIdx->len bytes are
 *   copied verbatim, starting at the table's old offset.
 * - SCompInfo in memory: it is stamped with the delimiter and uid,
 *   checksummed and written. The entry's uid is then updated as well.
 *
 * In both cases pIdx->offset becomes the position in the new head file.
 *
 * @return 0 on success, -1 on failure (terrno is set for system errors)
 */
int tsdbWriteCompInfo(SRWHelper *pHelper) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  off_t     offset = 0;  // off_t so a failed lseek (-1) is seen whatever the type of pIdx->offset

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      // tsendfile() with a NULL offset reads from the current file position,
      // so position the old head file at this table's SCompInfo explicitly
      if (lseek(pHelper->files.headF.fd, pIdx->offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
      if (offset < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      ASSERT(offset >= TSDB_FILE_HEAD_SIZE);

      if ((int64_t)tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) <
          (int64_t)pIdx->len) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      pIdx->offset = offset;
    }
  } else {
    pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
    pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
    pHelper->pCompInfo->checksum = 0;
    ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
    taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);

    offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
    if (offset < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    ASSERT(offset >= TSDB_FILE_HEAD_SIZE);

    if ((int64_t)twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int64_t)pIdx->len) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    pIdx->offset = offset;
    pIdx->uid = pHelper->tableInfo.uid;
  }

  return 0;
}

/**
 * Write the SCompIdx part at the end of the new head file.
 *
 * For every table whose SCompIdx entry has offset > 0, the pair
 * (tid as variant u32, encoded SCompIdx) is appended to pHelper->pBuffer.
 * The buffer is doubled whenever fewer than 128 bytes remain. A checksum is
 * appended and the whole buffer is written. The new head file's info.offset
 * and info.len record where the part lives.
 *
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbWriteCompIdx(SRWHelper *pHelper) {
  STsdbCfg *pCfg = &pHelper->pRepo->config;

  ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);

  SFile *pFile = &(pHelper->files.nHeadF);
  off_t  offset = lseek(pFile->fd, 0, SEEK_END);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  pFile->info.offset = offset;

  void *buf = pHelper->pBuffer;
  for (uint32_t i = 0; i < pCfg->maxTables; i++) {
    SCompIdx *pCompIdx = pHelper->pCompIdx + i;
    if (pCompIdx->offset > 0) {
      int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
      if (tsizeof(pHelper->pBuffer) - drift < 128) {
        // Grow through a temporary so the old buffer is not lost if trealloc fails
        void *pNewBuf = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
        if (pNewBuf == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          return -1;
        }
        pHelper->pBuffer = pNewBuf;
      }
      buf = POINTER_SHIFT(pHelper->pBuffer, drift);
      buf = taosEncodeVariantU32(buf, i);
      buf = tsdbEncodeSCompIdx(buf, pCompIdx);
    }
  }

  int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
  taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);

  if (twrite(pFile->fd, (void *)pHelper->pBuffer, tsize) < tsize) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  pFile->info.len = tsize;
  return 0;
}

/**
 * Load the SCompIdx part of the head file into pHelper->pCompIdx.
 *
 * Runs only if the part is not already loaded. The pCompIdx array is zeroed
 * first. If the head file has an SCompIdx part (info.offset > 0), the part is:
 *   1. read into pHelper->pBuffer;
 *   2. checksum-verified;
 *   3. decoded as the (variant-u32 tid, SCompIdx) pairs that
 *      tsdbWriteCompIdx() produces.
 * The head file is then repositioned just after its file header.
 *
 * When target is non-NULL, the whole pCompIdx array is copied into it.
 *
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
  STsdbCfg *pCfg = &(pHelper->pRepo->config);

  ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);

  if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
    SFile *pFile = &(pHelper->files.headF);
    int    fd = pFile->fd;

    memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
    if (pFile->info.offset > 0) {
      ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);

      if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, strerror(errno));
        return -1;
      }

      // Grow through a temporary so the old buffer is not lost if trealloc fails
      void *pNewBuf = trealloc(pHelper->pBuffer, pFile->info.len);
      if (pNewBuf == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      pHelper->pBuffer = pNewBuf;

      // Compare as signed 64-bit so a -1 from tread() is never promoted to a huge unsigned count
      if ((int64_t)tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < (int64_t)pFile->info.len) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
                  pFile->fname, strerror(errno));
        return -1;
      }
      if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
        tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname,
                  pFile->info.offset, pFile->info.len);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }

      // Decode the (tid, SCompIdx) pairs that precede the trailing checksum
      void *ptr = pHelper->pBuffer;
      while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
        uint32_t tid = 0;
        if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) {
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }

        // Checked at run time (not only ASSERTed): an out-of-range tid would write past pCompIdx
        if (tid == 0 || tid >= (uint32_t)pCfg->maxTables) {
          tsdbError("vgId:%d file %s SCompIdx part has invalid tid %u", REPO_ID(pHelper->pRepo), pFile->fname, tid);
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }

        if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) {
          terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
          return -1;
        }

        ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM));
      }

      if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
    }
  }
  helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);

  // Copy the memory for outside usage
  if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx));

  return 0;
}

/**
 * Load the SCompInfo part of the current table from the head file into
 * pHelper->pCompInfo.
 *
 * The read happens only if the part is not already loaded, and only if the
 * table has one (its SCompIdx offset > 0). The data is checksum-verified.
 *
 * When target is non-NULL and the table's SCompIdx len is positive, pIdx->len
 * bytes of pCompInfo are copied into it.
 *
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
  ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  int fd = pHelper->files.headF.fd;

  if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
    if (pIdx->offset > 0) {
      if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }

      // Grow through a temporary so the old buffer is not lost if trealloc fails
      void *pNewInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
      if (pNewInfo == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      pHelper->pCompInfo = pNewInfo;

      // Compare as signed 64-bit so a -1 from tread() is not missed
      if ((int64_t)tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int64_t)pIdx->len) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return -1;
      }
      if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
        tsdbError("vgId:%d file %s SCompInfo part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo),
                  pHelper->files.headF.fname, (uint32_t)pIdx->offset, (uint32_t)pIdx->len);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }
    }

    helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
  }

  // Skip the copy when there is nothing to copy (pCompInfo may still be NULL then)
  if (target && pIdx->len > 0) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);

  return 0;
}

H
TD-353  
Hongze Cheng 已提交
474
/**
 * Read the SCompData header of pCompBlock (SCompData, one SCompCol per
 * column, and a TSCKSUM) into pHelper->pCompData.
 *
 * The header is read from the last file when the block is a last block, and
 * from the data file otherwise. pCompBlock must not have sub-blocks. When
 * target is non-NULL, the header bytes are also copied into it.
 *
 * @return 0 on success, -1 on failure (terrno is set)
 */
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);
  int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;

  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);

  // Grow through a temporary so the old buffer is not lost if trealloc fails
  void *pNewData = trealloc((void *)pHelper->pCompData, tsize);
  if (pNewData == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }
  pHelper->pCompData = pNewData;

  // tread() may return -1; compared against a size_t that would become SIZE_MAX and pass
  if ((int64_t)tread(fd, (void *)pHelper->pCompData, tsize) < (int64_t)tsize) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);

  if (target) memcpy(target, pHelper->pCompData, tsize);

  return 0;
}

H
Hongze Cheng 已提交
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
/**
 * Fill pStatis[0..numOfCols) from the column headers in pHelper->pCompData.
 *
 * The requested entries and the block's columns are walked in step by colId.
 * The walk assumes both are sorted ascending by colId.
 * - A requested column found in the block gets its sum, max, min, maxIndex,
 *   minIndex and numOfNull copied.
 * - A requested column not found gets numOfNull = -1; its other fields are
 *   left untouched.
 */
void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) {
  SCompData *pCompData = pHelper->pCompData;
  int        iStat = 0;
  int        iCol = 0;

  while (iStat < numOfCols) {
    SDataStatis *pDst = pStatis + iStat;

    if (iCol >= pCompData->numOfCols) {
      // block has no more columns: everything left is missing
      pDst->numOfNull = -1;
      iStat++;
      continue;
    }

    SCompCol *pSrc = pCompData->cols + iCol;
    if (pDst->colId > pSrc->colId) {
      // this block column was not requested
      iCol++;
      continue;
    }

    if (pDst->colId == pSrc->colId) {
      pDst->sum = pSrc->sum;
      pDst->max = pSrc->max;
      pDst->min = pSrc->min;
      pDst->maxIndex = pSrc->maxIndex;
      pDst->minIndex = pSrc->minIndex;
      pDst->numOfNull = pSrc->numOfNull;
      iCol++;
    } else {
      // requested column does not exist in this block
      pDst->numOfNull = -1;
    }
    iStat++;
  }
}

H
TD-100  
hzcheng 已提交
520 521 522
/**
 * Load the columns listed in colIds of block blkIdx of the current table's
 * SCompInfo into pDataCols.
 *
 * For a super block with sub-blocks, the first sub-block is loaded into
 * pDataCols. Each further sub-block is loaded into pHelper->pDataCols[1]
 * and merged into pDataCols.
 *
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
  SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;

  ASSERT(pCompBlock->numOfSubBlocks >= 1);  // Must be super block

  int numOfSubBlocks = pCompBlock->numOfSubBlocks;
  // With sub-blocks, offset is the byte offset of the first sub-block from the start of
  // pCompInfo (the same base tsdbLoadBlockData() uses), not from pCompInfo->blocks
  SCompBlock *pStartBlock =
      (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);

  if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1;
  for (int i = 1; i < numOfSubBlocks; i++) {
    pStartBlock++;
    if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
    if (tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) return -1;
  }

  return 0;
}

H
TD-100  
hzcheng 已提交
539 540
/**
 * Load every row of pCompBlock into pHelper->pDataCols[0].
 *
 * When the block has sub-blocks, its offset locates the first sub-block
 * relative to pHelper->pCompInfo. Each sub-block after the first is loaded
 * into pHelper->pDataCols[1] and merged into pHelper->pDataCols[0].
 *
 * target is currently not used.
 *
 * @return 0 on success, -1 on failure
 */
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) {
  int         nBlocks = pCompBlock->numOfSubBlocks;
  SCompBlock *pFirst = (nBlocks > 1) ? (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset) : pCompBlock;

  tdResetDataCols(pHelper->pDataCols[0]);
  if (tsdbLoadBlockDataImpl(pHelper, pFirst, pHelper->pDataCols[0]) < 0) return -1;

  for (int k = 1; k < nBlocks; k++) {
    tdResetDataCols(pHelper->pDataCols[1]);
    if (tsdbLoadBlockDataImpl(pHelper, pFirst + k, pHelper->pDataCols[1]) < 0) return -1;
    if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) {
      return -1;
    }
  }

  // TODO: fill target when requested
  return 0;
}

H
TD-353  
Hongze Cheng 已提交
562 563 564 565 566 567 568 569 570 571 572
// ---------------------- INTERNAL FUNCTIONS ----------------------
/**
 * Decide whether a write helper should start a new last file (.l).
 *
 * Returns true when the current last file is larger than its file header
 * plus 32 KiB. When the size cannot be determined (fstat() fails), no new
 * last file is requested and the error is logged.
 */
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
  ASSERT(pHelper->files.lastF.fd > 0);
  struct stat st;
  // st is not filled in when fstat() fails, so it must not be read in that case
  if (fstat(pHelper->files.lastF.fd, &st) < 0) {
    tsdbError("vgId:%d failed to fstat file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
              strerror(errno));
    return false;
  }
  return st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE;
}

/*
 * Append the first rowsToWrite rows of pDataCols to the end of pFile as one data block
 * and fill *pCompBlock with its description.
 *
 * The block is assembled in pHelper->pBuffer as:
 *   SCompData header | SCompCol[nColsNotAllNull] | TSCKSUM | column payloads (each followed by a TSCKSUM)
 * Columns whose first rowsToWrite values are all NULL are not stored.
 *
 * isLast       - recorded in pCompBlock->last; such a block must hold fewer than minRowsPerFileBlock rows.
 * isSuperBlock - numOfSubBlocks becomes 1 (super block) or 0 (sub-block).
 * Returns 0 on success; -1 with terrno set on seek, allocation or write failure.
 */
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
                                SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
  STsdbCfg * pCfg = &(pHelper->pRepo->config);
  SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
  int64_t    offset = 0;

  ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock);
  ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);

  offset = lseek(pFile->fd, 0, SEEK_END);
  if (offset < 0) {
    int err = errno;  // capture before logging, which may overwrite errno
    tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    goto _err;
  }

  // Pass 1: build the SCompCol header entry (and statistics) for every column that is not all NULL
  int nColsNotAllNull = 0;
  for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
    SDataCol *pDataCol = pDataCols->cols + ncol;
    SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;

    if (isNEleNull(pDataCol, rowsToWrite)) {  // all data to commit are NULL, just ignore it
      continue;
    }

    memset(pCompCol, 0, sizeof(*pCompCol));

    pCompCol->colId = pDataCol->colId;
    pCompCol->type = pDataCol->type;
    // Column 0 is the key column (passed as TSKEY* below) and gets no statistics
    if (tDataTypeDesc[pDataCol->type].getStatisFunc && ncol != 0) {
      (*tDataTypeDesc[pDataCol->type].getStatisFunc)(
          (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
          &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
    }
    nColsNotAllNull++;
  }

  ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols);

  // Pass 2: compress (if necessary) each stored column's data right after the header
  int     tcol = 0;
  int32_t toffset = 0;
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM);
  int32_t lsize = tsize;
  for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
    if (tcol >= nColsNotAllNull) break;

    SDataCol *pDataCol = pDataCols->cols + ncol;
    SCompCol *pCompCol = pCompData->cols + tcol;

    // Skipped (all-NULL) columns have no SCompCol entry
    if (pDataCol->colId != pCompCol->colId) continue;
    void *tptr = (void *)((char *)pCompData + lsize);

    pCompCol->offset = toffset;

    int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);

    if (pCfg->compression) {
      if (pCfg->compression == TWO_STAGE_COMP) {
        // Grow via a temporary so the old buffer is not lost if the allocation fails
        void *tbuf = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
        if (tbuf == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _err;
        }
        pHelper->compBuffer = tbuf;
      }

      pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
          (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
          pHelper->compBuffer, tsizeof(pHelper->compBuffer));
    } else {
      pCompCol->len = tlen;
      memcpy(tptr, pDataCol->pData, pCompCol->len);
    }

    // Add checksum
    pCompCol->len += sizeof(TSCKSUM);
    taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len);

    toffset += pCompCol->len;
    lsize += pCompCol->len;
    tcol++;
  }

  pCompData->delimiter = TSDB_FILE_DELIMITER;
  pCompData->uid = pHelper->tableInfo.uid;
  pCompData->numOfCols = nColsNotAllNull;

  taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);

  // Write the whole block to file
  if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
    int err = errno;  // capture before logging, which may overwrite errno
    tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize,
              pFile->fname, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    goto _err;
  }

  // Update pCompBlock member variables
  pCompBlock->last = isLast;
  pCompBlock->offset = offset;
  pCompBlock->algorithm = pCfg->compression;
  pCompBlock->numOfRows = rowsToWrite;
  pCompBlock->sversion = pHelper->tableInfo.sversion;
  pCompBlock->len = (int32_t)lsize;
  pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
  pCompBlock->numOfCols = nColsNotAllNull;
  pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
  pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);

  tsdbTrace("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
            " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
            REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, pCompBlock->offset,
            pCompBlock->numOfRows, pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, pCompBlock->keyLast);

  return 0;

_err:
  return -1;
}

/*
 * bsearch-style comparator: arg1 points to a TSKEY, arg2 to an SCompBlock.
 * Returns -1 if the key precedes the block's keyFirst, 1 if it follows keyLast,
 * and 0 if it falls inside the inclusive range [keyFirst, keyLast].
 */
static int compareKeyBlock(const void *arg1, const void *arg2) {
  const TSKEY       target = *(const TSKEY *)arg1;
  const SCompBlock *pBlk = (const SCompBlock *)arg2;

  if (target < pBlk->keyFirst) return -1;
  if (target > pBlk->keyLast) return 1;
  return 0;
}

/*
 * Merge leading rows of pDataCols into the existing block at blkIdx of the current table.
 *
 * - If pDataCols' first key is beyond the block's keyLast, the rows are appended to that
 *   (last) block: as a new sub-block in the .last file when possible, otherwise by loading,
 *   merging and rewriting the whole block.
 * - Otherwise the rows overlap the block: they are added as a sub-block when they fit, or
 *   the block is loaded, merged with all rows up to the next block's keyFirst and written
 *   back as one or more super blocks.
 *
 * Returns the number of rows of pDataCols that were written, or -1 on error.
 *
 * blockAtIdx() is re-evaluated on each use instead of caching a pointer; presumably because
 * it indexes pHelper->pCompInfo, which tsdbAddSubBlock/tsdbInsertSuperBlock may reallocate.
 */
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
  // TODO: set pHelper->hasOldBlock
  int        rowsWritten = 0;
  SCompBlock compBlock = {0};
  STsdbCfg * pCfg = &pHelper->pRepo->config;

  ASSERT(pDataCols->numOfRows > 0);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx < pIdx->numOfBlocks);

  ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
  ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);

  if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) {  // Merge with the last block by append
    ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock &&
           blkIdx == pIdx->numOfBlocks - 1);
    int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5;  // TODO: make an interface

    rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
    if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pCfg->minRowsPerFileBlock) &&
        (pHelper->files.nLastF.fd) < 0) {
      // Still small enough to stay in .last: append as a sub-block
      if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
        goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
      // Merge
      if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
      // Write: to the data file once big enough, otherwise to the (new) last file
      SFile *pWFile = NULL;
      bool   isLast = false;
      if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) {
        pWFile = &(pHelper->files.dataF);
      } else {
        isLast = true;
        pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
      }
      if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock,
                               isLast, true) < 0)
        goto _err;
      if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
    }

    ASSERT(pHelper->hasOldLastBlock);
    pHelper->hasOldLastBlock = false;
  } else {
    // Key must overlap with the block
    ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);

    TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;

    // rows1: number of rows that must be merged into this block
    int rows1 =
        tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
    // rows2: max number of additional rows the block can take
    int rows2 = pCfg->maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
    // rows3: number of rows between this block and the next block
    int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);

    ASSERT(rows3 >= rows1);

    if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
        ((!blockAtIdx(pHelper, blkIdx)->last) ||
         ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock) &&
          (pHelper->files.nLastF.fd < 0)))) {
      rowsWritten = rows1;
      bool   isLast = false;
      SFile *pFile = NULL;

      if (blockAtIdx(pHelper, blkIdx)->last) {
        isLast = true;
        pFile = &(pHelper->files.lastF);
      } else {
        pFile = &(pHelper->files.dataF);
      }

      if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
      if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
    } else {  // Load-Merge-Write
      // Load
      if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
      if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;

      rowsWritten = rows3;

      int iter1 = 0;  // iter over pHelper->pDataCols[0]
      int iter2 = 0;  // iter over pDataCols
      int round = 0;
      while (true) {
        if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
        tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
                           pDataCols, &iter2, rowsWritten, pCfg->maxRowsPerFileBlock * 4 / 5);
        ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
        if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
                                 pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
          goto _err;
        // The first output block replaces the original; later ones are inserted after it.
        // Both may fail (insert can reallocate), so the result must be checked.
        if (round == 0) {
          if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        } else {
          if (tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
        }
        round++;
        blkIdx++;
      }
    }
  }

  return rowsWritten;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
825 826 827 828 829 830 831 832 833
static int compTSKEY(const void *key1, const void *key2) {
  if (*(TSKEY *)key1 > *(TSKEY *)key2) {
    return 1;
  } else if (*(TSKEY *)key1 == *(TSKEY *)key2) {
    return 0;
  } else {
    return -1;
  }
}
H
hzcheng 已提交
834

H
TD-100  
hzcheng 已提交
835 836 837
/*
 * Ensure pHelper->pCompInfo has room for strictly more than `esize` bytes.
 * When it does not, grow it to esize plus headroom for 16 more SCompBlock entries.
 *
 * Returns 0 on success. On allocation failure returns -1 with
 * terrno = TSDB_CODE_TDB_OUT_OF_MEMORY and leaves pHelper->pCompInfo unchanged
 * (assuming trealloc, like realloc, keeps the original block valid on failure).
 */
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
  if (tsizeof((void *)pHelper->pCompInfo) <= esize) {
    size_t tsize = esize + sizeof(SCompBlock) * 16;
    void * ptr = trealloc(pHelper->pCompInfo, tsize);
    if (ptr == NULL) {
      // Do not overwrite pCompInfo: that would leak the buffer and lose the block index
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
    pHelper->pCompInfo = (SCompInfo *)ptr;
  }

  return 0;
}

/*
 * Insert pCompBlock as a new super block at position blkIdx of the current table's SCompInfo.
 *
 * The offset arithmetic assumes the SCompInfo region is laid out as
 *   [SCompInfo header][blocks[0..numOfBlocks-1]][sub-block lists][TSCKSUM]
 * with pIdx->len being the byte size of the whole region.
 * Returns 0 on success, -1 if the SCompInfo buffer cannot be grown.
 */
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;

  ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  // An empty region is just the SCompInfo header plus checksum. It is sized with SCompInfo
  // (not SCompData) because every offset below is relative to an SCompInfo header.
  if (pIdx->len == 0) pIdx->len = sizeof(SCompInfo) + sizeof(TSCKSUM);

  // The insert grows the region by exactly one SCompBlock; reserve that much
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompBlock)) < 0) goto _err;

  // Sub-block lists sit after blocks[], so each one moves down by one entry
  for (int i = 0; i < pIdx->numOfBlocks; i++) {
    SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
    if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
  }

  // Shift blocks[blkIdx..], the sub-block lists and the checksum down by one entry
  int tsize = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx);
  if (tsize > 0) {
    ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo));
    ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo));
    memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)),
            (void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize);
  }
  pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;

  pIdx->numOfBlocks++;
  pIdx->len += sizeof(SCompBlock);
  ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));
  pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
  pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;

  if (pIdx->numOfBlocks > 1) {
    ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
  }

  tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
            blkIdx);

  return 0;

_err:
  return -1;
}

H
TD-100  
hzcheng 已提交
890 891 892 893
/*
 * Attach pCompBlock (a sub-block, numOfSubBlocks == 0) to the super block at blkIdx.
 *
 * - If the super block is still a single block, a two-entry sub-block list is created:
 *   a copy of the super block itself (as sub-block 0) followed by pCompBlock. The list is
 *   placed in front of the first later block's sub-block list, or just before the checksum.
 * - Otherwise pCompBlock is appended to the existing sub-block list.
 * Offsets of later sub-block lists, the super block's row count, key range and len, and
 * pIdx->len/maxKey/hasLast are updated accordingly.
 *
 * Returns 0 on success, -1 if the SCompInfo buffer cannot be grown.
 */
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) {
  ASSERT(pCompBlock->numOfSubBlocks == 0);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pSuper = pHelper->pCompInfo->blocks + blkIdx;
  ASSERT(pSuper->numOfSubBlocks >= 1 && pSuper->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);

  // A lone block needs two new entries (itself + the new one); an existing list needs one
  const size_t nNewEntries = (pSuper->numOfSubBlocks == 1) ? 2 : 1;
  if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompBlock) * nNewEntries) < 0) return -1;

  // The resize may have moved pCompInfo: re-derive the super block pointer
  pSuper = pHelper->pCompInfo->blocks + blkIdx;

  if (pSuper->numOfSubBlocks <= 1) {
    // Find where the new list goes: ahead of the next block that owns a sub-block list, else before the checksum
    char *gap = NULL;
    for (int j = blkIdx + 1; j < pIdx->numOfBlocks && gap == NULL; j++) {
      SCompBlock *pNext = pHelper->pCompInfo->blocks + j;
      if (pNext->numOfSubBlocks > 1) gap = (char *)POINTER_SHIFT(pHelper->pCompInfo, pNext->offset);
    }
    if (gap == NULL) gap = (char *)POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM));

    size_t nTail = pIdx->len - (gap - (char *)(pHelper->pCompInfo));
    if (nTail > 0) {
      memmove(gap + sizeof(SCompBlock) * 2, gap, nTail);
      for (int j = blkIdx + 1; j < pIdx->numOfBlocks; j++) {
        SCompBlock *pNext = pHelper->pCompInfo->blocks + j;
        if (pNext->numOfSubBlocks > 1) pNext->offset += (sizeof(SCompBlock) * 2);
      }
    }

    SCompBlock *pSubs = (SCompBlock *)gap;
    pSubs[0] = *pSuper;
    pSubs[0].numOfSubBlocks = 0;
    pSubs[1] = *pCompBlock;

    pSuper->numOfSubBlocks = 2;
    pSuper->numOfRows += rowsAdded;
    pSuper->offset = gap - ((char *)pHelper->pCompInfo);
    pSuper->len = sizeof(SCompBlock) * 2;
    pSuper->keyFirst = MIN(pSubs[0].keyFirst, pSubs[1].keyFirst);
    pSuper->keyLast = MAX(pSubs[0].keyLast, pSubs[1].keyLast);

    pIdx->len += (sizeof(SCompBlock) * 2);
  } else {
    // Append one entry right after the existing sub-block list
    char * listEnd = (char *)(pHelper->pCompInfo) + pSuper->offset + pSuper->len;
    size_t nTail = pIdx->len - (pSuper->offset + pSuper->len);
    if (nTail > 0) {
      memmove(listEnd + sizeof(SCompBlock), listEnd, nTail);
      for (int j = blkIdx + 1; j < pIdx->numOfBlocks; j++) {
        SCompBlock *pNext = &pHelper->pCompInfo->blocks[j];
        if (pNext->numOfSubBlocks > 1) pNext->offset += sizeof(SCompBlock);
      }
    }

    *(SCompBlock *)listEnd = *pCompBlock;

    pSuper->numOfSubBlocks++;
    ASSERT(pSuper->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
    pSuper->len += sizeof(SCompBlock);
    pSuper->numOfRows += rowsAdded;
    pSuper->keyFirst = MIN(pSuper->keyFirst, pCompBlock->keyFirst);
    pSuper->keyLast = MAX(pSuper->keyLast, pCompBlock->keyLast);
    pIdx->len += sizeof(SCompBlock);
  }

  SCompBlock *pLastBlk = pHelper->pCompInfo->blocks + (pIdx->numOfBlocks - 1);
  pIdx->maxKey = pLastBlk->keyLast;
  pIdx->hasLast = pLastBlk->last;

  tsdbTrace("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx);

  return 0;
}

/*
 * Replace the block at blkIdx with the single super block pCompBlock.
 *
 * If the old block owned a sub-block list, that list is cut out of the SCompInfo region:
 * the trailing bytes are moved up, later lists' offsets and pIdx->len shrink accordingly.
 * pIdx->maxKey/hasLast are refreshed from the table's last block. Always returns 0.
 */
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
  ASSERT(pCompBlock->numOfSubBlocks == 1);

  SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
  ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);

  SCompBlock *pTarget = pHelper->pCompInfo->blocks + blkIdx;
  ASSERT(pTarget->numOfSubBlocks >= 1);

  if (pTarget->numOfSubBlocks > 1) {
    char * base = (char *)(pHelper->pCompInfo);
    size_t nTail = pIdx->len - (pTarget->offset + pTarget->len);
    if (nTail > 0) memmove(base + pTarget->offset, base + pTarget->offset + pTarget->len, nTail);

    const size_t nRemoved = sizeof(SCompBlock) * pTarget->numOfSubBlocks;
    for (int j = blkIdx + 1; j < pIdx->numOfBlocks; j++) {
      SCompBlock *pNext = &pHelper->pCompInfo->blocks[j];
      if (pNext->numOfSubBlocks > 1) pNext->offset -= nRemoved;
    }
    pIdx->len -= nRemoved;
  }

  *pTarget = *pCompBlock;

  SCompBlock *pLastBlk = pHelper->pCompInfo->blocks + (pIdx->numOfBlocks - 1);
  pIdx->maxKey = pLastBlk->keyLast;
  pIdx->hasLast = pLastBlk->last;

  tsdbTrace("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
            blkIdx);

  return 0;
}

// Get the number of rows in range [minKey, maxKey]
H
TD-100  
hzcheng 已提交
1013
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
H
Haojun Liao 已提交
1014
  if (pDataCols->numOfRows == 0) return 0;
H
TD-100  
hzcheng 已提交
1015 1016 1017 1018 1019 1020 1021 1022

  ASSERT(minKey <= maxKey);
  TSKEY keyFirst = dataColsKeyFirst(pDataCols);
  TSKEY keyLast = dataColsKeyLast(pDataCols);
  ASSERT(keyFirst <= keyLast);

  if (minKey > keyLast || maxKey < keyFirst) return 0;

H
Haojun Liao 已提交
1023
  void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
H
TD-100  
hzcheng 已提交
1024 1025 1026
                           compTSKEY, TD_GE);
  ASSERT(ptr1 != NULL);

H
Haojun Liao 已提交
1027
  void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
H
TD-100  
hzcheng 已提交
1028 1029 1030 1031 1032
                           compTSKEY, TD_LE);
  ASSERT(ptr2 != NULL);

  if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;

H
TD-100  
hzcheng 已提交
1033
  return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1;
H
TD-185  
Hongze Cheng 已提交
1034 1035
}

H
TD-353  
Hongze Cheng 已提交
1036 1037 1038
/*
 * Put the file part of the helper back into its "no file set" state:
 * release the owned file-name strings, clear pHelper->files, and mark fid and
 * every file descriptor as -1. Descriptors are NOT closed here.
 */
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
  // Free the names first: the memset below would otherwise NULL these pointers and leak the strings
  tfree(pHelper->files.headF.fname);
  tfree(pHelper->files.dataF.fname);
  tfree(pHelper->files.lastF.fname);
  tfree(pHelper->files.nHeadF.fname);
  tfree(pHelper->files.nLastF.fname);

  memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
  pHelper->files.fid = -1;
  pHelper->files.headF.fd = -1;
  pHelper->files.dataF.fd = -1;
  pHelper->files.lastF.fd = -1;
  pHelper->files.nHeadF.fd = -1;
  pHelper->files.nLastF.fd = -1;
}
H
TD-185  
Hongze Cheng 已提交
1050

H
TD-353  
Hongze Cheng 已提交
1051
/*
 * Allocate the SCompIdx array (one entry per table, plus a trailing checksum)
 * and reset the file part of the helper.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY.
 */
static int tsdbInitHelperFile(SRWHelper *pHelper) {
  STsdbCfg *pCfg = &pHelper->pRepo->config;

  pHelper->pCompIdx = (SCompIdx *)tmalloc(sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM));
  if (pHelper->pCompIdx != NULL) {
    tsdbResetHelperFileImpl(pHelper);
    return 0;
  }

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
1064 1065
/*
 * Close the helper's files, reset the file part and release the SCompIdx array.
 * pCompIdx is cleared so a second destroy (or a later reset) cannot double-free it.
 */
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
  tsdbCloseHelperFile(pHelper, false);
  tsdbResetHelperFileImpl(pHelper);
  tzfree(pHelper->pCompIdx);
  pHelper->pCompIdx = NULL;
}
H
TD-185  
Hongze Cheng 已提交
1069

H
TD-353  
Hongze Cheng 已提交
1070 1071 1072 1073 1074
// ---------- Operations on Helper Table part

/* Zero the per-table info and forget about any previously seen old last block. */
static void tsdbResetHelperTableImpl(SRWHelper *pHelper) {
  memset(&pHelper->tableInfo, 0, sizeof(SHelperTable));
  pHelper->hasOldLastBlock = false;
}
H
TD-185  
Hongze Cheng 已提交
1075

H
TD-353  
Hongze Cheng 已提交
1076 1077 1078 1079
/* Reset the block and table parts, then drop the TABLE_SET and INFO_LOAD state bits. */
static void tsdbResetHelperTable(SRWHelper *pHelper) {
  const int tableStateBits = (TSDB_HELPER_TABLE_SET | TSDB_HELPER_INFO_LOAD);

  tsdbResetHelperBlock(pHelper);
  tsdbResetHelperTableImpl(pHelper);
  helperClearState(pHelper, tableStateBits);
}

H
TD-353  
Hongze Cheng 已提交
1082
/* The table part allocates nothing up front; it simply starts from the reset state. */
static void tsdbInitHelperTable(SRWHelper *pHelper) {
  tsdbResetHelperTableImpl(pHelper);
}
H
TD-185  
Hongze Cheng 已提交
1083

H
TD-353  
Hongze Cheng 已提交
1084
/* Release the SCompInfo buffer and clear the pointer so it cannot be freed twice. */
static void tsdbDestroyHelperTable(SRWHelper *pHelper) {
  tzfree((void *)pHelper->pCompInfo);
  pHelper->pCompInfo = NULL;
}
H
TD-185  
Hongze Cheng 已提交
1085

H
TD-353  
Hongze Cheng 已提交
1086 1087 1088 1089 1090
// ---------- Operations on Helper Block part

/* Empty both SDataCols buffers owned by the helper (pDataCols[0] and pDataCols[1]). */
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
  for (int i = 0; i < 2; i++) {
    tdResetDataCols(pHelper->pDataCols[i]);
  }
}
H
TD-185  
Hongze Cheng 已提交
1091

H
TD-353  
Hongze Cheng 已提交
1092 1093 1094 1095 1096 1097
/* Reset the block part of the helper; no helper state bits are cleared for this part yet. */
static void tsdbResetHelperBlock(SRWHelper *pHelper) {
  tsdbResetHelperBlockImpl(pHelper);
}

/*
 * Allocate the two SDataCols buffers, each sized for the repo's widest row
 * (maxRowBytes, maxCols) and maxRowsPerFileBlock rows, then reset them.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if either
 * allocation fails (buffers already allocated are left for the caller to destroy).
 */
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
  STsdbRepo *pTsdb = helperRepo(pHelper);
  STsdbMeta *pMetaInfo = pHelper->pRepo->tsdbMeta;

  for (int i = 0; i < 2; i++) {
    pHelper->pDataCols[i] =
        tdNewDataCols(pMetaInfo->maxRowBytes, pMetaInfo->maxCols, pTsdb->config.maxRowsPerFileBlock);
  }

  if (pHelper->pDataCols[0] != NULL && pHelper->pDataCols[1] != NULL) {
    tsdbResetHelperBlockImpl(pHelper);
    return 0;
  }

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return -1;
}

/*
 * Release the block part of the helper: the SCompData buffer and both SDataCols.
 * The pointers are cleared afterwards so a repeated destroy cannot double-free them.
 */
static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
  tzfree(pHelper->pCompData);
  pHelper->pCompData = NULL;

  tdFreeDataCols(pHelper->pDataCols[0]);
  pHelper->pDataCols[0] = NULL;

  tdFreeDataCols(pHelper->pDataCols[1]);
  pHelper->pDataCols[1] = NULL;
}

/*
 * Initialize a read/write helper for pRepo.
 *
 * Zeroes *pHelper, records type/repo/clear state, initializes the file, table and
 * block parts, and allocates pHelper->pBuffer, which is large enough for one block:
 * header, per-column SCompCol + checksum + compression slack, and the raw row data.
 * On any failure the helper is destroyed and -1 is returned (terrno set by the
 * failing step); returns 0 on success.
 */
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
  STsdbCfg * pCfg = &pRepo->config;
  STsdbMeta *pMeta = pRepo->tsdbMeta;

  const size_t bufSize =
      sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols +
      pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM);

  memset((void *)pHelper, 0, sizeof(*pHelper));

  helperType(pHelper) = type;
  helperRepo(pHelper) = pRepo;
  helperState(pHelper) = TSDB_HELPER_CLEAR_STATE;

  if (tsdbInitHelperFile(pHelper) < 0) goto _err;
  tsdbInitHelperTable(pHelper);
  if (tsdbInitHelperBlock(pHelper) < 0) goto _err;

  pHelper->pBuffer = tmalloc(bufSize);
  if (pHelper->pBuffer != NULL) return 0;

  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;

_err:
  tsdbDestroyHelper(pHelper);
  return -1;
}

H
TD-353  
Hongze Cheng 已提交
1154 1155 1156
static int comparColIdCompCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
}
H
TD-185  
Hongze Cheng 已提交
1157

H
TD-353  
Hongze Cheng 已提交
1158 1159 1160
static int comparColIdDataCol(const void *arg1, const void *arg2) {
  return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
}
H
TD-185  
Hongze Cheng 已提交
1161

H
TD-353  
Hongze Cheng 已提交
1162 1163 1164 1165
/*
 * Read the stored bytes of one column (exactly pCompCol->len bytes) of the block at
 * pCompBlock->offset into buf. Column payloads begin right after the SCompData header
 * and its SCompCol array; pCompCol->offset is relative to that point.
 * Returns 0 on success, -1 on seek failure or short read (terrno is not set).
 */
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
  size_t hdrSize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;

  if (lseek(fd, pCompBlock->offset + hdrSize + pCompCol->offset, SEEK_SET) < 0) return -1;
  return (tread(fd, buf, pCompCol->len) < pCompCol->len) ? -1 : 0;
}

H
TD-353  
Hongze Cheng 已提交
1170 1171 1172 1173
/*
 * For each requested column id, read that column of the single block pCompBlock into
 * the matching SDataCol of pDataCols. Column ids not stored in the block are skipped;
 * every requested id is expected to exist in pDataCols.
 * Returns 0 on success, -1 if the block header or any column cannot be read.
 */
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
                                       SDataCols *pDataCols) {
  if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;

  const int fd = pCompBlock->last ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;

  for (int k = 0; k < numOfColIds; k++) {
    int16_t colId = colIds[k];

    SCompCol *pCompCol = (SCompCol *)bsearch(&colId, pHelper->pCompData->cols, pHelper->pCompData->numOfCols,
                                             sizeof(SCompCol), comparColIdCompCol);
    // Not present in this block (presumably an all-NULL column the writer skipped)
    if (pCompCol == NULL) continue;

    SDataCol *pDataCol =
        (SDataCol *)bsearch(&colId, pDataCols->cols, pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol);
    ASSERT(pDataCol != NULL);

    // NOTE(review): this copies the stored (possibly compressed, checksum-suffixed) payload as-is;
    // nothing here decodes it — confirm callers expect raw column bytes.
    pDataCol->len = pCompCol->len;
    if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1;
  }

  return 0;
}

/*
 * Verify one column payload read from disk and decode it into pDataCol.
 *
 * content/len: the payload as stored on disk; the checksum is verified over all len
 *   bytes and the last sizeof(TSCKSUM) bytes are excluded from the column data.
 * comp:        compression algorithm of the block; 0 means the data is stored as-is.
 * numOfRows:   number of values contained in the payload.
 * maxPoints:   currently unused.
 * buffer/bufferSize: scratch space handed to the type's decompression routine.
 *
 * On success pDataCol->len holds the decoded data length and, for BINARY/NCHAR
 * columns, dataColSetOffset() is called to rebuild the per-row offsets.
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_TDB_FILE_CORRUPTED when the
 * checksum does not match or uncompressed data would not fit into pDataCol.
 */
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
                                        int maxPoints, char *buffer, int bufferSize) {
  // Verify by checksum
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }

  int32_t dataLen = len - (int32_t)sizeof(TSCKSUM);

  if (comp) {
    // Decompress straight into the column's own storage (bounded by spaceSize)
    pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, dataLen, numOfRows, pDataCol->pData,
                                                                  pDataCol->spaceSize, comp, buffer, bufferSize);
  } else {
    // Stored uncompressed: copy as-is, but never beyond the column's capacity
    if (dataLen < 0 || dataLen > pDataCol->spaceSize) {
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      return -1;
    }
    pDataCol->len = dataLen;
    memcpy(pDataCol->pData, content, dataLen);
  }

  // Variable-length columns need their per-row offsets rebuilt after loading
  if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
    dataColSetOffset(pDataCol, numOfRows);
  }

  return 0;
}

/*
 * Read one (non-merged) block from the data or last file and decode every column
 * of pDataCols from it.
 *
 * The whole block (pCompBlock->len bytes) is read into pHelper->pBuffer, which the
 * caller must already have sized to at least that length (asserted). The block
 * header (SCompData + numOfCols SCompCol entries + TSCKSUM) is checksum-verified
 * before any of its fields are trusted. The SCompCol array and pDataCols->cols are
 * walked in step by colId, so both are assumed sorted by ascending colId; columns of
 * pDataCols absent from the block are filled with NULLs.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
  ASSERT(pCompBlock->numOfSubBlocks <= 1);

  ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);

  SCompData *pCompData = (SCompData *)pHelper->pBuffer;

  SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);

  int fd = pFile->fd;
  if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
    tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
              pFile->fname, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Header = SCompData + one SCompCol per column + a checksum over both; payloads follow it
  int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);

  // A header larger than the bytes read would make the checksum run past the block
  if (tsize > pCompBlock->len || !taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
    tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
              pFile->fname, pCompBlock->offset, pCompBlock->len);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    goto _err;
  }

  // Header fields read from disk are only trusted once the checksum has passed
  ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);

  pDataCols->numOfRows = pCompBlock->numOfRows;

  // Recover the data: merge-walk the stored columns and the requested columns by colId
  int ccol = 0;
  int dcol = 0;
  while (dcol < pDataCols->numOfCols) {
    SDataCol *pDataCol = &(pDataCols->cols[dcol]);
    if (ccol >= pCompData->numOfCols) {
      // Set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
      dcol++;
      continue;
    }

    SCompCol *pCompCol = &(pCompData->cols[ccol]);

    if (pCompCol->colId == pDataCol->colId) {
      // The column payload must lie entirely inside the bytes that were read
      if ((int64_t)tsize + pCompCol->offset + pCompCol->len > pCompBlock->len) {
        tsdbError("vgId:%d file %s column %d exceeds block at offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
                  pFile->fname, pCompCol->colId, pCompBlock->offset, pCompBlock->len);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        goto _err;
      }

      if (pCompBlock->algorithm == TWO_STAGE_COMP) {
        int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
        if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
          zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
        }
        // Keep the old buffer owned by the helper if the realloc fails (no leak)
        void *pBuf = trealloc(pHelper->compBuffer, zsize);
        if (pBuf == NULL) {
          terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
          goto _err;
        }
        pHelper->compBuffer = pBuf;
      }
      if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
                                       pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
                                       pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
        tsdbError("vgId:%d file %s column %d of block at offset %" PRId64 " is corrupted", REPO_ID(pHelper->pRepo),
                  pFile->fname, pCompCol->colId, pCompBlock->offset);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        goto _err;
      }
      dcol++;
      ccol++;
    } else if (pCompCol->colId < pDataCol->colId) {
      ccol++;
    } else {
      // Set current column as NULL and forward
      dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
      dcol++;
    }
  }

  return 0;

_err:
  return -1;
}
H
TD-353  
Hongze Cheng 已提交
1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329

/*
 * Serialize *pIdx starting at buf and return the position just past the written bytes.
 * Field order: len, offset (taosEncodeVariantU32), hasLast (taosEncodeFixedU8),
 * numOfBlocks (taosEncodeVariantU32), uid and maxKey (taosEncodeFixedU64).
 * tsdbDecodeSCompIdx() reads the fields back in this same order.
 */
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
  void *pos = buf;

  pos = taosEncodeVariantU32(pos, (uint32_t)pIdx->len);
  pos = taosEncodeVariantU32(pos, (uint32_t)pIdx->offset);
  pos = taosEncodeFixedU8(pos, (uint8_t)pIdx->hasLast);
  pos = taosEncodeVariantU32(pos, (uint32_t)pIdx->numOfBlocks);
  pos = taosEncodeFixedU64(pos, (uint64_t)pIdx->uid);
  pos = taosEncodeFixedU64(pos, (uint64_t)pIdx->maxKey);

  return pos;
}

/*
 * Deserialize an SCompIdx (as laid out by tsdbEncodeSCompIdx()) from buf into *pIdx.
 * Returns the position just past the consumed bytes, or NULL as soon as one field
 * fails to decode; fields decoded before that point have already been stored in *pIdx.
 */
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
  uint8_t  lastFlag = 0;
  uint32_t nBlocks = 0;
  uint64_t uidVal = 0;
  uint64_t keyVal = 0;
  void *   cur = buf;

  cur = taosDecodeVariantU32(cur, &(pIdx->len));
  if (cur == NULL) return NULL;

  cur = taosDecodeVariantU32(cur, &(pIdx->offset));
  if (cur == NULL) return NULL;

  // hasLast and numOfBlocks go through temporaries before being stored in pIdx
  cur = taosDecodeFixedU8(cur, &lastFlag);
  if (cur == NULL) return NULL;
  pIdx->hasLast = lastFlag;

  cur = taosDecodeVariantU32(cur, &nBlocks);
  if (cur == NULL) return NULL;
  pIdx->numOfBlocks = nBlocks;

  cur = taosDecodeFixedU64(cur, &uidVal);
  if (cur == NULL) return NULL;
  pIdx->uid = (int64_t)uidVal;

  cur = taosDecodeFixedU64(cur, &keyVal);
  if (cur == NULL) return NULL;
  pIdx->maxKey = (TSKEY)keyVal;

  return cur;
}