ttszip.c 33.4 KB
Newer Older
S
common  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17 18 19 20 21
#include "ttszip.h"
#include "taoserror.h"
#include "tcompression.h"

static int32_t getDataStartOffset();
S
Shengliang Guan 已提交
22
static void    TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
23 24 25 26 27 28 29 30 31 32
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);

/**
 * todo error handling
 * support auto closeable tmp file
 * @param path
 * @return
 */
STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
wafwerar's avatar
wafwerar 已提交
33 34
  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_TSC_NO_DISKSPACE;
wafwerar's avatar
wafwerar 已提交
35
    // tscError("tmp file created failed since %s", terrstr());
wafwerar's avatar
wafwerar 已提交
36 37
    return NULL;
  }
G
Ganlin Zhao 已提交
38

wafwerar's avatar
wafwerar 已提交
39
  STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
40 41 42 43 44
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
45

S
os env  
Shengliang Guan 已提交
46
  taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path);
47
  // pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
48
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
49
  if (pTSBuf->pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
50
    taosMemoryFree(pTSBuf);
51 52 53 54
    return NULL;
  }

  if (!autoDelete) {
G
Ganlin Zhao 已提交
55 56 57 58
    if (taosRemoveFile(pTSBuf->path) == NULL) {
      taosMemoryFree(pTSBuf);
      return NULL;
    }
59
  }
S
Shengliang Guan 已提交
60

G
Ganlin Zhao 已提交
61
  if (allocResForTSBuf(pTSBuf) == NULL) {
62 63
    return NULL;
  }
S
Shengliang Guan 已提交
64

65 66 67
  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
  STSBufUpdateHeader(pTSBuf, &header);
S
Shengliang Guan 已提交
68

69 70 71 72
  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  pTSBuf->tsOrder = order;
S
Shengliang Guan 已提交
73

74 75 76 77
  return pTSBuf;
}

STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
wafwerar's avatar
wafwerar 已提交
78
  STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
79 80 81 82 83
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
84

85
  tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
S
Shengliang Guan 已提交
86

87 88 89
  // pTSBuf->pFile = fopen(pTSBuf->path, "rb+");
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pTSBuf->pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
90
    taosMemoryFree(pTSBuf);
91 92
    return NULL;
  }
S
Shengliang Guan 已提交
93

94 95 96
  if (allocResForTSBuf(pTSBuf) == NULL) {
    return NULL;
  }
S
Shengliang Guan 已提交
97

98 99
  // validate the file magic number
  STSBufFileHeader header = {0};
S
Shengliang Guan 已提交
100
  int32_t          ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
101
  UNUSED(ret);
102
  size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader));
103 104 105 106 107 108 109
  UNUSED(sz);

  // invalid file
  if (header.magic != TS_COMP_FILE_MAGIC) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
110

111 112
  if (header.numOfGroup > pTSBuf->numOfAlloc) {
    pTSBuf->numOfAlloc = header.numOfGroup;
wafwerar's avatar
wafwerar 已提交
113
    STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
114 115 116 117
    if (tmp == NULL) {
      tsBufDestroy(pTSBuf);
      return NULL;
    }
S
Shengliang Guan 已提交
118

119 120
    pTSBuf->pData = tmp;
  }
S
Shengliang Guan 已提交
121

122
  pTSBuf->numOfGroups = header.numOfGroup;
S
Shengliang Guan 已提交
123

124 125 126
  // check the ts order
  pTSBuf->tsOrder = header.tsOrder;
  if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
S
Shengliang Guan 已提交
127
    //    tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
128 129 130
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
131

132
  size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
S
Shengliang Guan 已提交
133

wafwerar's avatar
wafwerar 已提交
134
  STSGroupBlockInfo* buf = (STSGroupBlockInfo*)taosMemoryCalloc(1, infoSize);
135 136
  if (buf == NULL) {
    tsBufDestroy(pTSBuf);
S
Shengliang Guan 已提交
137 138 139 140
    return NULL;
  }

  // int64_t pos = ftell(pTSBuf->pFile); //pos not used
141
  sz = taosReadFile(pTSBuf->pFile, buf, infoSize);
142
  UNUSED(sz);
S
Shengliang Guan 已提交
143

144 145 146 147 148
  // the length value for each vnode is not kept in file, so does not set the length value
  for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
    STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
    memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
  }
wafwerar's avatar
wafwerar 已提交
149
  taosMemoryFree(buf);
S
Shengliang Guan 已提交
150

151
  ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END);
152
  UNUSED(ret);
S
Shengliang Guan 已提交
153

154 155
  int64_t file_size;
  if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) {
156 157 158
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
159

160
  pTSBuf->fileSize = (uint32_t)file_size;
161
  tsBufResetPos(pTSBuf);
S
Shengliang Guan 已提交
162

163 164
  // ascending by default
  pTSBuf->cur.order = TSDB_ORDER_ASC;
S
Shengliang Guan 已提交
165 166 167 168 169

  //  tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path,
  //  fileno(pTSBuf->pFile),
  //           pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);

170 171 172 173 174 175 176
  return pTSBuf;
}

/**
 * Release a timestamp buffer together with the memory it owns.
 *
 * Frees the assist buffer, the raw timestamp buffer, the group table and the block payload,
 * closes the file unless pTSBuf->remainOpen is set, removes the file by path when
 * pTSBuf->autoDelete is set, destroys the block tag and finally frees the buffer itself.
 *
 * @param pTSBuf buffer to destroy; NULL is accepted and ignored
 * @return always NULL
 */
void* tsBufDestroy(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    // in-memory buffers
    taosMemoryFreeClear(pTSBuf->assistBuf);
    taosMemoryFreeClear(pTSBuf->tsData.rawBuf);
    taosMemoryFreeClear(pTSBuf->pData);
    taosMemoryFreeClear(pTSBuf->block.payload);

    // backing file
    if (!pTSBuf->remainOpen) {
      taosCloseFile(&pTSBuf->pFile);
    }

    if (pTSBuf->autoDelete) {
      taosRemoveFile(pTSBuf->path);
    }

    taosVariantDestroy(&pTSBuf->block.tag);
    taosMemoryFree(pTSBuf);
  }

  return NULL;
}

/**
 * Return the most recently added entry of the group table.
 * Asserts that the table holds at least one group.
 */
static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
  const int32_t tail = pTSBuf->numOfGroups - 1;
  assert(tail > -1);

  return pTSBuf->pData + tail;
}

/**
 * Append a new group with the given id to the group table and persist the change.
 *
 * The table is grown when it is full. Before the new entry is added, the info of the previous
 * last group is written back to the file; the new group's data starts at the current end of the
 * file (pTSBuf->fileSize). Finally the file header is rewritten with the new group count.
 *
 * @param pTSBuf buffer to extend
 * @param id     id of the new group
 * @return the new table entry, or NULL if the table cannot be grown
 */
static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
  if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
    uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);

    // A factor of 1.5 does not grow a table of 0 or 1 slots; always leave room for the entry that
    // is about to be stored at index numOfGroups.
    if ((int32_t)newSize <= pTSBuf->numOfGroups) {
      newSize = (uint32_t)pTSBuf->numOfGroups + 1;
    }
    assert((int32_t)newSize > pTSBuf->numOfAlloc);

    STSGroupBlockInfoEx* tmp = (STSGroupBlockInfoEx*)taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
    if (tmp == NULL) {
      return NULL;
    }

    pTSBuf->pData = tmp;
    pTSBuf->numOfAlloc = newSize;

    // the slots behind the used part are uninitialised after realloc
    memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
  }

  if (pTSBuf->numOfGroups > 0) {
    STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);

    // update prev vnode length info in file
    TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
  }

  // set initial value for vnode block
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
  pBlockInfo->id = id;
  pBlockInfo->offset = pTSBuf->fileSize;
  assert(pBlockInfo->offset >= getDataStartOffset());

  // update vnode info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);

  // add one vnode info
  pTSBuf->numOfGroups += 1;

  // update the header info
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};

  STSBufUpdateHeader(pTSBuf, &header);
  return tsBufGetLastGroupInfo(pTSBuf);
}

/**
 * Give memory back when the raw buffer has grown to at least twice its threshold: the buffer is
 * reallocated to MEM_BUF_SIZE. If the reallocation fails the buffer is left as it is.
 */
static void shrinkBuffer(STSList* ptsData) {
  if (ptsData->allocSize < ptsData->threshold * 2) {
    return;  // not oversized, nothing to do
  }

  char* shrunk = taosMemoryRealloc(ptsData->rawBuf, MEM_BUF_SIZE);
  if (shrunk == NULL) {
    return;
  }

  ptsData->rawBuf = shrunk;
  ptsData->allocSize = MEM_BUF_SIZE;
}

/**
 * Number of bytes a tag occupies inside a block on disk: the tag type, the tag length stored
 * twice, and -- unless the tag is of type NULL -- nLen bytes of tag value.
 */
static int32_t getTagAreaLength(SVariant* pa) {
  const int32_t fixedPart = sizeof(pa->nLen) * 2 + sizeof(pa->nType);

  return (pa->nType == TSDB_DATA_TYPE_NULL) ? fixedPart : fixedPart + pa->nLen;
}

/**
 * Compress the timestamps held in pTSBuf->tsData and append them to the file as one block.
 *
 * Does nothing when the buffer holds no data. The block is written at offset pTSBuf->fileSize as
 *
 *   tag type | tag length | tag value (absent for NULL tags) | numOfElem | compLen | payload |
 *   compLen | tag length
 *
 * The compressed length and the tag length are repeated behind the payload so that blocks can be
 * loaded both forwards and backwards. For NULL tags both tag-length fields hold 0.
 *
 * Afterwards fileSize and the compLen/numOfBlocks counters of the last group are advanced, the
 * in-memory buffer is marked empty and shrinkBuffer() is invoked.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
  STSList* pTsData = &pTSBuf->tsData;
  if (pTsData->len == 0) {
    return;
  }

  STSBlock* pBlock = &pTSBuf->block;

  pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE;
  pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload,
                                        pTsData->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  // NOTE(review): assumes taosLSeekFile() reports success as 0 -- confirm.
  int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET);
  assert(r == 0);

  // --- leading tag area: type, length, value ---
  int32_t metaLen = (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType));
  int32_t trueLen = pBlock->tag.nLen;

  if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) {
    // a NULL tag carries no value; its length is stored as 0
    trueLen = 0;
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
  } else {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));

    if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen);
    } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
      // float tags are kept as double in memory and narrowed for storage
      float tfloat = (float)pBlock->tag.d;
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
    } else {
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
    }
  }

  // --- element count and compressed payload, framed by its length on both sides ---
  taosWriteFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem));
  taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
  taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);
  taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));

  // --- trailing copy of the tag length ---
  metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
  assert(metaLen == getTagAreaLength(&pBlock->tag));

  int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
  pTSBuf->fileSize += blockSize;

  pTsData->len = 0;

  // account for the new block in the group it belongs to
  STSGroupBlockInfoEx* pLastGroup = tsBufGetLastGroupInfo(pTSBuf);
  pLastGroup->info.compLen += blockSize;
  pLastGroup->info.numOfBlocks += 1;

  shrinkBuffer(pTsData);
}

/**
 * Make sure the raw buffer has room for inputSize more bytes behind the data it already holds.
 *
 * When the free space is too small the buffer is reallocated to exactly len + inputSize bytes.
 * If that reallocation fails, the existing buffer and its recorded size are left untouched, so
 * the data already buffered is neither leaked nor lost. The function cannot report the failure;
 * callers that must not overrun the buffer have to compare allocSize - len with the size they
 * need afterwards.
 *
 * @param ptsData   buffer descriptor to grow
 * @param inputSize number of additional bytes required
 */
static void expandBuffer(STSList* ptsData, int32_t inputSize) {
  if (ptsData->allocSize - ptsData->len < inputSize) {
    int32_t newSize = inputSize + ptsData->len;
    char*   tmp = taosMemoryRealloc(ptsData->rawBuf, (size_t)newSize);
    if (tmp == NULL) {
      // the old block is still valid after a failed realloc; keep it instead of overwriting the
      // only pointer to it with NULL
      return;
    }

    ptsData->rawBuf = tmp;
    ptsData->allocSize = newSize;
  }
}

/**
 * Load one block from the current file position into pTSBuf->block.
 *
 * For a descending traverse the file position is expected at the end of the block to load: the
 * trailing compressed length and tag length are read first to find the start of the block, the
 * block is then read front to back, and finally the position is moved to the start of the block
 * again, i.e. to the end of the block in front of it.
 *
 * @param pTSBuf buffer whose file and block are used
 * @param order  TSDB_ORDER_DESC for a backward traverse; any other value reads forwards
 * @param decomp when true the payload is also decompressed into pTSBuf->tsData
 * @return pointer to pTSBuf->block
 *
 * The results of the individual seek/read calls are not checked.
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
  STSBlock* pBlock = &pTSBuf->block;

  // clear the memory buffer
  pBlock->compLen = 0;
  pBlock->padding = 0;
  pBlock->numOfElem = 0;

  int32_t offset = -1;

  if (order == TSDB_ORDER_DESC) {
    /*
     * set the right position for the reversed traverse, the reversed traverse is started from
     * the end of each comp data block
     */
    int32_t prev = -(int32_t)(sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen));
    (void)taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR);
    (void)taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
    (void)taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));

    pBlock->compLen = pBlock->padding;

    // total size of the block, used to jump back to its first byte
    offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
    (void)taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
  }

  // leading tag area: type and length, then the value
  (void)taosReadFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType));
  (void)taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));

  // NOTE: mix types tags are not supported
  if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
    if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
      // one extra zeroed byte keeps the string terminated
      char* tp = taosMemoryRealloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
      assert(tp != NULL);

      memset(tp, 0, pBlock->tag.nLen + 1);
      pBlock->tag.pz = tp;

      (void)taosReadFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen);
    } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
      // float tags are stored as float and widened to double in memory
      float tfloat = 0;
      (void)taosReadFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
      pBlock->tag.d = (double)tfloat;
    } else {  // TODO check the return value
      (void)taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
    }
  }

  // element count, compressed length and payload
  (void)taosReadFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem));
  (void)taosReadFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
  (void)taosReadFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);

  if (decomp) {
    pTSBuf->tsData.len =
        tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                              pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
  }

  // the compressed length is repeated behind the payload and must match the leading copy
  (void)taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
  assert(pBlock->padding == pBlock->compLen);

  // so is the tag length, which is 0 for NULL tags
  int32_t trailingLen = 0;
  (void)taosReadFile(pTSBuf->pFile, &trailingLen, sizeof(pBlock->tag.nLen));
  if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) {
    assert(trailingLen == 0);
  } else {
    assert(trailingLen == pBlock->tag.nLen);
  }

  // for backwards traverse, set the start position at the end of previous block
  if (order == TSDB_ORDER_DESC) {
    (void)taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
  }

  return pBlock;
}

/**
 * Determine the timestamp order of the buffer while it is still unset (tsOrder == -1).
 *
 * The order is derived either from the last buffered key compared with the first incoming key,
 * or -- when nothing is buffered and more than one key arrives -- from the first two incoming
 * keys. Two equal leading keys leave the order unset.
 *
 * @param pTSBuf buffer whose tsOrder may be updated
 * @param pData  incoming keys, not yet copied into the buffer's accounting
 * @param len    size of pData in bytes
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  STSList* ptsData = &pTSBuf->tsData;

  if (pTSBuf->tsOrder != -1) {
    // todo the timestamp order is set, check the asc/desc order of appended data
    return TSDB_CODE_SUCCESS;
  }

  if (ptsData->len > 0) {
    TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE);
    pTSBuf->tsOrder = (lastKey > *(TSKEY*)pData) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    return TSDB_CODE_SUCCESS;
  }

  if (len > TSDB_KEYSIZE) {
    // no data in current vnode, more than one ts is added, check the orders
    TSKEY first = *(TSKEY*)(pData);
    TSKEY second = *(TSKEY*)(pData + TSDB_KEYSIZE);

    if (first < second) {
      pTSBuf->tsOrder = TSDB_ORDER_ASC;
    } else if (first > second) {
      pTSBuf->tsOrder = TSDB_ORDER_DESC;
    }
    // todo handle error: equal keys give no hint about the order
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Append timestamps belonging to group `id` and tag value `tag` to the buffer.
 *
 * A change of group id flushes the buffered data and starts a new group. A change of tag value
 * within a group flushes the buffered data so that every block on disk carries a single tag.
 * The buffered data is also flushed once it reaches the configured threshold. The cursor is
 * reset on return.
 *
 * The function has no way to report errors: if a new group cannot be added or the buffer cannot
 * be grown to hold the new data, the call returns without appending anything.
 *
 * @param pTSBuf destination buffer
 * @param id     group (vnode) id the data belongs to
 * @param tag    tag value of the data; copied into the current block
 * @param pData  timestamps to append
 * @param len    size of pData in bytes
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) {
  STSGroupBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    pBlockInfo = addOneGroupInfo(pTSBuf, id);
    if (pBlockInfo == NULL) {
      // the group table could not be grown; there is no entry to account the data to
      return;
    }
  } else {
    pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
  }

  assert(pBlockInfo->info.id == id);

  if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  // The flush above empties the buffer and may shrink it, so the capacity has to be ensured on
  // both paths before copying; expandBuffer() is a no-op when there is already enough room.
  expandBuffer(ptsData, len);
  if (ptsData->rawBuf == NULL || ptsData->allocSize - ptsData->len < len) {
    // growing the buffer failed; copying would write past its end
    return;
  }

  taosVariantAssign(&pTSBuf->block.tag, tag);
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}

/**
 * Write any buffered timestamps to the file and bring the on-disk metadata up to date: the info
 * of the last group and the file header (group count and ts order) are rewritten.
 * Does nothing when no data is buffered.
 */
void tsBufFlush(STSBuf* pTSBuf) {
  STSList* pBuffered = &pTSBuf->tsData;

  if (pBuffered->len > 0) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(pBuffered);

    // update prev vnode length info in file
    STSGroupBlockInfoEx* pLastGroup = tsBufGetLastGroupInfo(pTSBuf);
    TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pLastGroup->info);

    // save the ts order into header
    STSBufFileHeader hdr = {
        .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
    STSBufUpdateHeader(pTSBuf, &hdr);
  }
}

/**
 * Linear search for the first group whose id equals `id`.
 *
 * @param pGroupInfoEx group table to search
 * @param numOfGroups  number of entries to examine
 * @param id           group id to look for
 * @return index of the first match, or -1 when no entry matches
 */
static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) {
  for (int32_t idx = 0; idx < numOfGroups; ++idx) {
    if (pGroupInfoEx[idx].info.id == id) {
      return idx;
    }
  }

  return -1;
}

/**
 * Position the file on block `blockIndex` of a group by reading the group's blocks one after the
 * other from its first block. For a descending cursor the file position is moved back to the
 * start of the block that was read last.
 *
 * todo opt performance by cache blocks info
 *
 * @return 0 on success, -1 if the initial seek or a block read fails
 */
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode
  const bool decomp = false;
  for (int32_t n = 0; n <= blockIndex; ++n) {
    if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, decomp) == NULL) {
      return -1;
    }
  }

  // set the file position to be the end of previous comp block
  if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
    STSBlock* pLoaded = &pTSBuf->block;
    int32_t   compBlockSize =
        pLoaded->compLen + sizeof(pLoaded->compLen) * 2 + sizeof(pLoaded->numOfElem) + getTagAreaLength(&pLoaded->tag);
    (void)taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR);
  }

  return 0;
}

/**
 * Search the blocks of one group, in the cursor's traverse order, for the first block whose tag
 * equals `tag`.
 *
 * @param pTSBuf     buffer to read from; pTSBuf->cur.order selects the direction
 * @param pBlockInfo group whose blocks are searched
 * @param tag        tag value to look for
 * @return the index of the matching block counted from the start of the group, or -1 if the seek
 *         or a read fails or no block matches
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) {
  const bool decomp = false;
  const bool ascending = (pTSBuf->cur.order == TSDB_ORDER_ASC);

  // reversed traverse starts from the end of block
  int64_t offset = ascending ? pBlockInfo->offset : pBlockInfo->offset + pBlockInfo->compLen;

  if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t visited = 0; visited < pBlockInfo->numOfBlocks; ++visited) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) {
      return -1;
    }

    if (taosVariantCompare(&pTSBuf->block.tag, tag) != 0) {
      continue;
    }

    // blocks are visited back to front on a descending traverse, so convert the visit count
    // into an index from the start of the group
    if (ascending) {
      return visited;
    }
    return pBlockInfo->numOfBlocks - (visited + 1);
  }

  return -1;
}

/**
 * Load and decompress block `blockIndex` of group `groupIndex` and point the cursor at it.
 *
 * When the cursor already stands in the same group and the requested block lies ahead of it in
 * traverse direction, the blocks in between are stepped over from the current file position;
 * otherwise the block is located from the start of the group. The cursor's tsIndex is set to
 * the first element for an ascending cursor and to the last element for a descending one.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) {
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;
  if (pBlockInfo->numOfBlocks <= blockIndex) {
    assert(false);
  }

  STSCursor* pCur = &pTSBuf->cur;

  const bool sameGroup = (pCur->vgroupIndex == groupIndex);
  const bool aheadAsc = (pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC);
  const bool aheadDesc = (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC);

  if (sameGroup && (aheadAsc || aheadDesc)) {
    const bool    decomp = false;
    const int32_t distance = abs(blockIndex - pCur->blockIndex);

    for (int32_t n = 1; n <= distance; ++n) {
      if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) {
        return;
      }
    }
  } else if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) {
    assert(false);
  }

  STSBlock* pBlock = &pTSBuf->block;

  /*
   * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
   * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
   */
  size_t rawSize = pBlock->numOfElem * TSDB_KEYSIZE;
  if (rawSize > pTSBuf->tsData.allocSize) {
    expandBuffer(&pTSBuf->tsData, (int32_t)rawSize);
  }

  pTSBuf->tsData.len =
      tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                            pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));

  pCur->vgroupIndex = groupIndex;
  pCur->blockIndex = blockIndex;

  if (pCur->order == TSDB_ORDER_ASC) {
    pCur->tsIndex = 0;
  } else {
    pCur->tsIndex = pBlock->numOfElem - 1;
  }
}

/**
 * Overwrite one group-info record in the metadata area at the start of the file.
 *
 * @param pTSBuf buffer whose file is written
 * @param offset absolute file offset of the record; must be non-negative and below the value
 *               returned by getDataStartOffset()
 * @param pVInfo record to store
 * @return 0 when the whole record was written, -1 if the offset is out of range, the seek fails
 *         or the record is not written completely
 */
static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) {
  if (offset < 0 || offset >= getDataStartOffset()) {
    return -1;
  }

  // NOTE(review): assumes taosLSeekFile() reports success as 0; if it returns the resulting
  // offset instead, this check rejects every non-zero offset -- confirm.
  if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  // taosWriteFile() returns the number of bytes written; anything short of a full record means
  // the group table on disk is not up to date
  size_t ws = taosWriteFile(pTSBuf->pFile, pVInfo, sizeof(STSGroupBlockInfo));
  if (ws != sizeof(STSGroupBlockInfo)) {
    return -1;
  }

  return 0;
}

/**
 * Look up the block info of the first group with the given id.
 *
 * @return pointer into the buffer's group table, or NULL when no group has this id
 */
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
  const int32_t idx = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);

  return (idx == -1) ? NULL : &pTSBuf->pData[idx].info;
}

/**
 * Write the file header at the beginning of the buffer's file.
 *
 * @param pTSBuf  buffer whose file is updated
 * @param pHeader header to store; must carry the magic number and at least one group
 * @return 0 when the whole header was written; -1 if the file is not open, the header is NULL,
 *         has no groups or a wrong magic number, the seek fails, or the write is incomplete
 */
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  if ((pTSBuf->pFile == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC);

  int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
  if (r != 0) {
    return -1;
  }

  // taosWriteFile() returns a byte count (writeDataToDisk sums its results into a byte total),
  // so success is a full header's worth of bytes -- not 1 as with fwrite's item count
  size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader));
  if (ws != sizeof(STSBufFileHeader)) {
    return -1;
  }

  return 0;
}

/**
 * Advance the cursor to the next timestamp in the cursor's traverse order.
 *
 * A cursor that has not been positioned yet (vgroupIndex == -1) is placed on the first element
 * (ascending) or the last element (descending) of the buffer. Otherwise the cursor moves by one
 * element, crossing into the neighbouring block or group when the current block is exhausted.
 *
 * @param pTSBuf buffer to traverse
 * @return true if the cursor now points at a valid element; false if the buffer is NULL or
 *         empty, or the traverse has run past the last element (the cursor's vgroupIndex is then
 *         reset to -1)
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vgroupIndex == -1) {
    if (pCur->order == TSDB_ORDER_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);

      if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty, return
        tsBufResetPos(pTSBuf);
        return false;
      } else {
        return true;
      }

    } else {  // get the last timestamp record in the last block of the last vnode
      assert(pTSBuf->numOfGroups > 0);

      int32_t groupIndex = pTSBuf->numOfGroups - 1;
      pCur->vgroupIndex = groupIndex;

      int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
      STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
      int32_t            blockIndex = pBlockInfo->numOfBlocks - 1;

      tsBufGetBlock(pTSBuf, groupIndex, blockIndex);

      pCur->tsIndex = pTSBuf->block.numOfElem - 1;
      if (pTSBuf->block.numOfElem == 0) {
        tsBufResetPos(pTSBuf);
        return false;
      } else {
        return true;
      }
    }
  }

  int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1;

  assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

  bool blockExhausted = (pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
                        (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0);
  if (!blockExhausted) {
    // still inside the current block
    pCur->tsIndex += step;
    return true;
  }

  int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
  STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);

  bool groupExhausted = pBlockInfo == NULL ||
                        (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
                        (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC);
  if (!groupExhausted) {
    // next block of the same group
    tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
    return true;
  }

  if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && pCur->order == TSDB_ORDER_ASC) ||
      (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
    // no group left in traverse direction
    pCur->vgroupIndex = -1;
    return false;
  }

  if (pBlockInfo == NULL) {
    return false;
  }

  // Enter the neighbouring group. A descending traverse starts at that group's last block, so
  // the block count has to come from the group being entered, not from the one being left.
  int32_t nextGroup = pCur->vgroupIndex + step;
  int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pTSBuf->pData[nextGroup].info.numOfBlocks - 1);
  tsBufGetBlock(pTSBuf, nextGroup, blockIndex);

  return true;
}

/**
 * Rewind the traversal cursor of a TS buffer.
 *
 * The ts/block/group indexes are all set to -1; the traversal order that was
 * configured on the cursor is carried over unchanged.
 *
 * @param pTSBuf buffer whose cursor is rewound; NULL is accepted and ignored
 */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    // fields not named here are zero-initialized, exactly like a compound literal
    STSCursor rewound = {.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order};
    pTSBuf->cur = rewound;
  }
}

/**
 * Read the element the cursor currently points at.
 *
 * @param pTSBuf buffer to read from
 * @return element whose id is the current group's id, whose ts is the key at
 *         cursor position tsIndex inside tsData.rawBuf, and whose tag points
 *         at the tag of the block held in pTSBuf->block; an element with
 *         id == -1 when pTSBuf is NULL or the cursor has no group selected
 *         (vgroupIndex < 0)
 */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem result = {.id = -1};

  // the cursor is embedded in the buffer, so only the buffer itself can be NULL
  if (pTSBuf == NULL || pTSBuf->cur.vgroupIndex < 0) {
    return result;
  }

  const STSCursor* pos = &pTSBuf->cur;

  result.id = pTSBuf->pData[pos->vgroupIndex].info.id;
  // keys are stored back to back, TSDB_KEYSIZE bytes each
  result.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pos->tsIndex * TSDB_KEYSIZE);
  result.tag = &pTSBuf->block.tag;

  return result;
}

/**
 * Append the content of pSrcBuf to pDestBuf.
 *
 * Currently only a source holding exactly one group is supported. If the id of
 * the source group differs from the id of the last group in the destination, a
 * new group entry is appended to the destination; otherwise the counters of the
 * destination's last group are extended. The data area of the source file
 * (everything after getDataStartOffset()) is then copied to the end of the
 * destination file and the destination file size is refreshed from the file.
 *
 * NOTE(review): when the file copy or the reopen fails, the in-memory group
 * table of pDestBuf has already been updated and is not rolled back.
 *
 * @param pDestBuf buffer that receives the data
 * @param pSrcBuf  buffer that is appended; not modified by the visible code
 * @return 0 on success, or when there is nothing to do (either argument is
 *         NULL or the source has no group); -1 when the combined group count
 *         exceeds TS_COMP_FILE_GROUP_MAX or a memory/file operation fails
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
    return 0;
  }

  if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
    return -1;
  }

  // src can only have one vnode index
  assert(pSrcBuf->numOfGroups == 1);

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // compared with the last vnode id
  int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id;
  if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) {
    int32_t oldSize = pDestBuf->numOfGroups;
    int32_t newSize = oldSize + pSrcBuf->numOfGroups;

    if (pDestBuf->numOfAlloc < newSize) {
      STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
      if (tmp == NULL) {
        // pData and numOfAlloc are left untouched, so the buffer stays consistent
        return -1;
      }

      pDestBuf->pData = tmp;
      // record the new capacity only once the larger array really exists
      pDestBuf->numOfAlloc = newSize;
    }

    // directly copy the vnode index information
    memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));

    // set the new offset value: rebase from the source data area onto the end of the destination file
    for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
      STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.id = id;
    }

    pDestBuf->numOfGroups = newSize;
  } else {
    STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.id = id;
  }

  // A negative result is treated as failure, matching the "== -1" check used by
  // tsBufCreateFromCompBlocks; this is checked at run time rather than asserted
  // so that release builds do not silently continue after a failed seek.
  // NOTE(review): assumes taosLSeekFile never reports success with a negative value.
  int64_t r = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END);
  if (r < 0) {
    return -1;
  }

  int64_t offset = getDataStartOffset();
  int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
  int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size);

  if (written == -1 || written != size) {
    return -1;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  int32_t prevFileSize = pDestBuf->fileSize;

  // file meta data may be cached, close and reopen the file for accurate file size.
  taosCloseFile(&pDestBuf->pFile);
  pDestBuf->pFile = taosOpenFile(pDestBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pDestBuf->pFile == NULL) {
    return -1;
  }

  int64_t file_size;
  if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) {
    return -1;
  }
  pDestBuf->fileSize = (uint32_t)file_size;

  assert(pDestBuf->fileSize == prevFileSize + size);

  return 0;
}

STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
  STSBuf* pTSBuf = tsBufCreate(true, order);
S
Shengliang Guan 已提交
888

889 890 891 892 893
  STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
  pBlockInfo->numOfBlocks = numOfBlocks;
  pBlockInfo->compLen = len;
  pBlockInfo->offset = getDataStartOffset();
  pBlockInfo->id = id;
S
Shengliang Guan 已提交
894

895 896
  // update prev vnode length info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
S
Shengliang Guan 已提交
897

898
  int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET);
899
  if (ret == -1) {
S
Shengliang Guan 已提交
900
    //    qError("fseek failed, errno:%d", errno);
901 902 903
    tsBufDestroy(pTSBuf);
    return NULL;
  }
904
  size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len);
905
  if (sz != len) {
S
Shengliang Guan 已提交
906
    //    qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
907 908 909 910
    tsBufDestroy(pTSBuf);
    return NULL;
  }
  pTSBuf->fileSize += len;
S
Shengliang Guan 已提交
911

912 913
  pTSBuf->tsOrder = order;
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
S
Shengliang Guan 已提交
914

915 916 917 918 919 920 921 922
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }

  // TODO taosFsync??
S
Shengliang Guan 已提交
923 924 925 926 927 928
  //  if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
  ////    qError("fsync failed, errno:%d", errno);
  //    tsBufDestroy(pTSBuf);
  //    return NULL;
  //  }

929 930 931 932 933
  return pTSBuf;
}

/**
 * Position the cursor on the block that matches (group id, tag) and return the
 * element at the resulting cursor position.
 *
 * @param pTSBuf buffer to search
 * @param id     group id to look up through tsBufFindGroupById
 * @param tag    tag handed to tsBufFindBlockByTag to select the block
 * @return the element reported by tsBufGetElem after the cursor has been
 *         moved; an element with id == -1 when pTSBuf is NULL, the group id is
 *         unknown, or no block is found for the tag
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) {
  STSElem notFound = {.id = -1};

  if (pTSBuf == NULL) {
    return notFound;
  }

  int32_t groupIndex = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
  if (groupIndex == -1) {
    return notFound;
  }

  int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, &pTSBuf->pData[groupIndex].info, tag);
  if (blockIndex < 0) {
    return notFound;
  }

  // remember where we are, then fetch that block before reading from it
  pTSBuf->cur.vgroupIndex = groupIndex;
  pTSBuf->cur.blockIndex = blockIndex;
  tsBufGetBlock(pTSBuf, groupIndex, blockIndex);

  return tsBufGetElem(pTSBuf);
}

/**
 * Return a copy of the buffer's current cursor.
 *
 * @param pTSBuf buffer to query
 * @return copy of pTSBuf->cur; for a NULL buffer, a cursor whose vgroupIndex
 *         is -1 and whose other fields are zero
 */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    return pTSBuf->cur;
  }

  STSCursor invalid = {.vgroupIndex = -1};
  return invalid;
}

/**
 * Install a previously saved cursor on the buffer.
 *
 * When the cursor selects a group (vgroupIndex != -1), tsBufGetBlock is called
 * for its group/block pair before the cursor is copied into the buffer.
 *
 * @param pTSBuf buffer to update; NULL makes the call a no-op
 * @param pCur   cursor to install; NULL makes the call a no-op
 */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf != NULL && pCur != NULL) {
    const bool selectsGroup = (pCur->vgroupIndex != -1);
    if (selectsGroup) {
      tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
    }

    pTSBuf->cur = *pCur;
  }
}

/**
 * Set the order in which the cursor walks the buffer.
 *
 * @param pTSBuf buffer to update; NULL is ignored
 * @param order  value stored into the cursor's order field
 */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf != NULL) {
    pTSBuf->cur.order = order;
  }
}

/**
 * Create a second buffer from the file that backs pTSBuf.
 *
 * The source buffer is flushed first, then a buffer is created from its path
 * via tsBufCreateFromFile with the second argument set to false.
 *
 * @param pTSBuf buffer to clone
 * @return whatever tsBufCreateFromFile returns, or NULL when pTSBuf is NULL
 */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  STSBuf* copy = NULL;

  if (pTSBuf != NULL) {
    tsBufFlush(pTSBuf);
    copy = tsBufCreateFromFile(pTSBuf->path, false);
  }

  return copy;
}

/**
 * Debug helper: print the content of the buffer to stdout.
 *
 * The buffer is walked from the start in ascending order; one line
 * "<group id>-<tag>-<ts>" is printed for every element whose tag is of type
 * TSDB_DATA_TYPE_BIGINT. The cursor's previous traversal order is restored
 * afterwards; the cursor position itself is not.
 *
 * @param pTSBuf buffer to print; NULL is ignored
 */
void tsBufDisplay(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return;
  }

  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfGroups);

  // always dump front to back, regardless of the configured order
  int32_t old = pTSBuf->cur.order;
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  tsBufResetPos(pTSBuf);

  while (tsBufNextPos(pTSBuf)) {
    STSElem elem = tsBufGetElem(pTSBuf);
    if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
      printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts);
    }
  }

  pTSBuf->cur.order = old;
  printf("-------end of ts comp file-------\n");
}

/**
 * Byte offset at which block data starts in a ts comp file: the size of the
 * file header plus a table sized for TS_COMP_FILE_GROUP_MAX group entries.
 *
 * @return the offset in bytes
 */
static int32_t getDataStartOffset(void) {
  // sizeof yields size_t; narrow explicitly instead of relying on the implicit conversion
  return (int32_t)(sizeof(STSBufFileHeader) + TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo));
}

/**
 * Hand the group entry at position `index` to doUpdateGroupInfo together with
 * the byte offset of that entry's slot (the slots follow the file header).
 *
 * @param pTSBuf     buffer whose file is updated
 * @param index      position of the entry in the group table
 * @param pBlockInfo entry content passed on to doUpdateGroupInfo
 */
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) {
  const size_t headerLen = sizeof(STSBufFileHeader);
  const size_t slotLen = sizeof(STSGroupBlockInfo);

  int32_t slotOffset = headerLen + index * slotLen;
  doUpdateGroupInfo(pTSBuf, slotOffset, pBlockInfo);
}

/**
 * Allocate the working memory of a freshly created TS buffer: the group table
 * (4 entries initially), the raw ts buffer, the assist buffer and the block
 * payload buffer, each of the latter three MEM_BUF_SIZE bytes. On success the
 * buffer's fileSize is advanced by getDataStartOffset().
 *
 * @param pTSBuf buffer to equip
 * @return pTSBuf on success; NULL after calling tsBufDestroy(pTSBuf) when any
 *         allocation fails
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  enum { INITIAL_GROUP_SLOTS = 4 };

  pTSBuf->numOfAlloc = INITIAL_GROUP_SLOTS;
  pTSBuf->pData = taosMemoryCalloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    goto onAllocFailure;
  }

  pTSBuf->tsData.rawBuf = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    goto onAllocFailure;
  }

  pTSBuf->bufSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    goto onAllocFailure;
  }

  pTSBuf->block.payload = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    goto onAllocFailure;
  }

  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;

onAllocFailure:
  // single exit for every failed allocation
  tsBufDestroy(pTSBuf);
  return NULL;
}

/**
 * Number of groups stored in the buffer.
 *
 * @param pTSBuf buffer to query
 * @return pTSBuf->numOfGroups, or 0 for a NULL buffer
 */
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) {
  return (pTSBuf != NULL) ? pTSBuf->numOfGroups : 0;
}

/**
 * Collect the ids of all groups in the buffer into a newly allocated array.
 *
 * @param pTSBuf buffer to query; NULL is treated as a buffer without groups
 * @param num    optional out-parameter receiving the number of ids in *id
 * @param id     out-parameter receiving the array, allocated with
 *               taosMemoryMalloc (the caller releases it); set to NULL when
 *               there are no groups or the allocation fails. On allocation
 *               failure *num is reset to 0 so the count never describes an
 *               array that does not exist.
 */
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
  int32_t size = tsBufGetNumOfGroup(pTSBuf);
  if (num != NULL) {
    *num = size;
  }

  *id = NULL;
  if (size <= 0) {
    return;
  }

  int32_t* list = taosMemoryMalloc((size_t)size * sizeof(int32_t));
  if (list == NULL) {
    if (num != NULL) {
      *num = 0;
    }
    return;
  }

  for (int32_t i = 0; i < size; ++i) {
    list[i] = pTSBuf->pData[i].info.id;
  }

  *id = list;
}

/**
 * Read the encoded data of one group from the buffer's file into buf.
 *
 * @param pTSBuf      buffer to read from
 * @param groupIndex  position of the group in pTSBuf->pData; must be within
 *                    [0, numOfGroups) (asserted)
 * @param buf         destination; must have room for the group's compLen bytes
 * @param len         out: number of bytes copied, 0 on failure
 * @param numOfBlocks out: number of blocks in the group, 0 on failure
 * @return TSDB_CODE_SUCCESS, or the system error derived from the file's error
 *         state when the seek or the read fails
 */
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
  assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;

  *len = 0;
  *numOfBlocks = 0;

  // Only a negative result is a failure, matching the "== -1" check used by
  // tsBufCreateFromCompBlocks; a successful seek to a non-zero offset must not
  // be reported as an error.
  // NOTE(review): assumes taosLSeekFile signals failure with a negative value.
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) < 0) {
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    return code;
  }

  size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen);
  if (s != pBlockInfo->compLen) {
    // short or failed read
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    return code;
  }

  *len = pBlockInfo->compLen;
  *numOfBlocks = pBlockInfo->numOfBlocks;

  return TSDB_CODE_SUCCESS;
}

/**
 * Try every group in table order and return the start element of the first
 * one for which tsBufGetElemStartPos finds the given tag.
 *
 * @param pTSBuf buffer to search (not checked for NULL)
 * @param pTag   tag to look for
 * @return the element of the first matching group; otherwise the result of
 *         the last lookup, or an element with id == -1 when the buffer has no
 *         groups
 */
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) {
  STSElem found = {.id = -1};

  int32_t g = 0;
  while (g < pTSBuf->numOfGroups) {
    found = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[g].info.id, pTag);

    // a hit carries the id of the group that was searched
    if (found.id == pTSBuf->pData[g].info.id) {
      break;
    }

    ++g;
  }

  return found;
}

S
Shengliang Guan 已提交
1133
/**
 * Tell whether an element refers to real data.
 *
 * @param pElem element to inspect (not checked for NULL)
 * @return true when the element's id is not negative
 */
bool tsBufIsValidElem(STSElem* pElem) {
  return !(pElem->id < 0);
}