ttszip.c 33.5 KB
Newer Older
S
common  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17 18 19 20 21
#include "ttszip.h"
#include "taoserror.h"
#include "tcompression.h"

static int32_t getDataStartOffset();
S
Shengliang Guan 已提交
22
static void    TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
23 24 25 26 27 28 29 30 31 32
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);

/**
 * todo error handling
 * support auto closeable tmp file
 * @param path
 * @return
 */
STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
wafwerar's avatar
wafwerar 已提交
33 34
  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_TSC_NO_DISKSPACE;
wafwerar's avatar
wafwerar 已提交
35
    // tscError("tmp file created failed since %s", terrstr());
wafwerar's avatar
wafwerar 已提交
36 37
    return NULL;
  }
G
Ganlin Zhao 已提交
38

wafwerar's avatar
wafwerar 已提交
39
  STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
40 41 42 43 44
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
45

S
os env  
Shengliang Guan 已提交
46
  taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path);
47
  // pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
48
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
49
  if (pTSBuf->pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
50
    taosMemoryFree(pTSBuf);
51 52 53 54
    return NULL;
  }

  if (!autoDelete) {
G
Ganlin Zhao 已提交
55
    if (taosRemoveFile(pTSBuf->path) != 0) {
G
Ganlin Zhao 已提交
56 57 58
      taosMemoryFree(pTSBuf);
      return NULL;
    }
59
  }
S
Shengliang Guan 已提交
60

G
Ganlin Zhao 已提交
61
  if (allocResForTSBuf(pTSBuf) == NULL) {
62 63
    return NULL;
  }
S
Shengliang Guan 已提交
64

65 66 67
  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
  STSBufUpdateHeader(pTSBuf, &header);
S
Shengliang Guan 已提交
68

69 70 71 72
  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  pTSBuf->tsOrder = order;
S
Shengliang Guan 已提交
73

74 75 76 77
  return pTSBuf;
}

STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
wafwerar's avatar
wafwerar 已提交
78
  STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
79 80 81 82 83
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
84

85
  tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
S
Shengliang Guan 已提交
86

87 88 89
  // pTSBuf->pFile = fopen(pTSBuf->path, "rb+");
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pTSBuf->pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
90
    taosMemoryFree(pTSBuf);
91 92
    return NULL;
  }
S
Shengliang Guan 已提交
93

94 95 96
  if (allocResForTSBuf(pTSBuf) == NULL) {
    return NULL;
  }
S
Shengliang Guan 已提交
97

98 99
  // validate the file magic number
  STSBufFileHeader header = {0};
S
Shengliang Guan 已提交
100
  int32_t          ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
101
  UNUSED(ret);
102
  size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader));
103 104 105 106 107 108 109
  UNUSED(sz);

  // invalid file
  if (header.magic != TS_COMP_FILE_MAGIC) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
110

111 112
  if (header.numOfGroup > pTSBuf->numOfAlloc) {
    pTSBuf->numOfAlloc = header.numOfGroup;
wafwerar's avatar
wafwerar 已提交
113
    STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
114 115 116 117
    if (tmp == NULL) {
      tsBufDestroy(pTSBuf);
      return NULL;
    }
S
Shengliang Guan 已提交
118

119 120
    pTSBuf->pData = tmp;
  }
S
Shengliang Guan 已提交
121

122
  pTSBuf->numOfGroups = header.numOfGroup;
S
Shengliang Guan 已提交
123

124 125 126
  // check the ts order
  pTSBuf->tsOrder = header.tsOrder;
  if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
S
Shengliang Guan 已提交
127
    //    tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
128 129 130
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
131

132
  size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
S
Shengliang Guan 已提交
133

wafwerar's avatar
wafwerar 已提交
134
  STSGroupBlockInfo* buf = (STSGroupBlockInfo*)taosMemoryCalloc(1, infoSize);
135 136
  if (buf == NULL) {
    tsBufDestroy(pTSBuf);
S
Shengliang Guan 已提交
137 138 139 140
    return NULL;
  }

  // int64_t pos = ftell(pTSBuf->pFile); //pos not used
141
  sz = taosReadFile(pTSBuf->pFile, buf, infoSize);
142
  UNUSED(sz);
S
Shengliang Guan 已提交
143

144 145 146 147 148
  // the length value for each vnode is not kept in file, so does not set the length value
  for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
    STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
    memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
  }
wafwerar's avatar
wafwerar 已提交
149
  taosMemoryFree(buf);
S
Shengliang Guan 已提交
150

151
  ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END);
152
  UNUSED(ret);
S
Shengliang Guan 已提交
153

154 155
  int64_t file_size;
  if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) {
156 157 158
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
159

160
  pTSBuf->fileSize = (uint32_t)file_size;
161
  tsBufResetPos(pTSBuf);
S
Shengliang Guan 已提交
162

163 164
  // ascending by default
  pTSBuf->cur.order = TSDB_ORDER_ASC;
S
Shengliang Guan 已提交
165 166 167 168 169

  //  tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path,
  //  fileno(pTSBuf->pFile),
  //           pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);

170 171 172 173 174 175 176
  return pTSBuf;
}

/**
 * Release everything owned by a timestamp buffer.
 *
 * Frees the internal buffers, closes the backing file unless pTSBuf->remainOpen is set,
 * removes the backing file when pTSBuf->autoDelete is set, destroys the block tag and
 * finally frees the buffer object itself.
 *
 * @param pTSBuf buffer to destroy; NULL is accepted and ignored
 * @return always NULL
 */
void* tsBufDestroy(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    taosMemoryFreeClear(pTSBuf->assistBuf);
    taosMemoryFreeClear(pTSBuf->tsData.rawBuf);
    taosMemoryFreeClear(pTSBuf->pData);
    taosMemoryFreeClear(pTSBuf->block.payload);

    if (!pTSBuf->remainOpen) {
      taosCloseFile(&pTSBuf->pFile);
    }

    if (pTSBuf->autoDelete) {
      // best effort: a failure to remove the tmp file is deliberately ignored
      if (taosRemoveFile(pTSBuf->path) != 0) {
      }
    }

    taosVariantDestroy(&pTSBuf->block.tag);
    taosMemoryFree(pTSBuf);
  }

  return NULL;
}

/*
 * Return the entry of the most recently added group.
 * Asserts that the buffer contains at least one group.
 */
static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
  const int32_t last = pTSBuf->numOfGroups - 1;
  assert(last >= 0);

  return pTSBuf->pData + last;
}

/*
 * Append a new, empty group with the given id to the group table.
 *
 * The table is grown by roughly 1.5x when it is full. The info of the previous last group
 * and of the new group are written to the file via TSBufUpdateGroupInfo(), and the file
 * header is refreshed with the new group count.
 *
 * @param pTSBuf buffer to extend
 * @param id     id of the new group
 * @return the entry of the new group, or NULL if the table could not be grown
 */
static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
  if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
    // grow by 1.5x, but always by at least one slot: for a capacity of 0 or 1 the
    // 1.5x rule alone yields no growth at all
    uint32_t newSize = (uint32_t)pTSBuf->numOfAlloc + (uint32_t)pTSBuf->numOfAlloc / 2;
    if ((int32_t)newSize <= pTSBuf->numOfAlloc) {
      newSize = (uint32_t)pTSBuf->numOfAlloc + 1;
    }
    assert((int32_t)newSize > pTSBuf->numOfAlloc);

    STSGroupBlockInfoEx* tmp =
        (STSGroupBlockInfoEx*)taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
    if (tmp == NULL) {
      return NULL;
    }

    pTSBuf->pData = tmp;
    pTSBuf->numOfAlloc = newSize;
    // the entries beyond the groups in use are zeroed so that new groups start out empty
    memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
  }

  if (pTSBuf->numOfGroups > 0) {
    STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);

    // update prev vnode length info in file
    TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
  }

  // set initial value for vnode block: its data starts at the current end of the file
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
  pBlockInfo->id = id;
  pBlockInfo->offset = pTSBuf->fileSize;
  assert(pBlockInfo->offset >= getDataStartOffset());

  // update vnode info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);

  // add one vnode info
  pTSBuf->numOfGroups += 1;

  // update the header info
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};

  STSBufUpdateHeader(pTSBuf, &header);
  return tsBufGetLastGroupInfo(pTSBuf);
}

/*
 * Shrink the raw timestamp buffer back to MEM_BUF_SIZE once its allocated size has reached
 * at least twice the configured threshold. If the reallocation fails the buffer is left
 * as it is.
 */
static void shrinkBuffer(STSList* ptsData) {
  if (ptsData->allocSize < ptsData->threshold * 2) {
    return;
  }

  char* shrunk = taosMemoryRealloc(ptsData->rawBuf, MEM_BUF_SIZE);
  if (shrunk == NULL) {
    return;
  }

  ptsData->rawBuf = shrunk;
  ptsData->allocSize = MEM_BUF_SIZE;
}

/*
 * Number of bytes occupied by the tag-related fields of one block: the tag type, two
 * copies of the tag length and, unless the tag is NULL, the tag value itself.
 */
static int32_t getTagAreaLength(SVariant* pa) {
  int32_t total = sizeof(pa->nLen) * 2 + sizeof(pa->nType);

  if (pa->nType == TSDB_DATA_TYPE_NULL) {
    return total;
  }

  total += pa->nLen;
  return total;
}

/*
 * Compress the timestamps buffered in pTSBuf->tsData and append them to the backing file
 * as one block, then update the file size and the accounting of the last group.
 * Does nothing when no data is buffered.
 *
 * Layout of one block in the file:
 *   tag type | tag length | tag value (absent for NULL tags) |
 *   number of ts | compressed length | payload | compressed length | tag length
 * The compressed length and the tag length are repeated at the tail of the block so that
 * blocks can be loaded both forwards and backwards.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
  STSList* pList = &pTSBuf->tsData;
  if (pList->len == 0) {
    return;
  }

  STSBlock* pBlk = &pTSBuf->block;
  SVariant* pTag = &pBlk->tag;

  pBlk->numOfElem = pList->len / TSDB_KEYSIZE;
  pBlk->compLen = tsCompressTimestamp(pList->rawBuf, pList->len, pList->len / TSDB_KEYSIZE, pBlk->payload,
                                      pList->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  // NOTE(review): this expects taosLSeekFile() to return 0 on success -- confirm that it
  // does not return the resulting file offset instead.
  int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET);
  assert(r == 0);

  // leading tag area: type, length and (for non-NULL tags) the value
  int32_t metaLen = 0;
  metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pTag->nType, sizeof(pTag->nType));

  int32_t    trueLen = pTag->nLen;
  const bool isString = (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR);

  if (isString) {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pTag->nLen, sizeof(pTag->nLen));
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, pTag->pz, (size_t)pTag->nLen);
  } else if (pTag->nType == TSDB_DATA_TYPE_FLOAT) {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pTag->nLen, sizeof(pTag->nLen));
    // float tags are kept as double in memory and narrowed for the file
    float narrowed = (float)pTag->d;
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &narrowed, (size_t)pTag->nLen);
  } else if (pTag->nType != TSDB_DATA_TYPE_NULL) {
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pTag->nLen, sizeof(pTag->nLen));
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pTag->i, (size_t)pTag->nLen);
  } else {
    // NULL tag: a zero length is recorded and no value follows
    trueLen = 0;
    metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pTag->nLen));
  }

  // element count and the payload framed by its compressed length on both sides
  taosWriteFile(pTSBuf->pFile, &pBlk->numOfElem, sizeof(pBlk->numOfElem));
  taosWriteFile(pTSBuf->pFile, &pBlk->compLen, sizeof(pBlk->compLen));
  taosWriteFile(pTSBuf->pFile, pBlk->payload, (size_t)pBlk->compLen);
  taosWriteFile(pTSBuf->pFile, &pBlk->compLen, sizeof(pBlk->compLen));

  // trailing copy of the tag length
  metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pTag->nLen));
  assert(metaLen == getTagAreaLength(pTag));

  int32_t blockSize = metaLen + sizeof(pBlk->numOfElem) + sizeof(pBlk->compLen) * 2 + pBlk->compLen;
  pTSBuf->fileSize += blockSize;

  // the in-memory buffer has been flushed
  pTSBuf->tsData.len = 0;

  STSGroupBlockInfoEx* pLastGroup = tsBufGetLastGroupInfo(pTSBuf);
  pLastGroup->info.compLen += blockSize;
  pLastGroup->info.numOfBlocks += 1;

  shrinkBuffer(&pTSBuf->tsData);
}

/*
 * Make sure the raw buffer has room for inputSize more bytes after the data it already
 * holds, reallocating it to exactly (len + inputSize) bytes when it has not.
 *
 * NOTE(review): a failed reallocation is not handled; rawBuf is overwritten with NULL
 * while allocSize is still updated, so the old buffer is lost. The function has no way
 * to report the failure to its callers.
 */
static void expandBuffer(STSList* ptsData, int32_t inputSize) {
  if (ptsData->allocSize - ptsData->len >= inputSize) {
    return;  // enough free space already
  }

  int32_t required = inputSize + ptsData->len;
  char*   grown = taosMemoryRealloc(ptsData->rawBuf, (size_t)required);
  if (grown == NULL) {
    // todo
  }

  ptsData->rawBuf = grown;
  ptsData->allocSize = required;
}

/**
 * Load one block from the current file position into pTSBuf->block.
 *
 * For a descending traversal the file position is expected to be at the end of the block
 * to load: the trailing length fields are read first in order to seek back to the start
 * of the block, and after the block has been loaded the position is moved back to its
 * start, i.e. to the end of the preceding block.
 *
 * @param pTSBuf buffer whose file is read and whose block is filled
 * @param order  traverse order (TSDB_ORDER_ASC or TSDB_ORDER_DESC)
 * @param decomp when true, the payload is also decompressed into pTSBuf->tsData
 * @return pointer to pTSBuf->block
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
  STSBlock* pBlk = &pTSBuf->block;

  // clear the memory buffer
  pBlk->compLen = 0;
  pBlk->padding = 0;
  pBlk->numOfElem = 0;

  int32_t offset = -1;

  if (order == TSDB_ORDER_DESC) {
    /*
     * set the right position for the reversed traverse, the reversed traverse is started from
     * the end of each comp data block
     */
    int32_t tailLen = -(int32_t)(sizeof(pBlk->padding) + sizeof(pBlk->tag.nLen));
    int32_t seekRet = taosLSeekFile(pTSBuf->pFile, tailLen, SEEK_CUR);
    size_t  nread = taosReadFile(pTSBuf->pFile, &pBlk->padding, sizeof(pBlk->padding));
    nread = taosReadFile(pTSBuf->pFile, &pBlk->tag.nLen, sizeof(pBlk->tag.nLen));
    UNUSED(nread);

    pBlk->compLen = pBlk->padding;

    // total size of the block; the tag type used here is the one currently held in memory
    offset = pBlk->compLen + sizeof(pBlk->compLen) * 2 + sizeof(pBlk->numOfElem) + getTagAreaLength(&pBlk->tag);
    seekRet = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
    UNUSED(seekRet);
  }

  int32_t ret = taosReadFile(pTSBuf->pFile, &pBlk->tag.nType, sizeof(pBlk->tag.nType));
  ret = taosReadFile(pTSBuf->pFile, &pBlk->tag.nLen, sizeof(pBlk->tag.nLen));

  // NOTE: mix types tags are not supported
  size_t sz = 0;
  if (pBlk->tag.nType == TSDB_DATA_TYPE_BINARY || pBlk->tag.nType == TSDB_DATA_TYPE_NCHAR) {
    // string tags live in a heap buffer with one extra, zeroed byte at the end
    char* strBuf = taosMemoryRealloc(pBlk->tag.pz, pBlk->tag.nLen + 1);
    assert(strBuf != NULL);

    memset(strBuf, 0, pBlk->tag.nLen + 1);
    pBlk->tag.pz = strBuf;

    sz = taosReadFile(pTSBuf->pFile, pBlk->tag.pz, (size_t)pBlk->tag.nLen);
    UNUSED(sz);
  } else if (pBlk->tag.nType == TSDB_DATA_TYPE_FLOAT) {
    // float tags are stored as float in the file and widened to double in memory
    float stored = 0;
    sz = taosReadFile(pTSBuf->pFile, &stored, (size_t)pBlk->tag.nLen);
    pBlk->tag.d = (double)stored;
    UNUSED(sz);
  } else if (pBlk->tag.nType != TSDB_DATA_TYPE_NULL) {  // TODO check the return value
    sz = taosReadFile(pTSBuf->pFile, &pBlk->tag.i, (size_t)pBlk->tag.nLen);
    UNUSED(sz);
  }

  sz = taosReadFile(pTSBuf->pFile, &pBlk->numOfElem, sizeof(pBlk->numOfElem));
  UNUSED(sz);
  sz = taosReadFile(pTSBuf->pFile, &pBlk->compLen, sizeof(pBlk->compLen));
  UNUSED(sz);
  sz = taosReadFile(pTSBuf->pFile, pBlk->payload, (size_t)pBlk->compLen);

  if (decomp) {
    pTSBuf->tsData.len =
        tsDecompressTimestamp(pBlk->payload, pBlk->compLen, pBlk->numOfElem, pTSBuf->tsData.rawBuf,
                              pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
  }

  // read the trailing copy of the comp length stored at the end of the comp block
  sz = taosReadFile(pTSBuf->pFile, &pBlk->padding, sizeof(pBlk->padding));
  assert(pBlk->padding == pBlk->compLen);

  // read the trailing copy of the tag length; it is zero for NULL tags
  int32_t n = 0;
  sz = taosReadFile(pTSBuf->pFile, &n, sizeof(pBlk->tag.nLen));
  if (pBlk->tag.nType == TSDB_DATA_TYPE_NULL) {
    assert(n == 0);
  } else {
    assert(n == pBlk->tag.nLen);
  }

  UNUSED(sz);

  // for backwards traverse, set the start position at the end of previous block
  if (order == TSDB_ORDER_DESC) {
    int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
    UNUSED(r);
  }

  return pBlk;
}

/*
 * Determine the order of the ts buffer if it has not been set yet (tsOrder == -1).
 *
 * With data already buffered, the last buffered key is compared with the first incoming
 * key. Otherwise, when more than one key is being added, the first two incoming keys are
 * compared; equal keys leave the order undetermined.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  if (pTSBuf->tsOrder != -1) {
    // todo the timestamp order is set, check the asc/desc order of appended data
    return TSDB_CODE_SUCCESS;
  }

  STSList* pList = &pTSBuf->tsData;

  if (pList->len > 0) {
    TSKEY lastBuffered = *(TSKEY*)(pList->rawBuf + pList->len - TSDB_KEYSIZE);
    TSKEY firstIncoming = *(TSKEY*)pData;

    if (lastBuffered > firstIncoming) {
      pTSBuf->tsOrder = TSDB_ORDER_DESC;
    } else {
      pTSBuf->tsOrder = TSDB_ORDER_ASC;
    }
  } else if (len > TSDB_KEYSIZE) {
    // no data in current vnode, more than one ts is added, check the orders
    TSKEY first = *(TSKEY*)(pData);
    TSKEY second = *(TSKEY*)(pData + TSDB_KEYSIZE);

    if (first < second) {
      pTSBuf->tsOrder = TSDB_ORDER_ASC;
    } else if (first > second) {
      pTSBuf->tsOrder = TSDB_ORDER_DESC;
    } else {
      // todo handle error
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Append timestamps belonging to group `id` and tagged with `tag` to the buffer.
 *
 * A new group is started when `id` differs from the id of the last group. Buffered data
 * is flushed to the file when the group or the tag value changes, and again when the
 * buffered size reaches the threshold. The cursor is reset afterwards.
 *
 * @param pTSBuf destination buffer
 * @param id     group id of the data
 * @param tag    tag value of the data; copied into the current block
 * @param pData  timestamps to append
 * @param len    size of pData in bytes
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) {
  STSGroupBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    pBlockInfo = addOneGroupInfo(pTSBuf, id);
    if (pBlockInfo == NULL) {
      // the group table could not be grown; there is no group to append to
      return;
    }
  } else {
    pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
  }

  assert(pBlockInfo->info.id == id);

  if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  // Always make sure the incoming data fits: a flush empties the buffer but may also
  // shrink it, so the capacity has to be checked after flushing as well.
  expandBuffer(ptsData, len);

  taosVariantAssign(&pTSBuf->block.tag, tag);
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}

/**
 * Write any buffered timestamps to the file and bring the stored info of the last group
 * and the file header up to date. Does nothing when no data is buffered.
 */
void tsBufFlush(STSBuf* pTSBuf) {
  STSList* pList = &pTSBuf->tsData;
  if (pList->len <= 0) {
    return;
  }

  writeDataToDisk(pTSBuf);
  shrinkBuffer(pList);

  // update prev vnode length info in file
  const int32_t        lastIndex = pTSBuf->numOfGroups - 1;
  STSGroupBlockInfoEx* pLastGroup = tsBufGetLastGroupInfo(pTSBuf);
  TSBufUpdateGroupInfo(pTSBuf, lastIndex, &pLastGroup->info);

  // save the ts order into header
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);
}

/*
 * Linear search for the first group whose id equals `id`.
 *
 * @return the index of that group, or -1 when there is none
 */
static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) {
  for (int32_t idx = 0; idx < numOfGroups; ++idx) {
    if (pGroupInfoEx[idx].info.id == id) {
      return idx;
    }
  }

  return -1;
}

/*
 * Position the file on block `blockIndex` of the given group by reading the group's
 * blocks one after another from its start offset (without decompressing them). The
 * requested block is left loaded in pTSBuf->block.
 *
 * todo opt performance by cache blocks info
 *
 * @return 0 on success, -1 if seeking to the group offset or reading a block fails
 */
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode
  for (int32_t n = 0; n <= blockIndex; ++n) {
    if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, false) == NULL) {
      return -1;
    }
  }

  // for a descending cursor, step back over the block just read so that the file
  // position is at its start, i.e. at the end of the previous comp block
  if (pTSBuf->cur.order == TSDB_ORDER_DESC) {
    STSBlock* pBlk = &pTSBuf->block;
    int32_t   blkBytes =
        pBlk->compLen + sizeof(pBlk->compLen) * 2 + sizeof(pBlk->numOfElem) + getTagAreaLength(&pBlk->tag);
    int32_t ret = taosLSeekFile(pTSBuf->pFile, -blkBytes, SEEK_CUR);
    UNUSED(ret);
  }

  return 0;
}

/*
 * Scan the blocks of a group in the cursor's traverse order until one is found whose tag
 * compares equal to `tag`. Blocks are loaded without being decompressed.
 *
 * @return the index of the matching block within the group (counted from the start of the
 *         group regardless of the traverse order), or -1 if the seek fails, a block cannot
 *         be read, or no block matches
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) {
  const bool ascending = (pTSBuf->cur.order == TSDB_ORDER_ASC);

  // a reversed traverse starts from the end of the group's data
  int64_t offset = pBlockInfo->offset;
  if (!ascending) {
    offset = pBlockInfo->offset + pBlockInfo->compLen;
  }

  if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t visited = 0; visited < pBlockInfo->numOfBlocks; ++visited) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, false) == NULL) {
      return -1;
    }

    if (taosVariantCompare(&pTSBuf->block.tag, tag) != 0) {
      continue;
    }

    if (pTSBuf->cur.order == TSDB_ORDER_ASC) {
      return visited;
    }
    return pBlockInfo->numOfBlocks - (visited + 1);
  }

  return -1;
}

/*
 * Load block `blockIndex` of group `groupIndex`, decompress it into pTSBuf->tsData and
 * move the cursor to its first (ascending) or last (descending) timestamp.
 *
 * When the cursor is already in the same group and the requested block lies ahead of it in
 * the traverse direction, the blocks in between are read sequentially from the current
 * file position; otherwise the block is located from the start of the group.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) {
  STSGroupBlockInfo* pGroup = &pTSBuf->pData[groupIndex].info;
  if (blockIndex >= pGroup->numOfBlocks) {
    assert(false);
  }

  STSCursor* pCur = &pTSBuf->cur;

  const bool sameGroup = (pCur->vgroupIndex == groupIndex);
  const bool aheadAsc = (pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC);
  const bool aheadDesc = (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC);

  if (sameGroup && (aheadAsc || aheadDesc)) {
    const int32_t distance = abs(blockIndex - pCur->blockIndex);

    for (int32_t n = 0; n < distance; ++n) {
      if (readDataFromDisk(pTSBuf, pCur->order, false) == NULL) {
        return;
      }
    }
  } else if (tsBufFindBlock(pTSBuf, pGroup, blockIndex) == -1) {
    assert(false);
  }

  STSBlock* pBlk = &pTSBuf->block;

  size_t rawBytes = pBlk->numOfElem * TSDB_KEYSIZE;

  /*
   * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
   * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
   */
  if (rawBytes > pTSBuf->tsData.allocSize) {
    expandBuffer(&pTSBuf->tsData, (int32_t)rawBytes);
  }

  pTSBuf->tsData.len =
      tsDecompressTimestamp(pBlk->payload, pBlk->compLen, pBlk->numOfElem, pTSBuf->tsData.rawBuf,
                            pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlk->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));

  pCur->vgroupIndex = groupIndex;
  pCur->blockIndex = blockIndex;

  if (pCur->order == TSDB_ORDER_ASC) {
    pCur->tsIndex = 0;
  } else {
    pCur->tsIndex = pBlk->numOfElem - 1;
  }
}

/*
 * Write one group info record into the file at `offset`, which must lie before the start
 * of the data area (see getDataStartOffset()).
 *
 * @return 0 on success, -1 if the offset is out of range or the seek fails
 */
static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) {
  const bool inHeaderArea = (offset >= 0 && offset < getDataStartOffset());
  if (!inHeaderArea) {
    return -1;
  }

  if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  // the result of the write is not checked
  taosWriteFile(pTSBuf->pFile, pVInfo, sizeof(STSGroupBlockInfo));
  return 0;
}

/**
 * Look up the block info of the first group whose id equals `id`.
 *
 * @return pointer to the info stored inside pTSBuf->pData, or NULL if no group has that id
 */
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
  const int32_t groupIdx = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);

  return (groupIdx == -1) ? NULL : &pTSBuf->pData[groupIdx].info;
}

/*
 * Write the given header to the beginning of the backing file.
 *
 * @param pTSBuf  buffer whose file is updated
 * @param pHeader header to store; it must carry the ts-comp magic number and a non-zero
 *                group count
 * @return 0 on success, -1 if the arguments are rejected, the seek fails, or the header
 *         is not written completely
 */
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  if ((pTSBuf->pFile == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC);

  int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
  if (r != 0) {
    return -1;
  }

  // taosWriteFile() reports the number of bytes written (writeDataToDisk() sums its
  // results as byte counts), so success means the whole header went out
  size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader));
  if (ws != sizeof(STSBufFileHeader)) {
    return -1;
  }
  return 0;
}

/**
 * Advance the cursor to the next timestamp in the cursor's traverse order.
 *
 * A cursor that has been reset (vgroupIndex == -1) is positioned on the first block of the
 * first group (ascending) or the last block of the last group (descending). Otherwise the
 * cursor moves within the current block, then to the adjacent block of the same group,
 * then to the adjacent group.
 *
 * @param pTSBuf buffer to traverse; NULL is accepted
 * @return true if the cursor now refers to a timestamp, false if there is no more data
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vgroupIndex == -1) {
    if (pCur->order == TSDB_ORDER_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);

      if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty, return
        tsBufResetPos(pTSBuf);
        return false;
      }
      return true;
    }

    // get the last timestamp record in the last block of the last vnode
    assert(pTSBuf->numOfGroups > 0);

    int32_t groupIndex = pTSBuf->numOfGroups - 1;
    pCur->vgroupIndex = groupIndex;

    int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
    STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
    int32_t            blockIndex = pBlockInfo->numOfBlocks - 1;

    tsBufGetBlock(pTSBuf, groupIndex, blockIndex);

    pCur->tsIndex = pTSBuf->block.numOfElem - 1;
    if (pTSBuf->block.numOfElem == 0) {
      tsBufResetPos(pTSBuf);
      return false;
    }
    return true;
  }

  const bool    isAsc = (pCur->order == TSDB_ORDER_ASC);
  const bool    isDesc = (pCur->order == TSDB_ORDER_DESC);
  const int32_t step = isAsc ? 1 : -1;

  assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

  // still inside the current block: just move to the neighbouring timestamp
  const bool atBlockEdge =
      (isAsc && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || (isDesc && pCur->tsIndex <= 0);
  if (!atBlockEdge) {
    pCur->tsIndex += step;
    return true;
  }

  int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
  STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);

  // still inside the current group: load the neighbouring block
  const bool atGroupEdge = pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && isAsc) ||
                           (pCur->blockIndex <= 0 && isDesc);
  if (!atGroupEdge) {
    tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
    return true;
  }

  // no further group in the traverse direction: the traversal is complete
  if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && isAsc) || (pCur->vgroupIndex <= 0 && isDesc)) {
    pCur->vgroupIndex = -1;
    return false;
  }

  if (pBlockInfo == NULL) {
    return false;
  }

  // Enter the neighbouring group at its first block (ascending) or its last block
  // (descending). The last block index has to come from the group being entered, which
  // may hold a different number of blocks than the group being left.
  const int32_t nextGroup = pCur->vgroupIndex + step;
  const int32_t blockIndex = isAsc ? 0 : (pTSBuf->pData[nextGroup].info.numOfBlocks - 1);
  tsBufGetBlock(pTSBuf, nextGroup, blockIndex);

  return true;
}

/**
 * Rewind the traversal cursor of a ts buffer.
 *
 * The group, block and timestamp indexes are all set to -1, while the
 * traverse order currently stored in the cursor is carried over.
 * A NULL buffer is silently ignored.
 *
 * @param pTSBuf buffer whose cursor is rewound, may be NULL
 */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    // build the whole cursor at once so every field not listed here is zeroed
    STSCursor rewound = {.order = pTSBuf->cur.order, .vgroupIndex = -1, .blockIndex = -1, .tsIndex = -1};
    pTSBuf->cur = rewound;
  }
}

/**
 * Fetch the element the cursor of the buffer currently points at.
 *
 * @param pTSBuf buffer to read from, may be NULL
 * @return the element (group id, timestamp and a pointer to the tag of the
 *         loaded block); an element with id == -1 when the buffer is NULL or
 *         the cursor is not positioned on any group (vgroupIndex < 0)
 */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem result = {.id = -1};

  // the cursor is embedded in the buffer, so only the buffer itself can be NULL
  if (pTSBuf == NULL || pTSBuf->cur.vgroupIndex < 0) {
    return result;
  }

  const STSCursor* pos = &pTSBuf->cur;

  result.id = pTSBuf->pData[pos->vgroupIndex].info.id;
  // timestamps are stored back to back, TSDB_KEYSIZE bytes each, in the raw buffer
  result.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pos->tsIndex * TSDB_KEYSIZE);
  // the tag is not copied: it aliases the block currently held by the buffer
  result.tag = &pTSBuf->block.tag;

  return result;
}

/**
 * Append the data of a source ts buffer to the end of a destination buffer.
 *
 * Only a source holding exactly one group is supported (asserted). When the
 * id of that group differs from the id of the last group in the destination,
 * a new group entry is appended to the destination; otherwise the length,
 * block count and compressed length of the last destination group are
 * extended. The data area of the source file (everything after the reserved
 * header/group table) is then copied to the end of the destination file and
 * the destination file is reopened to pick up its new size.
 *
 * @param pDestBuf buffer that receives the data
 * @param pSrcBuf  buffer whose data is appended; its file is only read
 * @return 0 on success or when there is nothing to merge (NULL argument or an
 *         empty source); -1 when the group limit would be exceeded, memory
 *         cannot be allocated, or a file operation fails
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
    return 0;
  }

  if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
    return -1;
  }

  // src can only have one vnode index
  assert(pSrcBuf->numOfGroups == 1);

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // compared with the last vnode id
  int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id;
  if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) {
    int32_t oldSize = pDestBuf->numOfGroups;
    int32_t newSize = oldSize + pSrcBuf->numOfGroups;

    if (pDestBuf->numOfAlloc < newSize) {
      STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
      if (tmp == NULL) {
        return -1;
      }

      // record the new capacity only once the array has really grown, so a
      // failed realloc leaves pData and numOfAlloc consistent with each other
      pDestBuf->pData = tmp;
      pDestBuf->numOfAlloc = newSize;
    }

    // directly copy the vnode index information
    memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));

    // set the new offset value: translate the source offset (relative to the
    // start of its data area) to a position after the current destination data
    for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
      STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.id = id;
    }

    pDestBuf->numOfGroups = newSize;
  } else {
    STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.id = id;
  }

  // Position the destination at its end before appending. Only a negative
  // result is treated as failure, which is valid whether the wrapper reports
  // success as 0 or as the resulting file offset.
  int64_t seekRet = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END);
  if (seekRet < 0) {
    return -1;
  }

  // copy everything behind the reserved header/group table of the source file
  int64_t offset = getDataStartOffset();
  int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
  int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size);

  if (written == -1 || written != size) {
    return -1;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  int32_t oldFileSize = pDestBuf->fileSize;

  // file meta data may be cached, close and reopen the file for accurate file size.
  taosCloseFile(&pDestBuf->pFile);
  pDestBuf->pFile = taosOpenFile(pDestBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pDestBuf->pFile == NULL) {
    return -1;
  }

  int64_t file_size;
  if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) {
    return -1;
  }
  pDestBuf->fileSize = (uint32_t)file_size;

  assert(pDestBuf->fileSize == oldFileSize + size);

  return 0;
}

STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
  STSBuf* pTSBuf = tsBufCreate(true, order);
S
Shengliang Guan 已提交
891

892 893 894 895 896
  STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
  pBlockInfo->numOfBlocks = numOfBlocks;
  pBlockInfo->compLen = len;
  pBlockInfo->offset = getDataStartOffset();
  pBlockInfo->id = id;
S
Shengliang Guan 已提交
897

898 899
  // update prev vnode length info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
S
Shengliang Guan 已提交
900

901
  int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET);
902
  if (ret == -1) {
S
Shengliang Guan 已提交
903
    //    qError("fseek failed, errno:%d", errno);
904 905 906
    tsBufDestroy(pTSBuf);
    return NULL;
  }
907
  size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len);
908
  if (sz != len) {
S
Shengliang Guan 已提交
909
    //    qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
910 911 912 913
    tsBufDestroy(pTSBuf);
    return NULL;
  }
  pTSBuf->fileSize += len;
S
Shengliang Guan 已提交
914

915 916
  pTSBuf->tsOrder = order;
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
S
Shengliang Guan 已提交
917

918 919 920 921 922 923 924 925
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }

  // TODO taosFsync??
S
Shengliang Guan 已提交
926 927 928 929 930 931
  //  if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
  ////    qError("fsync failed, errno:%d", errno);
  //    tsBufDestroy(pTSBuf);
  //    return NULL;
  //  }

932 933 934 935 936
  return pTSBuf;
}

/**
 * Position the cursor on the block of group `id` that matches `tag` and
 * return the element found there.
 *
 * @param pTSBuf buffer to search, may be NULL
 * @param id     id of the group to look up
 * @param tag    tag value used to select the block inside the group
 * @return the element at the new cursor position; an element with id == -1
 *         when the buffer is NULL, the group does not exist, or no block
 *         matches the tag (the cursor is left untouched in those cases)
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) {
  STSElem notFound = {.id = -1};

  if (pTSBuf == NULL) {
    return notFound;
  }

  int32_t groupIdx = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
  if (groupIdx == -1) {
    return notFound;
  }

  // for debug purpose
  //  tsBufDisplay(pTSBuf);

  int32_t blockIdx = tsBufFindBlockByTag(pTSBuf, &pTSBuf->pData[groupIdx].info, tag);
  if (blockIdx < 0) {
    return notFound;
  }

  // move the cursor first, then load the block it now refers to
  pTSBuf->cur.vgroupIndex = groupIdx;
  pTSBuf->cur.blockIndex = blockIdx;
  tsBufGetBlock(pTSBuf, groupIdx, blockIdx);

  return tsBufGetElem(pTSBuf);
}

/**
 * Return a copy of the current cursor of the buffer.
 *
 * @param pTSBuf buffer to query, may be NULL
 * @return the cursor by value; for a NULL buffer a cursor whose vgroupIndex
 *         is -1 and whose remaining fields are zero
 */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    return pTSBuf->cur;
  }

  STSCursor unset = {.vgroupIndex = -1};
  return unset;
}

/**
 * Install a previously saved cursor into the buffer.
 *
 * When the cursor refers to a group (vgroupIndex != -1) the block it points
 * at is loaded before the cursor is copied. Nothing happens when either
 * argument is NULL.
 *
 * @param pTSBuf buffer that receives the cursor
 * @param pCur   cursor to install; it is copied, not retained
 */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf != NULL && pCur != NULL) {
    if (pCur->vgroupIndex != -1) {
      tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
    }

    // copy last: the cursor stored in the buffer is overwritten as a whole
    pTSBuf->cur = *pCur;
  }
}

/**
 * Set the order stored in the cursor of the buffer.
 *
 * @param pTSBuf buffer to update; NULL is ignored
 * @param order  new order value, stored as given without validation
 */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf != NULL) {
    pTSBuf->cur.order = order;
  }
}

/**
 * Create a second buffer object backed by the file of an existing buffer.
 *
 * Pending in-memory data of the original is flushed first, then a new buffer
 * is created from the same path.
 *
 * @param pTSBuf buffer to clone, may be NULL
 * @return whatever tsBufCreateFromFile() yields for the path, or NULL when
 *         pTSBuf is NULL
 */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  STSBuf* pCopy = NULL;

  if (pTSBuf != NULL) {
    tsBufFlush(pTSBuf);
    // NOTE(review): the `false` flag presumably disables auto-delete for the clone - confirm
    pCopy = tsBufCreateFromFile(pTSBuf->path, false);
  }

  return pCopy;
}

/**
 * Debug helper: print the elements of the buffer to stdout.
 *
 * The buffer is walked from the beginning in ascending order and one line
 * "<group id>-<tag>-<timestamp>" is printed for every element whose tag is of
 * type TSDB_DATA_TYPE_BIGINT. The cursor order is restored afterwards; the
 * cursor position is not.
 *
 * @param pTSBuf buffer to dump, must not be NULL
 */
void tsBufDisplay(STSBuf* pTSBuf) {
  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfGroups);

  int32_t savedOrder = pTSBuf->cur.order;
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  for (tsBufResetPos(pTSBuf); tsBufNextPos(pTSBuf);) {
    STSElem elem = tsBufGetElem(pTSBuf);
    if (elem.tag->nType != TSDB_DATA_TYPE_BIGINT) {
      continue;  // only bigint tags are printed
    }

    printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts);
  }

  pTSBuf->cur.order = savedOrder;
  printf("-------end of ts comp file-------\n");
}

/**
 * Size of the area at the start of the file that precedes the data: the file
 * header followed by TS_COMP_FILE_GROUP_MAX group info slots.
 *
 * @return that size in bytes
 */
static int32_t getDataStartOffset() {
  const size_t headerLen = sizeof(STSBufFileHeader);
  const size_t groupTableLen = TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo);

  return (int32_t)(headerLen + groupTableLen);
}

/**
 * Update the group info stored in slot `index` of the group table.
 *
 * The slot position is computed as the header size plus `index` group info
 * records; the actual update is delegated to doUpdateGroupInfo().
 *
 * @param pTSBuf     buffer whose file is updated
 * @param index      zero-based slot in the group table
 * @param pBlockInfo group info to store
 */
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) {
  const size_t slotPos = sizeof(STSBufFileHeader) + index * sizeof(STSGroupBlockInfo);

  doUpdateGroupInfo(pTSBuf, (int32_t)slotPos, pBlockInfo);
}

/**
 * Allocate the memory a ts buffer works with: the group info array, the raw
 * timestamp buffer, the assist buffer and the block payload buffer.
 *
 * On success the file size of the buffer is also increased by the size of the
 * reserved header/group table area.
 *
 * @param pTSBuf buffer to set up
 * @return pTSBuf on success; NULL when any allocation fails, in which case
 *         the buffer has been passed to tsBufDestroy()
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  enum { INITIAL_GROUPINFO_SIZE = 4 };

  pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE;
  pTSBuf->pData = taosMemoryCalloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    goto _error;
  }

  pTSBuf->tsData.rawBuf = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    goto _error;
  }

  // the size bookkeeping is only filled in once the raw buffer exists
  pTSBuf->bufSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    goto _error;
  }

  pTSBuf->block.payload = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    goto _error;
  }

  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;

_error:
  // single failure exit: hand the partially initialised buffer to the destructor
  tsBufDestroy(pTSBuf);
  return NULL;
}

/**
 * Number of groups recorded in the buffer.
 *
 * @param pTSBuf buffer to query, may be NULL
 * @return the group count, or 0 for a NULL buffer
 */
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) {
  return (pTSBuf == NULL) ? 0 : pTSBuf->numOfGroups;
}

/**
 * Collect the ids of all groups of the buffer into a newly allocated array.
 *
 * @param pTSBuf buffer to query, may be NULL (treated as having no groups)
 * @param num    optional output: number of ids stored in *id
 * @param id     output: array allocated with taosMemoryMalloc() that the
 *               caller has to release, or NULL when there are no groups or
 *               the allocation failed (in both cases *num is 0)
 */
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
  int32_t size = tsBufGetNumOfGroup(pTSBuf);
  if (num != NULL) {
    *num = size;
  }

  *id = NULL;
  if (size == 0) {
    return;
  }

  int32_t* list = taosMemoryMalloc((size_t)size * sizeof(int32_t));
  if (list == NULL) {
    // out of memory: report an empty list instead of writing through NULL,
    // and keep the count consistent with the NULL array handed back
    if (num != NULL) {
      *num = 0;
    }
    return;
  }

  for (int32_t i = 0; i < size; ++i) {
    list[i] = pTSBuf->pData[i].info.id;
  }

  *id = list;
}

/**
 * Read the raw data of one group from the file of the buffer.
 *
 * @param pTSBuf      buffer to read from
 * @param groupIndex  zero-based index of the group, must be within
 *                    [0, numOfGroups) (asserted)
 * @param buf         destination; its capacity is not checked here, the
 *                    caller has to provide room for the compLen bytes of the
 *                    group
 * @param len         output: number of bytes read, 0 on failure
 * @param numOfBlocks output: number of blocks of the group, 0 on failure
 * @return TSDB_CODE_SUCCESS, or a system error code derived from the file
 *         when seeking or reading fails
 */
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
  assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;

  *len = 0;
  *numOfBlocks = 0;

  // Only a negative result is treated as failure, matching the `== -1` check
  // used by tsBufCreateFromCompBlocks(); this is valid whether the wrapper
  // reports success as 0 or as the resulting (positive) file offset.
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) < 0) {
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    //    qError("%p: fseek failed: %s", pSql, tstrerror(code));
    return code;
  }

  size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen);
  if (s != pBlockInfo->compLen) {
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    //    tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
    return code;
  }

  *len = pBlockInfo->compLen;
  *numOfBlocks = pBlockInfo->numOfBlocks;

  return TSDB_CODE_SUCCESS;
}

/**
 * Search the groups in order for the first one that contains a block matching
 * the tag and position the cursor there.
 *
 * @param pTSBuf buffer to search, must not be NULL
 * @param pTag   tag value to look for
 * @return the element of the first matching group; when no group matches, the
 *         result of the last lookup, or an element with id == -1 if the
 *         buffer has no groups
 */
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) {
  STSElem found = {.id = -1};

  int32_t g = 0;
  while (g < pTSBuf->numOfGroups) {
    found = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[g].info.id, pTag);
    if (found.id == pTSBuf->pData[g].info.id) {
      break;  // the lookup succeeded for this group
    }
    ++g;
  }

  return found;
}

S
Shengliang Guan 已提交
1136
/**
 * Tell whether an element refers to real data.
 *
 * @param pElem element to test, must not be NULL
 * @return true when the id of the element is not negative
 */
bool tsBufIsValidElem(STSElem* pElem) {
  return !(pElem->id < 0);
}