/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "ttszip.h"
#include "taoserror.h"
#include "tcompression.h"

// Forward declarations of file-local helpers used below.
static int32_t getDataStartOffset();
static void    TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);

/**
 * todo error handling
 * support auto closeable tmp file
 * @param path
 * @return
 */
STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
  STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
39

S
os env  
Shengliang Guan 已提交
40
  taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path);
41 42 43
  // pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  if (pTSBuf->pFile == NULL) {
44 45 46 47 48 49 50
    free(pTSBuf);
    return NULL;
  }

  if (!autoDelete) {
    remove(pTSBuf->path);
  }
S
Shengliang Guan 已提交
51

52 53 54
  if (NULL == allocResForTSBuf(pTSBuf)) {
    return NULL;
  }
S
Shengliang Guan 已提交
55

56 57 58
  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
  STSBufUpdateHeader(pTSBuf, &header);
S
Shengliang Guan 已提交
59

60 61 62 63
  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  pTSBuf->tsOrder = order;
S
Shengliang Guan 已提交
64

65 66 67 68 69 70 71 72 73 74
  return pTSBuf;
}

STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
  STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
75

76
  tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
S
Shengliang Guan 已提交
77

78 79 80
  // pTSBuf->pFile = fopen(pTSBuf->path, "rb+");
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pTSBuf->pFile == NULL) {
81 82 83
    free(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
84

85 86 87
  if (allocResForTSBuf(pTSBuf) == NULL) {
    return NULL;
  }
S
Shengliang Guan 已提交
88

89 90
  // validate the file magic number
  STSBufFileHeader header = {0};
S
Shengliang Guan 已提交
91
  int32_t          ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
92
  UNUSED(ret);
93
  size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader));
94 95 96 97 98 99 100
  UNUSED(sz);

  // invalid file
  if (header.magic != TS_COMP_FILE_MAGIC) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
101

102 103 104 105 106 107 108
  if (header.numOfGroup > pTSBuf->numOfAlloc) {
    pTSBuf->numOfAlloc = header.numOfGroup;
    STSGroupBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
    if (tmp == NULL) {
      tsBufDestroy(pTSBuf);
      return NULL;
    }
S
Shengliang Guan 已提交
109

110 111
    pTSBuf->pData = tmp;
  }
S
Shengliang Guan 已提交
112

113
  pTSBuf->numOfGroups = header.numOfGroup;
S
Shengliang Guan 已提交
114

115 116 117
  // check the ts order
  pTSBuf->tsOrder = header.tsOrder;
  if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
S
Shengliang Guan 已提交
118
    //    tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
119 120 121
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
122

123
  size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
S
Shengliang Guan 已提交
124

125 126 127
  STSGroupBlockInfo* buf = (STSGroupBlockInfo*)calloc(1, infoSize);
  if (buf == NULL) {
    tsBufDestroy(pTSBuf);
S
Shengliang Guan 已提交
128 129 130 131
    return NULL;
  }

  // int64_t pos = ftell(pTSBuf->pFile); //pos not used
132
  sz = taosReadFile(pTSBuf->pFile, buf, infoSize);
133
  UNUSED(sz);
S
Shengliang Guan 已提交
134

135 136 137 138 139 140
  // the length value for each vnode is not kept in file, so does not set the length value
  for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
    STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
    memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
  }
  free(buf);
S
Shengliang Guan 已提交
141

142
  ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END);
143
  UNUSED(ret);
S
Shengliang Guan 已提交
144

145 146
  int64_t file_size;
  if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) {
147 148 149
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
150

151
  pTSBuf->fileSize = (uint32_t)file_size;
152
  tsBufResetPos(pTSBuf);
S
Shengliang Guan 已提交
153

154 155
  // ascending by default
  pTSBuf->cur.order = TSDB_ORDER_ASC;
S
Shengliang Guan 已提交
156 157 158 159 160

  //  tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path,
  //  fileno(pTSBuf->pFile),
  //           pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);

161 162 163 164 165 166 167
  return pTSBuf;
}

/**
 * Release a timestamp buffer and everything it owns.
 *
 * The backing file is closed unless pTSBuf->remainOpen is set, and removed from disk
 * when pTSBuf->autoDelete is set.
 *
 * @param pTSBuf buffer to destroy; NULL is accepted and ignored
 * @return always NULL
 */
void* tsBufDestroy(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return NULL;
  }

  tfree(pTSBuf->assistBuf);
  tfree(pTSBuf->tsData.rawBuf);

  tfree(pTSBuf->pData);
  tfree(pTSBuf->block.payload);

  if (!pTSBuf->remainOpen) {
    taosCloseFile(&pTSBuf->pFile);
  }

  if (pTSBuf->autoDelete) {
    //    ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
    remove(pTSBuf->path);
  } else {
    //    tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
  }

  taosVariantDestroy(&pTSBuf->block.tag);
  free(pTSBuf);
  return NULL;
}

/**
 * Return the entry of the most recently added group.
 * The buffer must hold at least one group; this is only enforced by an assert.
 */
static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
  int32_t last = pTSBuf->numOfGroups - 1;

  assert(last >= 0);
  return &pTSBuf->pData[last];
}

/**
 * Append a new group with the given id, growing the group array when it is full.
 *
 * The previous last group's info is written back to the file first, then the new group's
 * info and the updated file header are written.
 *
 * @param pTSBuf buffer to extend
 * @param id     id of the new group
 * @return the new group entry, or NULL when the group array cannot be grown
 */
static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
  if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
    // grow by 50%; the assert fires when numOfAlloc is too small for that to add a slot
    uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
    assert((int32_t)newSize > pTSBuf->numOfAlloc);

    STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
    if (tmp == NULL) {
      return NULL;
    }

    pTSBuf->pData = tmp;
    pTSBuf->numOfAlloc = newSize;
    // clear every slot that is not in use yet
    memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
  }

  if (pTSBuf->numOfGroups > 0) {
    STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);

    // update prev vnode length info in file
    TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
  }

  // set initial value for vnode block: its data starts at the current end of the file
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
  pBlockInfo->id = id;
  pBlockInfo->offset = pTSBuf->fileSize;
  assert(pBlockInfo->offset >= getDataStartOffset());

  // update vnode info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);

  // add one vnode info
  pTSBuf->numOfGroups += 1;

  // update the header info
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};

  STSBufUpdateHeader(pTSBuf, &header);
  return tsBufGetLastGroupInfo(pTSBuf);
}

/**
 * Shrink the raw timestamp buffer back to MEM_BUF_SIZE once it has grown to at least
 * twice the threshold. A failed realloc leaves the buffer unchanged.
 */
static void shrinkBuffer(STSList* ptsData) {
  // shrink tmp buffer size if it consumes too many memory compared to the pre-defined size
  if (ptsData->allocSize >= ptsData->threshold * 2) {
    char* rawBuf = realloc(ptsData->rawBuf, MEM_BUF_SIZE);
    if (rawBuf) {
      ptsData->rawBuf = rawBuf;
      ptsData->allocSize = MEM_BUF_SIZE;
    }
  }
}

/**
 * Number of bytes the tag occupies inside one data block: the type field, two copies of
 * the length field, and the tag value itself unless the tag is of NULL type.
 */
static int32_t getTagAreaLength(SVariant* pa) {
  int32_t areaLen = sizeof(pa->nType) + sizeof(pa->nLen) * 2;

  if (pa->nType == TSDB_DATA_TYPE_NULL) {
    return areaLen;  // a NULL tag carries no value bytes
  }

  areaLen += pa->nLen;
  return areaLen;
}

/**
 * Compress the timestamps held in pTSBuf->tsData and append them to the file as one block.
 *
 * Does nothing when the in-memory list is empty. After writing, the file size and the last
 * group's compLen/numOfBlocks are advanced, the in-memory list is emptied and the raw buffer
 * may be shrunk.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
  if (pTSBuf->tsData.len == 0) {
    return;
  }

  STSBlock* pBlock = &pTSBuf->block;
  STSList*  pTsData = &pTSBuf->tsData;

  pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE;
  pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload,
                                        pTsData->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  // NOTE(review): this expects taosLSeekFile to return 0 on success; if it returns the
  // resulting offset like lseek(2), the assert fails for any non-zero fileSize -- confirm.
  int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET);
  assert(r == 0);

  /*
   * format for output data:
   * 1. tags, number of ts, size after compressed, payload, size after compressed
   * 2. tags, number of ts, size after compressed, payload, size after compressed
   *
   * both side has the compressed length is used to support load data forwards/backwards.
   */
  int32_t metaLen = 0;  // bytes written for the tag area, accumulated from the write results
  metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType));

  int32_t trueLen = pBlock->tag.nLen;
  if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen);
  } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
    // the tag value is held as a double in memory but stored as a float
    float tfloat = (float)pBlock->tag.d;
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
  } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
  } else {
    // a NULL tag has no value bytes; a length of 0 is stored
    trueLen = 0;
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
  }

  taosWriteFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem));
  taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
  taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);
  taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));

  // trailing copy of the tag length, read back by the reversed traverse
  metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
  assert(metaLen == getTagAreaLength(&pBlock->tag));

  int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
  pTSBuf->fileSize += blockSize;

  pTSBuf->tsData.len = 0;

  STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);

  pGroupBlockInfoEx->info.compLen += blockSize;
  pGroupBlockInfoEx->info.numOfBlocks += 1;

  shrinkBuffer(&pTSBuf->tsData);
}

/**
 * Make sure the raw buffer has room for inputSize more bytes after the ptsData->len bytes
 * already held; the buffer is grown to exactly len + inputSize when it is too small.
 */
static void expandBuffer(STSList* ptsData, int32_t inputSize) {
  if (ptsData->allocSize - ptsData->len < inputSize) {
    int32_t newSize = inputSize + ptsData->len;
    char*   tmp = realloc(ptsData->rawBuf, (size_t)newSize);
    if (tmp == NULL) {
      // todo
      // NOTE(review): the failure is not handled -- rawBuf becomes NULL below (the old
      // allocation is lost) while allocSize still grows. This function cannot report the
      // error, and keeping the old, smaller buffer would let callers write past its end.
    }

    ptsData->rawBuf = tmp;
    ptsData->allocSize = newSize;
  }
}

/**
 * Read one data block at the current file position into pTSBuf->block.
 *
 * For TSDB_ORDER_DESC the file position is expected at the END of the block to read; the
 * block's trailing lengths are used to step back to its start, and after the read the
 * position is moved back to that start (i.e. the end of the previous block).
 *
 * None of the read/seek results are checked.
 *
 * @param pTSBuf buffer whose file and block are used
 * @param order  traverse direction, TSDB_ORDER_ASC or TSDB_ORDER_DESC
 * @param decomp when true, the payload is decompressed into pTSBuf->tsData
 * @return pointer to pTSBuf->block
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
  STSBlock* pBlock = &pTSBuf->block;

  // clear the memory buffer
  pBlock->compLen = 0;
  pBlock->padding = 0;
  pBlock->numOfElem = 0;

  int32_t offset = -1;

  if (order == TSDB_ORDER_DESC) {
    /*
     * set the right position for the reversed traverse, the reversed traverse is started from
     * the end of each comp data block
     */
    int32_t prev = -(int32_t)(sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen));
    int32_t ret = taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR);
    size_t  sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
    sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
    UNUSED(sz);

    pBlock->compLen = pBlock->padding;

    // NOTE(review): getTagAreaLength() looks at tag.nType, which at this point still holds
    // the value of the previously read block (see "mix types tags are not supported" below).
    offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
    ret = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
    UNUSED(ret);
  }

  int32_t ret = taosReadFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType));
  ret = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));

  // NOTE: mix types tags are not supported
  size_t sz = 0;
  if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
    // one extra zeroed byte keeps the tag string terminated
    char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
    assert(tp != NULL);

    memset(tp, 0, pBlock->tag.nLen + 1);
    pBlock->tag.pz = tp;

    sz = taosReadFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen);
    UNUSED(sz);
  } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
    // float tags are stored as float on disk and widened to double in memory
    float tfloat = 0;
    sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
    pBlock->tag.d = (double)tfloat;
    UNUSED(sz);
  } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {  // TODO check the return value
    sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
    UNUSED(sz);
  }

  sz = taosReadFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem));
  UNUSED(sz);
  sz = taosReadFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
  UNUSED(sz);
  // NOTE(review): compLen comes from the file and is not checked against the payload capacity
  sz = taosReadFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);

  if (decomp) {
    pTSBuf->tsData.len =
        tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                              pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
  }

  // read the comp length at the length of comp block
  sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
  assert(pBlock->padding == pBlock->compLen);

  // trailing copy of the tag length: 0 for a NULL tag, otherwise equal to tag.nLen
  int32_t n = 0;
  sz = taosReadFile(pTSBuf->pFile, &n, sizeof(pBlock->tag.nLen));
  if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) {
    assert(n == 0);
  } else {
    assert(n == pBlock->tag.nLen);
  }

  UNUSED(sz);

  // for backwards traverse, set the start position at the end of previous block
  if (order == TSDB_ORDER_DESC) {
    int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
    UNUSED(r);
  }

  return pBlock;
}

/**
 * Set the order of the ts buffer if it has not been set yet (tsOrder == -1).
 *
 * The order is derived either from the last buffered key compared with the first incoming
 * key, or, when nothing is buffered, from the first two incoming keys. With a single
 * incoming key and an empty buffer, or two equal leading keys, the order stays unset.
 *
 * @param pTSBuf buffer whose tsOrder may be set
 * @param pData  incoming timestamps
 * @param len    size of pData in bytes
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  STSList* ptsData = &pTSBuf->tsData;

  if (pTSBuf->tsOrder == -1) {
    if (ptsData->len > 0) {
      TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE);

      if (lastKey > *(TSKEY*)pData) {
        pTSBuf->tsOrder = TSDB_ORDER_DESC;
      } else {
        pTSBuf->tsOrder = TSDB_ORDER_ASC;
      }
    } else if (len > TSDB_KEYSIZE) {
      // no data in current vnode, more than one ts is added, check the orders
      TSKEY k1 = *(TSKEY*)(pData);
      TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE);

      if (k1 < k2) {
        pTSBuf->tsOrder = TSDB_ORDER_ASC;
      } else if (k1 > k2) {
        pTSBuf->tsOrder = TSDB_ORDER_DESC;
      } else {
        // todo handle error
      }
    }
  } else {
    // todo the timestamp order is set, check the asc/desc order of appended data
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Append a run of timestamps for group `id` with the given tag.
 *
 * A new group is started when `id` differs from the last group's id. Buffered data is
 * written to disk first when the group or the tag value changes, and again once the
 * buffered data reaches the threshold. The cursor is reset at the end.
 *
 * @param pTSBuf destination buffer
 * @param id     group (vnode) id of the incoming data
 * @param tag    tag value of the incoming data
 * @param pData  timestamps to append
 * @param len    size of pData in bytes
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) {
  STSGroupBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    // NOTE(review): addOneGroupInfo() returns NULL when it cannot grow the group array;
    // that case is not handled here.
    pBlockInfo = addOneGroupInfo(pTSBuf, id);
  } else {
    pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
  }

  assert(pBlockInfo->info.id == id);

  if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  // Always make room for the incoming data. The flush above empties the buffer (and may
  // shrink it) but does not guarantee that it can hold `len` bytes; expandBuffer() does
  // nothing when there is already enough space.
  expandBuffer(ptsData, len);

  taosVariantAssign(&pTSBuf->block.tag, tag);
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}

/**
 * Write any buffered timestamps to disk, then rewrite the last group's info and the file
 * header. Does nothing when no data is buffered.
 */
void tsBufFlush(STSBuf* pTSBuf) {
  if (pTSBuf->tsData.len <= 0) {
    return;
  }

  writeDataToDisk(pTSBuf);
  shrinkBuffer(&pTSBuf->tsData);

  STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);

  // update prev vnode length info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info);

  // save the ts order into header
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);
}

/**
 * Linear search for the first group whose id equals `id`.
 *
 * @return index of the matching group, or -1 when none matches
 */
static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) {
  int32_t j = -1;
  for (int32_t i = 0; i < numOfGroups; ++i) {
    if (pGroupInfoEx[i].info.id == id) {
      j = i;
      break;
    }
  }

  return j;
}

/**
 * Position the file on block `blockIndex` of the given group by reading the group's blocks
 * one by one from its start; the requested block ends up loaded in pTSBuf->block
 * (not decompressed).
 *
 * todo opt performance by cache blocks info
 *
 * @return 0 on success, -1 when the seek or a read fails
 */
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode
  int32_t i = 0;
  bool    decomp = false;

  // reads blockIndex + 1 blocks, so the last one read is the requested block
  while ((i++) <= blockIndex) {
    if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, decomp) == NULL) {
      return -1;
    }
  }

  // set the file position to be the end of previous comp block
  if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
    STSBlock* pBlock = &pTSBuf->block;
    int32_t   compBlockSize =
        pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
    int32_t ret = taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR);
    UNUSED(ret);
  }

  return 0;
}

/**
 * Scan the blocks of one group in the cursor's traverse order until a block whose tag
 * equals `tag` is found.
 *
 * @return the block's index within the group (counted from the group's first block),
 *         or -1 when no block matches or the seek/read fails
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) {
  bool decomp = false;

  int64_t offset = 0;
  if (pTSBuf->cur.order == TSDB_ORDER_ASC) {
    offset = pBlockInfo->offset;
  } else {  // reversed traverse starts from the end of block
    offset = pBlockInfo->offset + pBlockInfo->compLen;
  }

  if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) {
      return -1;
    }

    if (taosVariantCompare(&pTSBuf->block.tag, tag) == 0) {
      // in a reversed traverse the i-th block read is the i-th from the end
      return (pTSBuf->cur.order == TSDB_ORDER_ASC) ? i : (pBlockInfo->numOfBlocks - (i + 1));
    }
  }

  return -1;
}

/**
 * Load and decompress block `blockIndex` of group `groupIndex`, then point the cursor at
 * its first (ascending) or last (descending) timestamp.
 *
 * When the requested block lies ahead of the cursor in the same group, blocks are read
 * forward from the current file position; otherwise the block is located from the start
 * of the group.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) {
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;
  if (pBlockInfo->numOfBlocks <= blockIndex) {
    assert(false);
  }

  STSCursor* pCur = &pTSBuf->cur;
  if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
                                          (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) {
    int32_t i = 0;
    bool    decomp = false;
    int32_t step = abs(blockIndex - pCur->blockIndex);

    // read `step` blocks in the traverse direction; the last one read is the target
    while ((++i) <= step) {
      if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) {
        return;
      }
    }
  } else {
    if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) {
      assert(false);
    }
  }

  STSBlock* pBlock = &pTSBuf->block;

  size_t s = pBlock->numOfElem * TSDB_KEYSIZE;

  /*
   * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
   * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
   */
  if (s > pTSBuf->tsData.allocSize) {
    expandBuffer(&pTSBuf->tsData, (int32_t)s);
  }

  pTSBuf->tsData.len =
      tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                            pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));

  pCur->vgroupIndex = groupIndex;
  pCur->blockIndex = blockIndex;

  pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
}

/**
 * Write one group info record at the given file offset.
 *
 * The offset must be non-negative and below getDataStartOffset(); anything else is rejected.
 * The result of the write itself is not checked.
 *
 * @return 0 after the write was issued, -1 on a bad offset or failed seek
 */
static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) {
  if (offset < 0 || offset >= getDataStartOffset()) {
    return -1;
  }

  if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  taosWriteFile(pTSBuf->pFile, pVInfo, sizeof(STSGroupBlockInfo));
  return 0;
}

/**
 * Look up the block info of the group with the given id.
 *
 * @return pointer into pTSBuf->pData, or NULL when no group has that id
 */
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
  int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
  if (j == -1) {
    return NULL;
  }

  return &pTSBuf->pData[j].info;
}

/**
 * Write the file header at the start of the backing file.
 *
 * @param pTSBuf  buffer whose file is updated
 * @param pHeader header to write; it must carry TS_COMP_FILE_MAGIC and a non-zero numOfGroup
 * @return 0 on success; -1 when the arguments are rejected or the seek/write fails
 */
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  if ((pTSBuf->pFile == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC);

  int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
  if (r != 0) {
    //    qError("fseek failed, errno:%d", errno);
    return -1;
  }

  // writeDataToDisk() sums taosWriteFile() results as byte counts, so success here means
  // the whole header size was written (not an fwrite-style item count of 1)
  size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader));
  if (ws != sizeof(STSBufFileHeader)) {
    //    qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws,
    //    (int32_t)sizeof(STSBufFileHeader));
    return -1;
  }
  return 0;
}

/**
 * Advance the cursor to the next timestamp in the cursor's traverse order.
 *
 * On the first call after a reset (vgroupIndex == -1) the cursor is placed on the first
 * element (ascending) or the last element (descending). Otherwise it moves one element,
 * loading the next block or the next group's block when the current one is exhausted.
 *
 * @return true when the cursor points at a valid element, false when the buffer is
 *         NULL/empty or the traverse has run past the last element
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vgroupIndex == -1) {
    if (pCur->order == TSDB_ORDER_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);

      if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty, return
        tsBufResetPos(pTSBuf);
        return false;
      } else {
        return true;
      }

    } else {  // get the last timestamp record in the last block of the last vnode
      assert(pTSBuf->numOfGroups > 0);

      int32_t groupIndex = pTSBuf->numOfGroups - 1;
      pCur->vgroupIndex = groupIndex;

      int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
      STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
      int32_t            blockIndex = pBlockInfo->numOfBlocks - 1;

      tsBufGetBlock(pTSBuf, groupIndex, blockIndex);

      pCur->tsIndex = pTSBuf->block.numOfElem - 1;
      if (pTSBuf->block.numOfElem == 0) {
        tsBufResetPos(pTSBuf);
        return false;
      } else {
        return true;
      }
    }
  }

  int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1;

  while (1) {
    assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

    // the current block is exhausted in the traverse direction
    if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
        (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) {
      int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id;

      STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
      // the current group is exhausted as well (or its info cannot be found)
      if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
          (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
        // no further group in the traverse direction: end of traverse
        if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && pCur->order == TSDB_ORDER_ASC) ||
            (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
          pCur->vgroupIndex = -1;
          return false;
        }

        if (pBlockInfo == NULL) {
          return false;
        }

        // NOTE(review): for a descending traverse the block index is taken from the CURRENT
        // group's numOfBlocks, while the block is fetched from the adjacent group -- confirm.
        int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1);
        tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex);
        break;

      } else {
        tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
        break;
      }
    } else {
      pCur->tsIndex += step;
      break;
    }
  }

  return true;
}

/**
 * Rewind the traversal cursor of a ts buffer.
 *
 * The ts index, block index and group index of the cursor are all set to -1,
 * while the traverse order already stored in the cursor is kept.
 *
 * @param pTSBuf buffer whose cursor is rewound; a NULL pointer is ignored
 */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    // the order is read from the live cursor before the whole struct is replaced
    STSCursor rewound = {.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order};
    pTSBuf->cur = rewound;
  }
}

/**
 * Fetch the element the cursor currently points at.
 *
 * @param pTSBuf ts buffer to read from; may be NULL
 * @return the element at the cursor position: its group id, the timestamp stored at
 *         cursor.tsIndex inside the decoded ts data buffer, and a pointer to the tag
 *         of the currently loaded block. When pTSBuf is NULL or the cursor's group
 *         index is negative, an element whose id is -1 is returned instead.
 */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem result = {.id = -1};

  // the cursor is embedded in the buffer, so only its group index needs checking
  if (pTSBuf == NULL || pTSBuf->cur.vgroupIndex < 0) {
    return result;
  }

  const STSCursor* pPos = &pTSBuf->cur;

  result.id = pTSBuf->pData[pPos->vgroupIndex].info.id;
  // timestamps are packed back to back, TSDB_KEYSIZE bytes each
  result.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pPos->tsIndex * TSDB_KEYSIZE);
  result.tag = &pTSBuf->block.tag;

  return result;
}

/**
 * Append the content of pSrcBuf to pDestBuf.
 *
 * Currently only ts comp data coming from a two-vnode merge is supported, i.e. the
 * source buffer must contain exactly one group.
 *
 * Steps:
 *  1. flush the pending in-memory data of the destination;
 *  2. if the source group id differs from the last group id of the destination, append
 *     the source group info (growing the group array if needed) and rebase its offset
 *     onto the end of the destination file; otherwise fold the source lengths and block
 *     count into the last destination group;
 *  3. copy the data area of the source file (everything after the data start offset)
 *     to the end of the destination file;
 *  4. reopen the destination file and refresh its recorded size.
 *
 * NOTE(review): the group info is updated before the file copy, so a failure in the
 * copy or reopen steps leaves the destination metadata already modified.
 *
 * @param pDestBuf buffer that receives the data
 * @param pSrcBuf  buffer whose single group is appended
 * @return 0 on success or when there is nothing to merge (NULL argument or empty
 *         source); -1 when the group limit would be exceeded or on allocation/I/O failure
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
    return 0;
  }

  if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
    return -1;
  }

  // src can only have one vnode index
  assert(pSrcBuf->numOfGroups == 1);

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // compared with the last vnode id
  int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id;
  if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) {
    int32_t oldNumOfGroups = pDestBuf->numOfGroups;
    int32_t newNumOfGroups = oldNumOfGroups + pSrcBuf->numOfGroups;

    if (pDestBuf->numOfAlloc < newNumOfGroups) {
      STSGroupBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newNumOfGroups);
      if (tmp == NULL) {
        return -1;
      }

      // record the new capacity only once the array has really grown, so a failed
      // realloc leaves numOfAlloc consistent with the memory that is still owned
      pDestBuf->pData = tmp;
      pDestBuf->numOfAlloc = newNumOfGroups;
    }

    // directly copy the vnode index information
    memcpy(&pDestBuf->pData[oldNumOfGroups], pSrcBuf->pData,
           (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));

    // set the new offset value: position relative to the source data area, rebased
    // onto the current end of the destination file
    for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
      STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldNumOfGroups];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.id = id;
    }

    pDestBuf->numOfGroups = newNumOfGroups;
  } else {
    // same group as the last one in the destination: extend it in place
    STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.id = id;
  }

  // Only a negative result is treated as failure, so the check is valid whether the
  // seek wrapper reports 0 on success or the resulting file offset, and it is not
  // compiled out in release builds the way an assert would be.
  int64_t seekRet = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END);
  if (seekRet < 0) {
    return -1;
  }

  int64_t offset = getDataStartOffset();
  int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
  int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size);

  if (written == -1 || written != size) {
    return -1;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  int32_t oldFileSize = pDestBuf->fileSize;

  // file meta data may be cached, close and reopen the file for accurate file size.
  taosCloseFile(&pDestBuf->pFile);
  pDestBuf->pFile = taosOpenFile(pDestBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pDestBuf->pFile == NULL) {
    return -1;
  }

  int64_t file_size;
  if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) {
    return -1;
  }
  pDestBuf->fileSize = (uint32_t)file_size;

  assert(pDestBuf->fileSize == oldFileSize + size);

  return 0;
}

STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
  STSBuf* pTSBuf = tsBufCreate(true, order);
S
Shengliang Guan 已提交
879

880 881 882 883 884
  STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
  pBlockInfo->numOfBlocks = numOfBlocks;
  pBlockInfo->compLen = len;
  pBlockInfo->offset = getDataStartOffset();
  pBlockInfo->id = id;
S
Shengliang Guan 已提交
885

886 887
  // update prev vnode length info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
S
Shengliang Guan 已提交
888

889
  int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET);
890
  if (ret == -1) {
S
Shengliang Guan 已提交
891
    //    qError("fseek failed, errno:%d", errno);
892 893 894
    tsBufDestroy(pTSBuf);
    return NULL;
  }
895
  size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len);
896
  if (sz != len) {
S
Shengliang Guan 已提交
897
    //    qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
898 899 900 901
    tsBufDestroy(pTSBuf);
    return NULL;
  }
  pTSBuf->fileSize += len;
S
Shengliang Guan 已提交
902

903 904
  pTSBuf->tsOrder = order;
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
S
Shengliang Guan 已提交
905

906 907 908 909 910 911 912 913
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }

  // TODO taosFsync??
S
Shengliang Guan 已提交
914 915 916 917 918 919
  //  if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
  ////    qError("fsync failed, errno:%d", errno);
  //    tsBufDestroy(pTSBuf);
  //    return NULL;
  //  }

920 921 922 923 924
  return pTSBuf;
}

/**
 * Position the cursor on the block of group `id` that matches `tag` and return the
 * element found there.
 *
 * The group index is looked up with tsBufFindGroupById and the block index with
 * tsBufFindBlockByTag; the cursor's group and block indexes are then updated and the
 * block is loaded through tsBufGetBlock.
 *
 * @param pTSBuf ts buffer to search; may be NULL
 * @param id     group id to look for
 * @param tag    tag handed to tsBufFindBlockByTag to select the block
 * @return the element at the new cursor position, or an element with id -1 when
 *         pTSBuf is NULL, the group is not found, or no block is found for the tag
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) {
  STSElem notFound = {.id = -1};

  if (pTSBuf == NULL) {
    return notFound;
  }

  const int32_t groupIdx = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
  if (groupIdx == -1) {
    return notFound;
  }

  const int32_t blockIdx = tsBufFindBlockByTag(pTSBuf, &pTSBuf->pData[groupIdx].info, tag);
  if (blockIdx < 0) {
    return notFound;
  }

  // move the cursor first, then load the block it now refers to
  pTSBuf->cur.vgroupIndex = groupIdx;
  pTSBuf->cur.blockIndex = blockIdx;
  tsBufGetBlock(pTSBuf, groupIdx, blockIdx);

  return tsBufGetElem(pTSBuf);
}

/**
 * Return a copy of the buffer's current cursor.
 *
 * @param pTSBuf ts buffer; may be NULL
 * @return the cursor stored in pTSBuf, or a cursor whose vgroupIndex is -1 (all other
 *         fields zero) when pTSBuf is NULL
 */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    return pTSBuf->cur;
  }

  STSCursor invalid = {.vgroupIndex = -1};
  return invalid;
}

/**
 * Install a previously saved cursor on the buffer.
 *
 * When the cursor refers to a group (vgroupIndex != -1) the matching block is loaded
 * with tsBufGetBlock before the cursor is copied into the buffer.
 *
 * @param pTSBuf ts buffer to update; ignored when NULL
 * @param pCur   cursor to install; ignored when NULL
 */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf != NULL && pCur != NULL) {
    if (pCur->vgroupIndex != -1) {
      tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
    }

    // copy last, so the caller's cursor wins over whatever the block load left behind
    pTSBuf->cur = *pCur;
  }
}

/**
 * Set the traverse order stored in the buffer's cursor.
 *
 * @param pTSBuf ts buffer to update; ignored when NULL
 * @param order  new order value written to cursor.order
 */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf != NULL) {
    pTSBuf->cur.order = order;
  }
}

/**
 * Create a second buffer backed by the same file as pTSBuf.
 *
 * The source buffer is flushed first, then a new buffer is built from its path with
 * tsBufCreateFromFile (second argument false).
 *
 * @param pTSBuf buffer to clone; may be NULL
 * @return the result of tsBufCreateFromFile, or NULL when pTSBuf is NULL
 */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  STSBuf* pCopy = NULL;

  if (pTSBuf != NULL) {
    tsBufFlush(pTSBuf);
    pCopy = tsBufCreateFromFile(pTSBuf->path, false);
  }

  return pCopy;
}

/**
 * Debug helper: print the content of a ts buffer to stdout.
 *
 * The buffer is walked from the start in ascending order; for every element whose tag
 * type is TSDB_DATA_TYPE_BIGINT one "id-tag-ts" line is printed. The traverse order is
 * restored afterwards, the cursor position is not.
 *
 * @param pTSBuf buffer to dump; must not be NULL
 */
void tsBufDisplay(STSBuf* pTSBuf) {
  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfGroups);

  // force ascending traversal for the dump and remember what to put back
  const int32_t savedOrder = pTSBuf->cur.order;
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  for (tsBufResetPos(pTSBuf); tsBufNextPos(pTSBuf);) {
    STSElem elem = tsBufGetElem(pTSBuf);
    if (elem.tag->nType != TSDB_DATA_TYPE_BIGINT) {
      continue;
    }

    printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts);
  }

  pTSBuf->cur.order = savedOrder;
  printf("-------end of ts comp file-------\n");
}

/**
 * Offset of the first data byte in a ts comp file.
 *
 * The data area starts after the file header and a fixed table of
 * TS_COMP_FILE_GROUP_MAX group block info entries.
 *
 * @return the data start offset in bytes
 */
static int32_t getDataStartOffset(void) {
  // (void) makes this a real prototype; the size_t sum is narrowed explicitly
  return (int32_t)(sizeof(STSBufFileHeader) + TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo));
}

/**
 * Update the info of group slot `index` (the original comment describes this as
 * updating the previous vnode length info in the file).
 *
 * The slot position is the size of the file header plus `index` entries of
 * sizeof(STSGroupBlockInfo); the actual update is delegated to doUpdateGroupInfo.
 *
 * @param pTSBuf     ts buffer that owns the file
 * @param index      zero-based group slot
 * @param pBlockInfo group info handed to doUpdateGroupInfo
 */
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) {
  const int32_t slotPos = (int32_t)(sizeof(STSBufFileHeader) + index * sizeof(STSGroupBlockInfo));
  doUpdateGroupInfo(pTSBuf, slotPos, pBlockInfo);
}

/**
 * Allocate the working memory of a freshly created ts buffer.
 *
 * Allocated, in this order: the group info array (4 zeroed entries), the raw ts data
 * buffer, the assist buffer and the block payload, each of the latter three being
 * MEM_BUF_SIZE bytes. On success the recorded file size is increased by the data start
 * offset.
 *
 * @param pTSBuf buffer to equip; must not be NULL
 * @return pTSBuf on success; NULL when any allocation fails, in which case pTSBuf has
 *         already been passed to tsBufDestroy
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  const int32_t initialGroupSlots = 4;

  pTSBuf->numOfAlloc = initialGroupSlots;
  pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    goto allocFailed;
  }

  pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    goto allocFailed;
  }

  pTSBuf->bufSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    goto allocFailed;
  }

  pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    goto allocFailed;
  }

  // the header and the group table occupy the beginning of the file
  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;

allocFailed:
  // single failure exit: hand the partially initialised buffer to tsBufDestroy
  tsBufDestroy(pTSBuf);
  return NULL;
}

/**
 * Number of groups stored in the buffer.
 *
 * @param pTSBuf ts buffer; may be NULL
 * @return pTSBuf->numOfGroups, or 0 when pTSBuf is NULL
 */
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) {
  return (pTSBuf != NULL) ? pTSBuf->numOfGroups : 0;
}

/**
 * Collect the ids of all groups in the buffer into a newly allocated array.
 *
 * @param pTSBuf ts buffer to inspect; may be NULL (treated as having no groups)
 * @param num    optional output: number of ids stored in *id; may be NULL
 * @param id     output: malloc'ed array of group ids that the caller must free, or
 *               NULL when there are no groups or the allocation fails; must not be NULL
 *
 * If the allocation fails, *id is NULL and *num (when provided) is reset to 0 so the
 * pair stays consistent for the caller.
 */
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
  int32_t size = tsBufGetNumOfGroup(pTSBuf);
  if (num != NULL) {
    *num = size;
  }

  *id = NULL;
  if (size <= 0) {
    return;
  }

  int32_t* list = malloc((size_t)size * sizeof(int32_t));
  if (list == NULL) {
    // out of memory: report an empty list instead of writing through NULL
    if (num != NULL) {
      *num = 0;
    }
    return;
  }

  for (int32_t i = 0; i < size; ++i) {
    list[i] = pTSBuf->pData[i].info.id;
  }

  *id = list;
}

/**
 * Read the compressed data of one group from the buffer's file into `buf`.
 *
 * @param pTSBuf      ts buffer that owns the file
 * @param groupIndex  index of the group in pTSBuf->pData; must be within
 *                    [0, numOfGroups)
 * @param buf         destination memory; must be able to hold the group's compLen bytes
 * @param len         output: number of bytes read (the group's compLen); 0 on failure
 * @param numOfBlocks output: number of blocks of the group; 0 on failure
 * @return TSDB_CODE_SUCCESS, or a system error code derived from the file's last error
 *         when seeking or reading fails
 */
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
  assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;

  *len = 0;
  *numOfBlocks = 0;

  // Only a negative result is treated as failure. This matches the -1 check used by
  // tsBufCreateFromCompBlocks and stays correct whether the seek wrapper reports 0 on
  // success or the resulting (non-zero) file offset.
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) < 0) {
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    return code;
  }

  size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen);
  if (s != pBlockInfo->compLen) {
    // short read: the file did not deliver the whole group
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    return code;
  }

  *len = pBlockInfo->compLen;
  *numOfBlocks = pBlockInfo->numOfBlocks;

  return TSDB_CODE_SUCCESS;
}

/**
 * Search every group, in index order, for a block matching pTag and return the first
 * element found.
 *
 * Each group is probed with tsBufGetElemStartPos; the search stops at the first group
 * for which the returned element carries that group's id.
 *
 * @param pTSBuf ts buffer to search; must not be NULL
 * @param pTag   tag to look for
 * @return the matching element, otherwise the result of the last probe (an element
 *         with id -1 when the buffer has no groups)
 */
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) {
  STSElem found = {.id = -1};

  int32_t groupIdx = 0;
  while (groupIdx < pTSBuf->numOfGroups) {
    found = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[groupIdx].info.id, pTag);
    if (found.id == pTSBuf->pData[groupIdx].info.id) {
      break;
    }

    ++groupIdx;
  }

  return found;
}

S
Shengliang Guan 已提交
1124
/**
 * Tell whether an element refers to real data.
 *
 * @param pElem element to test; must not be NULL
 * @return true when the element id is non-negative, false otherwise
 */
bool tsBufIsValidElem(STSElem* pElem) {
  return !(pElem->id < 0);
}