ttszip.c 33.5 KB
Newer Older
S
common  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17 18 19
#include "ttszip.h"
#include "taoserror.h"
#include "tcompression.h"
G
Ganlin Zhao 已提交
20
#include "tlog.h"
21 22

static int32_t getDataStartOffset();
S
Shengliang Guan 已提交
23
static void    TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo);
24 25 26 27 28 29 30 31 32 33
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);

/**
 * todo error handling
 * support auto closeable tmp file
 * @param path
 * @return
 */
STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
wafwerar's avatar
wafwerar 已提交
34
  if (!osTempSpaceAvailable()) {
35
    terrno = TSDB_CODE_NO_DISKSPACE;
wafwerar's avatar
wafwerar 已提交
36
    // tscError("tmp file created failed since %s", terrstr());
wafwerar's avatar
wafwerar 已提交
37 38
    return NULL;
  }
G
Ganlin Zhao 已提交
39

wafwerar's avatar
wafwerar 已提交
40
  STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
41 42 43 44 45
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
46

S
os env  
Shengliang Guan 已提交
47
  taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path);
48
  // pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
49
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
50
  if (pTSBuf->pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
51
    taosMemoryFree(pTSBuf);
52 53 54 55
    return NULL;
  }

  if (!autoDelete) {
G
Ganlin Zhao 已提交
56
    if (taosRemoveFile(pTSBuf->path) != 0) {
G
Ganlin Zhao 已提交
57 58 59
      taosMemoryFree(pTSBuf);
      return NULL;
    }
60
  }
S
Shengliang Guan 已提交
61

G
Ganlin Zhao 已提交
62
  if (allocResForTSBuf(pTSBuf) == NULL) {
63 64
    return NULL;
  }
S
Shengliang Guan 已提交
65

66 67 68
  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC};
  STSBufUpdateHeader(pTSBuf, &header);
S
Shengliang Guan 已提交
69

70 71 72 73
  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  pTSBuf->tsOrder = order;
S
Shengliang Guan 已提交
74

75 76 77 78
  return pTSBuf;
}

STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
wafwerar's avatar
wafwerar 已提交
79
  STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
80 81 82 83 84
  if (pTSBuf == NULL) {
    return NULL;
  }

  pTSBuf->autoDelete = autoDelete;
S
Shengliang Guan 已提交
85

86
  tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path));
S
Shengliang Guan 已提交
87

88 89 90
  // pTSBuf->pFile = fopen(pTSBuf->path, "rb+");
  pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pTSBuf->pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
91
    taosMemoryFree(pTSBuf);
92 93
    return NULL;
  }
S
Shengliang Guan 已提交
94

95 96 97
  if (allocResForTSBuf(pTSBuf) == NULL) {
    return NULL;
  }
S
Shengliang Guan 已提交
98

99 100
  // validate the file magic number
  STSBufFileHeader header = {0};
S
Shengliang Guan 已提交
101
  int32_t          ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET);
102
  UNUSED(ret);
103
  size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader));
104 105 106 107 108 109 110
  UNUSED(sz);

  // invalid file
  if (header.magic != TS_COMP_FILE_MAGIC) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
111

112 113
  if (header.numOfGroup > pTSBuf->numOfAlloc) {
    pTSBuf->numOfAlloc = header.numOfGroup;
wafwerar's avatar
wafwerar 已提交
114
    STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc);
115 116 117 118
    if (tmp == NULL) {
      tsBufDestroy(pTSBuf);
      return NULL;
    }
S
Shengliang Guan 已提交
119

120 121
    pTSBuf->pData = tmp;
  }
S
Shengliang Guan 已提交
122

123
  pTSBuf->numOfGroups = header.numOfGroup;
S
Shengliang Guan 已提交
124

125 126 127
  // check the ts order
  pTSBuf->tsOrder = header.tsOrder;
  if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
S
Shengliang Guan 已提交
128
    //    tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
129 130 131
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
132

133
  size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups;
S
Shengliang Guan 已提交
134

wafwerar's avatar
wafwerar 已提交
135
  STSGroupBlockInfo* buf = (STSGroupBlockInfo*)taosMemoryCalloc(1, infoSize);
136 137
  if (buf == NULL) {
    tsBufDestroy(pTSBuf);
S
Shengliang Guan 已提交
138 139 140 141
    return NULL;
  }

  // int64_t pos = ftell(pTSBuf->pFile); //pos not used
142
  sz = taosReadFile(pTSBuf->pFile, buf, infoSize);
143
  UNUSED(sz);
S
Shengliang Guan 已提交
144

145 146 147 148 149
  // the length value for each vnode is not kept in file, so does not set the length value
  for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
    STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i];
    memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo));
  }
wafwerar's avatar
wafwerar 已提交
150
  taosMemoryFree(buf);
S
Shengliang Guan 已提交
151

152
  ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END);
153
  UNUSED(ret);
S
Shengliang Guan 已提交
154

155 156
  int64_t file_size;
  if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) {
157 158 159
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
160

161
  pTSBuf->fileSize = (uint32_t)file_size;
162
  tsBufResetPos(pTSBuf);
S
Shengliang Guan 已提交
163

164 165
  // ascending by default
  pTSBuf->cur.order = TSDB_ORDER_ASC;
S
Shengliang Guan 已提交
166 167 168 169 170

  //  tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path,
  //  fileno(pTSBuf->pFile),
  //           pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);

171 172 173 174 175 176 177
  return pTSBuf;
}

/*
 * Release every resource owned by pTSBuf: work buffers, the group array, the block
 * payload and tag, and the file handle (unless remainOpen is set). When autoDelete is
 * set, the backing file is also removed on a best-effort basis.
 *
 * Always returns NULL, so callers can write `p = tsBufDestroy(p);`. NULL input is ignored.
 */
void* tsBufDestroy(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    // in-memory buffers
    taosMemoryFreeClear(pTSBuf->assistBuf);
    taosMemoryFreeClear(pTSBuf->tsData.rawBuf);
    taosMemoryFreeClear(pTSBuf->pData);
    taosMemoryFreeClear(pTSBuf->block.payload);

    // backing file
    if (!pTSBuf->remainOpen) {
      taosCloseFile(&pTSBuf->pFile);
    }

    if (pTSBuf->autoDelete && taosRemoveFile(pTSBuf->path) != 0) {
      // best effort: a leftover file is not reported as an error
    }

    taosVariantDestroy(&pTSBuf->block.tag);
    taosMemoryFree(pTSBuf);
  }

  return NULL;
}

/* Return the most recently added group entry; at least one group must exist. */
static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) {
  int32_t last = pTSBuf->numOfGroups - 1;
  ASSERT(last >= 0);

  return pTSBuf->pData + last;
}

/*
 * Append a new, empty group with the given id and return its entry.
 *
 * The previous last group's info and the new group's info are written to the file, and the
 * file header is rewritten with the new group count. The new group's data starts at the
 * current end of file (pTSBuf->fileSize).
 *
 * Returns NULL if the group array cannot be grown.
 */
static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
  if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) {
    uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
    // 1.5x alone does not grow a capacity of 0 or 1: always make room for one more group
    if ((int32_t)newSize <= pTSBuf->numOfGroups) {
      newSize = (uint32_t)pTSBuf->numOfGroups + 1;
    }
    ASSERT((int32_t)newSize > pTSBuf->numOfAlloc);

    STSGroupBlockInfoEx* tmp =
        (STSGroupBlockInfoEx*)taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize);
    if (tmp == NULL) {
      return NULL;
    }

    pTSBuf->pData = tmp;
    pTSBuf->numOfAlloc = newSize;
    memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups));
  }

  if (pTSBuf->numOfGroups > 0) {
    STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);

    // update prev group length info in file
    TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info);
  }

  // set initial value for the new group
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info;
  pBlockInfo->id = id;
  pBlockInfo->offset = pTSBuf->fileSize;
  ASSERT(pBlockInfo->offset >= getDataStartOffset());

  // update group info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo);

  // add one group
  pTSBuf->numOfGroups += 1;

  // update the header info
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};

  STSBufUpdateHeader(pTSBuf, &header);
  return tsBufGetLastGroupInfo(pTSBuf);
}

/*
 * Shrink the raw timestamp buffer back to MEM_BUF_SIZE once it has grown to at least
 * twice its threshold. If the shrinking realloc fails, the larger buffer is kept.
 */
static void shrinkBuffer(STSList* ptsData) {
  if (ptsData->allocSize < ptsData->threshold * 2) {
    return;
  }

  char* shrunk = taosMemoryRealloc(ptsData->rawBuf, MEM_BUF_SIZE);
  if (shrunk == NULL) {
    return;
  }

  ptsData->rawBuf = shrunk;
  ptsData->allocSize = MEM_BUF_SIZE;
}

/*
 * On-disk size in bytes of a block's tag area: the type field, the leading and trailing
 * length fields, plus the tag value itself (a NULL tag stores no value).
 */
static int32_t getTagAreaLength(SVariant* pa) {
  int32_t fixedPart = sizeof(pa->nLen) * 2 + sizeof(pa->nType);
  if (pa->nType == TSDB_DATA_TYPE_NULL) {
    return fixedPart;
  }

  return fixedPart + pa->nLen;
}

/*
 * Compress the timestamps buffered in pTSBuf->tsData and append them to the file as one
 * block at offset pTSBuf->fileSize. Does nothing when nothing is buffered.
 *
 * Block layout:
 *   nType | nLen | tag value | numOfElem | compLen | payload | compLen | nLen
 * The compressed length and the tag length appear at both ends of the block so it can be
 * read forwards as well as backwards. A NULL tag stores no value and writes nLen as 0.
 *
 * Afterwards the buffered list is emptied, the last group's compLen/numOfBlocks grow by
 * this block, and the buffer may be shrunk.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
  STSList* pTsData = &pTSBuf->tsData;
  if (pTsData->len == 0) {
    return;
  }

  STSBlock* pBlock = &pTSBuf->block;
  pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE;
  pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload,
                                        pTsData->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET);
  ASSERT(r == 0);

  // leading tag area: type, then length and value depending on the type
  int32_t metaLen = (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType));
  int32_t trueLen = pBlock->tag.nLen;

  switch (pBlock->tag.nType) {
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen);
      break;

    case TSDB_DATA_TYPE_FLOAT: {
      // the value is taken from tag.d and narrowed to float before writing
      float narrowed = (float)pBlock->tag.d;
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &narrowed, (size_t)pBlock->tag.nLen);
      break;
    }

    case TSDB_DATA_TYPE_NULL:
      trueLen = 0;
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
      break;

    default:
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
      metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
      break;
  }

  // block body: element count, compressed length, payload, compressed length again
  taosWriteFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem));
  taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
  taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);
  taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));

  // trailing tag length, used by backward traversal
  metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen));
  ASSERT(metaLen == getTagAreaLength(&pBlock->tag));

  int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
  pTSBuf->fileSize += blockSize;
  pTsData->len = 0;

  STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf);
  pGroupBlockInfoEx->info.compLen += blockSize;
  pGroupBlockInfoEx->info.numOfBlocks += 1;

  shrinkBuffer(pTsData);
}

/*
 * Make sure ptsData has room for inputSize more bytes after its current len.
 *
 * On allocation failure the list is left unchanged (rawBuf and allocSize keep their old
 * values). Callers can detect this because the free space is still smaller than inputSize.
 */
static void expandBuffer(STSList* ptsData, int32_t inputSize) {
  if (ptsData->allocSize - ptsData->len >= inputSize) {
    return;
  }

  int32_t newSize = inputSize + ptsData->len;
  char*   tmp = taosMemoryRealloc(ptsData->rawBuf, (size_t)newSize);
  if (tmp == NULL) {
    // keep the old buffer: overwriting rawBuf with NULL would leak it and crash later writers
    return;
  }

  ptsData->rawBuf = tmp;
  ptsData->allocSize = newSize;
}

/*
 * Read one compressed timestamp block from the current file position into pTSBuf->block.
 *
 * Block layout (as produced by writeDataToDisk):
 *   nType | nLen | tag value | numOfElem | compLen | payload | compLen | nLen
 *
 * TSDB_ORDER_ASC: the file position must be at the start of a block and is left at the
 * start of the next one.
 * TSDB_ORDER_DESC: the file position must be at the end of a block. The trailing
 * compLen/nLen are used to seek back to the block start, and afterwards the position is
 * left at the end of the preceding block.
 *
 * @param decomp when true, the payload is also decompressed into pTSBuf->tsData
 * @return &pTSBuf->block, or NULL when the tag or compressed length read from the file
 *         cannot be valid, or when the string-tag buffer cannot be allocated
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
  STSBlock* pBlock = &pTSBuf->block;

  // clear the memory buffer
  pBlock->compLen = 0;
  pBlock->padding = 0;
  pBlock->numOfElem = 0;

  int32_t offset = -1;

  if (order == TSDB_ORDER_DESC) {
    /*
     * set the right position for the reversed traverse, the reversed traverse is started from
     * the end of each comp data block
     */
    int32_t prev = -(int32_t)(sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen));
    int32_t ret = taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR);
    size_t  sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
    sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
    UNUSED(sz);

    pBlock->compLen = pBlock->padding;

    // uses the nType of the previously loaded block: mixed tag types are not supported
    offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
    ret = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
    UNUSED(ret);
  }

  int32_t ret = taosReadFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType));
  ret = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen));
  UNUSED(ret);

  // nLen comes from the file: reject values that would make the reads below overrun
  // their destination
  if (pBlock->tag.nLen < 0) {
    return NULL;
  }

  // NOTE: mix types tags are not supported
  size_t sz = 0;
  if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
    char* tp = taosMemoryRealloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
    if (tp == NULL) {
      return NULL;  // the failed realloc leaves pBlock->tag.pz untouched
    }

    memset(tp, 0, pBlock->tag.nLen + 1);
    pBlock->tag.pz = tp;

    sz = taosReadFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen);
    UNUSED(sz);
  } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
    float tfloat = 0;
    if ((size_t)pBlock->tag.nLen > sizeof(tfloat)) {
      return NULL;
    }

    sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen);
    pBlock->tag.d = (double)tfloat;
    UNUSED(sz);
  } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
    if ((size_t)pBlock->tag.nLen > sizeof(pBlock->tag.i)) {
      return NULL;
    }

    sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen);
    UNUSED(sz);
  }

  sz = taosReadFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem));
  UNUSED(sz);
  sz = taosReadFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen));
  UNUSED(sz);

  // a negative length would become a huge size_t read into the payload buffer
  if (pBlock->compLen < 0) {
    return NULL;
  }

  sz = taosReadFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen);

  if (decomp) {
    pTSBuf->tsData.len =
        tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                              pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
  }

  // read the comp length at the end of the comp block
  sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding));
  ASSERT(pBlock->padding == pBlock->compLen);

  int32_t n = 0;
  sz = taosReadFile(pTSBuf->pFile, &n, sizeof(pBlock->tag.nLen));
  if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) {
    ASSERT(n == 0);
  } else {
    ASSERT(n == pBlock->tag.nLen);
  }

  UNUSED(sz);

  // for backwards traverse, set the start position at the end of previous block
  if (order == TSDB_ORDER_DESC) {
    int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR);
    UNUSED(r);
  }

  return pBlock;
}

/*
 * Infer pTSBuf->tsOrder from the appended timestamps while it is still unset (-1).
 *
 * If the list already holds data, the first new key is compared with the last buffered
 * key. Otherwise the first two new keys are compared (equal keys leave the order unset).
 * Keys are copied out with memcpy because pData carries no alignment guarantee.
 * Expected to run after pData has been copied into rawBuf but before ptsData->len is
 * advanced, so the key at rawBuf + len - TSDB_KEYSIZE is still the previous last key.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  if (pTSBuf->tsOrder != -1) {
    // todo the timestamp order is set, check the asc/desc order of appended data
    return TSDB_CODE_SUCCESS;
  }

  STSList* ptsData = &pTSBuf->tsData;

  if (ptsData->len > 0) {
    if (len < (int32_t)TSDB_KEYSIZE) {
      return TSDB_CODE_SUCCESS;  // no complete key to compare with
    }

    TSKEY lastKey;
    TSKEY firstKey;
    memcpy(&lastKey, ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE, sizeof(TSKEY));
    memcpy(&firstKey, pData, sizeof(TSKEY));

    pTSBuf->tsOrder = (lastKey > firstKey) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
  } else if (len >= (int32_t)(2 * TSDB_KEYSIZE)) {
    // no data buffered yet and at least two keys appended: compare them
    TSKEY k1;
    TSKEY k2;
    memcpy(&k1, pData, sizeof(TSKEY));
    memcpy(&k2, pData + TSDB_KEYSIZE, sizeof(TSKEY));

    if (k1 < k2) {
      pTSBuf->tsOrder = TSDB_ORDER_ASC;
    } else if (k1 > k2) {
      pTSBuf->tsOrder = TSDB_ORDER_DESC;
    } else {
      // todo handle error
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Append len bytes of timestamps from pData to group id, tagged with tag.
 * (len is presumably a whole number of TSKEY values — TODO confirm with callers.)
 *
 * A new group id flushes the buffered data and registers a new group. A different tag
 * value within the same group flushes the buffered block first. The buffer is written to
 * disk whenever it reaches its threshold. On allocation failure the data is dropped, since
 * this void API has no way to report an error.
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) {
  STSGroupBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    pBlockInfo = addOneGroupInfo(pTSBuf, id);
    if (pBlockInfo == NULL) {
      return;  // group array could not be grown
    }
  } else {
    pBlockInfo = tsBufGetLastGroupInfo(pTSBuf);
  }

  ASSERT(pBlockInfo->info.id == id);

  if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  // Always reserve room: the flush above may have shrunk the buffer below len.
  expandBuffer(ptsData, len);
  if (ptsData->rawBuf == NULL || ptsData->allocSize - ptsData->len < len) {
    return;  // out of memory
  }

  taosVariantAssign(&pTSBuf->block.tag, tag);
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}

/*
 * Write any buffered timestamps to disk. Then persist the last group's info and rewrite
 * the file header with the current group count and ts order.
 * Does nothing when nothing is buffered.
 */
void tsBufFlush(STSBuf* pTSBuf) {
  STSList* pList = &pTSBuf->tsData;
  if (!(pList->len > 0)) {
    return;
  }

  writeDataToDisk(pTSBuf);
  shrinkBuffer(pList);

  // persist the last group's updated length info
  int32_t              lastIndex = pTSBuf->numOfGroups - 1;
  STSGroupBlockInfoEx* pLast = tsBufGetLastGroupInfo(pTSBuf);
  TSBufUpdateGroupInfo(pTSBuf, lastIndex, &pLast->info);

  // save the ts order into header
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);
}

/* Linear search for the group with the given id; returns its index, or -1 if absent. */
static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) {
  for (int32_t idx = 0; idx < numOfGroups; ++idx) {
    if (pGroupInfoEx[idx].info.id == id) {
      return idx;
    }
  }

  return -1;
}

/*
 * Load block blockIndex of the group described by pBlockInfo into pTSBuf->block, without
 * decompressing it. Blocks are variable-length, so the group is walked forward from its
 * first block (blockIndex + 1 reads).
 * For a DESC cursor, the file position is then moved back to the start of that block,
 * i.e. the end of the previous block.
 * todo: could be sped up by caching block positions.
 *
 * Returns 0 on success, -1 if seeking or reading fails.
 */
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t n = 0; n <= blockIndex; ++n) {
    if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, false) == NULL) {
      return -1;
    }
  }

  if (pTSBuf->cur.order != TSDB_ORDER_DESC) {
    return 0;
  }

  STSBlock* pBlock = &pTSBuf->block;
  int32_t   compBlockSize =
      pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag);
  int32_t ret = taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR);
  UNUSED(ret);

  return 0;
}

/*
 * Scan the group's blocks in the cursor's order and return the index (counted from the
 * group's first block) of the first block whose tag equals tag, or -1 if none matches or
 * a read fails.
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) {
  const bool asc = (pTSBuf->cur.order == TSDB_ORDER_ASC);

  // a reversed traverse starts from the end of the group's last block
  int64_t start = asc ? pBlockInfo->offset : pBlockInfo->offset + pBlockInfo->compLen;
  if (taosLSeekFile(pTSBuf->pFile, (int32_t)start, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t visited = 0; visited < pBlockInfo->numOfBlocks; ++visited) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, false) == NULL) {
      return -1;
    }

    if (taosVariantCompare(&pTSBuf->block.tag, tag) == 0) {
      return asc ? visited : (pBlockInfo->numOfBlocks - (visited + 1));
    }
  }

  return -1;
}

/*
 * Load block blockIndex of group groupIndex into pTSBuf->block and decompress it into
 * pTSBuf->tsData. The cursor is then pointed at the block's first (ASC) or last (DESC)
 * timestamp.
 * If the target block lies ahead of the cursor in the current group, reading continues
 * from the current file position; otherwise the block is located from the group's start.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) {
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;
  if (pBlockInfo->numOfBlocks <= blockIndex) {
    ASSERT(false);
  }

  STSCursor* pCur = &pTSBuf->cur;
  bool       ahead = (pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) ||
               (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC);

  if (pCur->vgroupIndex == groupIndex && ahead) {
    for (int32_t remaining = abs(blockIndex - pCur->blockIndex); remaining > 0; --remaining) {
      if (readDataFromDisk(pTSBuf, pCur->order, false) == NULL) {
        return;
      }
    }
  } else if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) {
    ASSERT(false);
  }

  STSBlock* pBlock = &pTSBuf->block;

  /*
   * A block with identical tag values may hold more data than the default buffer, because
   * tsBufAppend grows the buffer via expandBuffer, so make room before decompressing.
   */
  size_t s = pBlock->numOfElem * TSDB_KEYSIZE;
  if (s > pTSBuf->tsData.allocSize) {
    expandBuffer(&pTSBuf->tsData, (int32_t)s);
  }

  pTSBuf->tsData.len =
      tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                            pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  ASSERT((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));

  pCur->vgroupIndex = groupIndex;
  pCur->blockIndex = blockIndex;
  pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
}

/*
 * Overwrite one group-info record at file offset `offset`. Only offsets in
 * [0, getDataStartOffset()) — the area before the data blocks — are accepted.
 * Returns 0 once the write has been issued, -1 for a bad offset or failed seek.
 * The write's own result is not checked.
 */
static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) {
  bool inIndexArea = (offset >= 0 && offset < getDataStartOffset());
  if (!inIndexArea || taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) {
    return -1;
  }

  taosWriteFile(pTSBuf->pFile, pVInfo, sizeof(STSGroupBlockInfo));
  return 0;
}

/* Return the block info of the group with the given id, or NULL if no such group exists. */
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
  int32_t idx = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
  return (idx == -1) ? NULL : &pTSBuf->pData[idx].info;
}

/*
 * Write *pHeader at the start of the file.
 *
 * The header is rejected (-1) when there is no file, it is NULL, it has no groups, its
 * magic is wrong, or its tsOrder is neither ASC nor DESC.
 * Returns 0 on success, -1 on validation, seek or write failure.
 */
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  if ((pTSBuf->pFile == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  if (pHeader->tsOrder != TSDB_ORDER_ASC && pHeader->tsOrder != TSDB_ORDER_DESC) {
    return -1;
  }

  if (taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET) != 0) {
    return -1;
  }

  // taosWriteFile reports bytes written (not an fwrite-style item count), so a complete
  // write equals sizeof(STSBufFileHeader)
  int64_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader));
  if (ws != (int64_t)sizeof(STSBufFileHeader)) {
    return -1;
  }

  return 0;
}

/*
 * Advance the cursor to the next timestamp in pTSBuf->cur.order.
 *
 * A reset cursor (vgroupIndex == -1) is moved to the first (ASC) or last (DESC) timestamp
 * of the buffer. Otherwise the cursor steps within the current block, then to the adjacent
 * block of the same group, then to the adjacent group.
 *
 * Returns false when pTSBuf is NULL or empty, or when the traversal is exhausted (in that
 * case vgroupIndex is set back to -1). Returns true when the cursor points at a timestamp.
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vgroupIndex == -1) {
    if (pCur->order == TSDB_ORDER_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);
    } else {  // get the last timestamp record in the last block of the last group
      ASSERT(pTSBuf->numOfGroups > 0);

      int32_t groupIndex = pTSBuf->numOfGroups - 1;
      pCur->vgroupIndex = groupIndex;

      int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
      STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
      int32_t            blockIndex = pBlockInfo->numOfBlocks - 1;

      tsBufGetBlock(pTSBuf, groupIndex, blockIndex);
      pCur->tsIndex = pTSBuf->block.numOfElem - 1;
    }

    if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty
      tsBufResetPos(pTSBuf);
      return false;
    }
    return true;
  }

  int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1;

  ASSERT(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

  bool atBlockEdge = (pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
                     (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0);
  if (!atBlockEdge) {
    pCur->tsIndex += step;
    return true;
  }

  int32_t            id = pTSBuf->pData[pCur->vgroupIndex].info.id;
  STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id);
  if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) ||
      (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
    // last block of this group in traverse order: move to the adjacent group
    if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && pCur->order == TSDB_ORDER_ASC) ||
        (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) {
      pCur->vgroupIndex = -1;
      return false;
    }

    if (pBlockInfo == NULL) {
      return false;
    }

    // Start at the target group's first block (ASC) or at that group's own last block
    // (DESC). The current group's block count does not apply to the target group.
    int32_t nextGroup = pCur->vgroupIndex + step;
    int32_t blockIndex =
        (pCur->order == TSDB_ORDER_ASC) ? 0 : (pTSBuf->pData[nextGroup].info.numOfBlocks - 1);
    tsBufGetBlock(pTSBuf, nextGroup, blockIndex);
  } else {
    tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step);
  }

  return true;
}

/**
 * Rewind the traversal cursor of pTSBuf to the "not positioned" state
 * (all indices -1) while keeping the current traversal order.
 * Every other cursor field is zeroed, because the cursor is reassigned as a whole.
 * A NULL buffer is ignored.
 */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    STSCursor rewound = {.vgroupIndex = -1, .blockIndex = -1, .tsIndex = -1, .order = pTSBuf->cur.order};
    pTSBuf->cur = rewound;
  }
}

/**
 * Return the element the traversal cursor of pTSBuf currently points at.
 *
 * The returned `tag` points at pTSBuf->block.tag (it is not a copy), so its
 * content follows whatever block is currently loaded into pTSBuf.
 *
 * @return the current element, or an element with id == -1 when pTSBuf is NULL
 *         or the cursor is not positioned on a valid timestamp slot.
 */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem elem1 = {.id = -1};
  if (pTSBuf == NULL) {
    return elem1;
  }

  // pCur is the address of a struct member and can never be NULL; validate the indices instead.
  STSCursor* pCur = &pTSBuf->cur;
  if (pCur->vgroupIndex < 0) {
    return elem1;
  }

  // Reject tsIndex values outside the decoded timestamp buffer (e.g. -1 for an empty block in
  // descending order); reading them would access memory outside tsData.rawBuf.
  int64_t numOfKeys = (int64_t)pTSBuf->tsData.len / (int64_t)TSDB_KEYSIZE;
  if (pCur->tsIndex < 0 || (int64_t)pCur->tsIndex >= numOfKeys) {
    return elem1;
  }

  STSBlock* pBlock = &pTSBuf->block;

  elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id;
  elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
  elem1.tag = &pBlock->tag;

  return elem1;
}

/**
 * Merge the ts comp data of pSrcBuf into pDestBuf.
 *
 * Only a source holding exactly one group is supported (asserted). If that group
 * has the same id as the last group of pDestBuf, its blocks are folded into that
 * group. Otherwise (including when pDestBuf has no group yet) it is appended as a
 * new group. The source file's data area is appended to the destination file.
 *
 * Failure behaviour:
 * - If the group-info allocation or the file copy fails, the in-memory metadata of
 *   pDestBuf is left unchanged. The destination file may still contain partially
 *   copied bytes past fileSize.
 * - If reopening the destination file fails, pDestBuf->pFile is left NULL.
 *
 * @param pDestBuf  destination buffer; its in-memory data is flushed to disk first
 * @param pSrcBuf   source buffer (only read)
 * @return 0 on success or when there is nothing to merge, -1 on failure
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) {
    return 0;
  }

  if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) {
    return -1;
  }

  // src can only have one vnode index
  ASSERT(pSrcBuf->numOfGroups == 1);

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // Compare with the last group id of the destination. An empty destination has no last
  // group, so the source always becomes a new group there.
  int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id;
  bool    appendNewGroup = (pDestBuf->numOfGroups == 0) || (id != tsBufGetLastGroupInfo(pDestBuf)->info.id);

  int32_t oldNumOfGroups = pDestBuf->numOfGroups;
  int32_t newNumOfGroups = oldNumOfGroups + pSrcBuf->numOfGroups;

  // Reserve room for the new group info before touching the file, so an allocation failure leaves
  // pDestBuf untouched. numOfAlloc is only raised once the realloc has actually succeeded.
  if (appendNewGroup && pDestBuf->numOfAlloc < newNumOfGroups) {
    STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newNumOfGroups);
    if (tmp == NULL) {
      return -1;
    }

    pDestBuf->pData = tmp;
    pDestBuf->numOfAlloc = newNumOfGroups;
  }

  // Append the source data area (everything after the header/group-info region) to the end of the
  // destination file. A successful taosLSeekFile may return the new offset (lseek semantics), so only
  // a negative value is treated as failure.
  if (taosLSeekFile(pDestBuf->pFile, 0, SEEK_END) < 0) {
    return -1;
  }

  int64_t offset = getDataStartOffset();
  int64_t size = (int64_t)pSrcBuf->fileSize - offset;
  if (size < 0) {
    return -1;
  }

  int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size);
  if (written != size) {  // also covers written == -1
    return -1;
  }

  // The data is on disk now: update the in-memory group metadata. pDestBuf->fileSize still holds the
  // pre-append size here, which is exactly where the copied data starts.
  if (appendNewGroup) {
    // directly copy the group index information
    memcpy(&pDestBuf->pData[oldNumOfGroups], pSrcBuf->pData,
           (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx));

    // rebase the copied offsets onto the destination file
    for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) {
      STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldNumOfGroups];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.id = id;
    }

    pDestBuf->numOfGroups = newNumOfGroups;
  } else {
    // Same group id: the source blocks were appended right after the destination's last group.
    STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.id = id;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  int64_t oldFileSize = pDestBuf->fileSize;

  // file meta data may be cached, close and reopen the file for accurate file size.
  taosCloseFile(&pDestBuf->pFile);
  pDestBuf->pFile = taosOpenFile(pDestBuf->path, TD_FILE_WRITE | TD_FILE_READ);
  if (pDestBuf->pFile == NULL) {
    return -1;
  }

  int64_t file_size;
  if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) {
    return -1;
  }

  pDestBuf->fileSize = (uint32_t)file_size;
  ASSERT(pDestBuf->fileSize == oldFileSize + size);

  return 0;
}

STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) {
  STSBuf* pTSBuf = tsBufCreate(true, order);
S
Shengliang Guan 已提交
894

895 896 897 898 899
  STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info);
  pBlockInfo->numOfBlocks = numOfBlocks;
  pBlockInfo->compLen = len;
  pBlockInfo->offset = getDataStartOffset();
  pBlockInfo->id = id;
S
Shengliang Guan 已提交
900

901 902
  // update prev vnode length info in file
  TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo);
S
Shengliang Guan 已提交
903

904
  int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET);
905
  if (ret == -1) {
S
Shengliang Guan 已提交
906
    //    qError("fseek failed, errno:%d", errno);
907 908 909
    tsBufDestroy(pTSBuf);
    return NULL;
  }
910
  size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len);
911
  if (sz != len) {
S
Shengliang Guan 已提交
912
    //    qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
913 914 915 916
    tsBufDestroy(pTSBuf);
    return NULL;
  }
  pTSBuf->fileSize += len;
S
Shengliang Guan 已提交
917

918
  pTSBuf->tsOrder = order;
G
Ganlin Zhao 已提交
919 920 921 922
  if (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }
S
Shengliang Guan 已提交
923

924 925 926 927 928 929 930 931
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder};
  if (STSBufUpdateHeader(pTSBuf, &header) < 0) {
    tsBufDestroy(pTSBuf);
    return NULL;
  }

  // TODO taosFsync??
S
Shengliang Guan 已提交
932 933 934 935 936 937
  //  if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
  ////    qError("fsync failed, errno:%d", errno);
  //    tsBufDestroy(pTSBuf);
  //    return NULL;
  //  }

938 939 940 941 942
  return pTSBuf;
}

/**
 * Position the cursor of pTSBuf on the block of group `id` whose tag matches
 * `tag`, load that block, and return the element the cursor then points at.
 *
 * @return the element from tsBufGetElem(), or an element with id == -1 when
 *         pTSBuf is NULL, the group does not exist, or no block matches the tag.
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) {
  STSElem notFound = {.id = -1};
  if (pTSBuf == NULL) {
    return notFound;
  }

  int32_t groupIdx = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
  if (groupIdx == -1) {
    return notFound;
  }

  int32_t blockIdx = tsBufFindBlockByTag(pTSBuf, &pTSBuf->pData[groupIdx].info, tag);
  if (blockIdx < 0) {
    return notFound;
  }

  // The cursor indices are assigned before tsBufGetBlock() runs; keep this order.
  // NOTE(review): tsBufGetBlock presumably inspects the cursor — confirm before reordering.
  pTSBuf->cur.vgroupIndex = groupIdx;
  pTSBuf->cur.blockIndex = blockIdx;
  tsBufGetBlock(pTSBuf, groupIdx, blockIdx);

  return tsBufGetElem(pTSBuf);
}

/**
 * Return a copy of the traversal cursor of pTSBuf. A NULL buffer yields a
 * cursor whose vgroupIndex is -1 and whose other fields are zero.
 */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    return pTSBuf->cur;
  }

  STSCursor unpositioned = {.vgroupIndex = -1};
  return unpositioned;
}

/**
 * Restore the traversal cursor of pTSBuf from *pCur.
 * When the saved cursor points at a group (vgroupIndex != -1), the referenced
 * block is loaded first, and then the cursor is overwritten with *pCur.
 * Does nothing if either argument is NULL.
 */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf != NULL && pCur != NULL) {
    bool positioned = (pCur->vgroupIndex != -1);
    if (positioned) {
      tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex);
    }

    pTSBuf->cur = *pCur;
  }
}

/** Set the traversal order stored in the cursor of pTSBuf; a NULL buffer is ignored. */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf != NULL) {
    pTSBuf->cur.order = order;
  }
}

/**
 * Create a new STSBuf from the file backing pTSBuf.
 *
 * pTSBuf is flushed first, so data still held in memory is written to its file
 * before the file is re-opened. The clone is created with a `false` second
 * argument to tsBufCreateFromFile (presumably "do not auto-delete the shared
 * file" — confirm).
 *
 * @return the clone, or NULL if pTSBuf is NULL or tsBufCreateFromFile fails.
 */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  STSBuf* pCopy = NULL;

  if (pTSBuf != NULL) {
    tsBufFlush(pTSBuf);
    pCopy = tsBufCreateFromFile(pTSBuf->path, false);
  }

  return pCopy;
}

/**
 * Debug helper: print every element of pTSBuf to stdout in ascending order as
 * "<group id>-<tag>-<ts>". Only elements whose tag is of type BIGINT are printed.
 *
 * The caller's traversal order is restored afterwards. The cursor position is
 * not restored: it is left wherever the final tsBufNextPos() call put it.
 * A NULL buffer is ignored.
 */
void tsBufDisplay(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return;
  }

  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfGroups);

  // temporarily force ascending traversal
  int32_t old = pTSBuf->cur.order;
  pTSBuf->cur.order = TSDB_ORDER_ASC;

  tsBufResetPos(pTSBuf);

  while (tsBufNextPos(pTSBuf)) {
    STSElem elem = tsBufGetElem(pTSBuf);
    // an invalid element carries no tag; guard before dereferencing
    if (elem.tag != NULL && elem.tag->nType == TSDB_DATA_TYPE_BIGINT) {
      printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts);
    }
  }

  pTSBuf->cur.order = old;
  printf("-------end of ts comp file-------\n");
}

/**
 * Byte offset in a ts comp file at which block data begins. The data follows the
 * file header and a fixed table of TS_COMP_FILE_GROUP_MAX group-info slots.
 *
 * Declared with an explicit (void) parameter list: an empty `()` is an obsolescent
 * old-style declarator in C before C23. The definition stays compatible with the
 * earlier `()` prototype.
 */
static int32_t getDataStartOffset(void) {
  return (int32_t)(sizeof(STSBufFileHeader) + TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo));
}

/*
 * Update the on-disk group-info slot number `index` with *pBlockInfo.
 * The slots follow the file header, one STSGroupBlockInfo each.
 * Any status returned by doUpdateGroupInfo is not propagated (this function is void).
 */
static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) {
  size_t slotPos = sizeof(STSBufFileHeader) + (size_t)index * sizeof(STSGroupBlockInfo);
  doUpdateGroupInfo(pTSBuf, (int32_t)slotPos, pBlockInfo);
}

/**
 * Allocate the in-memory working buffers of a freshly created STSBuf:
 * - the group-info array (4 initial entries);
 * - the raw timestamp buffer;
 * - the assist buffer;
 * - the block payload buffer.
 * It then advances fileSize past the header/group-info region
 * (getDataStartOffset()).
 *
 * On any allocation failure pTSBuf is released with tsBufDestroy() and NULL is
 * returned; the caller must not use pTSBuf afterwards.
 *
 * @return pTSBuf on success, NULL on failure.
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  enum { INITIAL_GROUPINFO_SIZE = 4 };

  pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE;
  pTSBuf->pData = taosMemoryCalloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    goto _err;
  }

  pTSBuf->tsData.rawBuf = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    goto _err;
  }

  pTSBuf->bufSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    goto _err;
  }

  pTSBuf->block.payload = taosMemoryMalloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    goto _err;
  }

  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;

_err:
  tsBufDestroy(pTSBuf);
  return NULL;
}

/** Number of groups held by pTSBuf; 0 for a NULL buffer. */
int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) {
  return (pTSBuf == NULL) ? 0 : pTSBuf->numOfGroups;
}

/**
 * Collect the ids of all groups in pTSBuf, in group order, into a new array.
 *
 * @param pTSBuf  buffer to inspect; NULL is treated as having no groups
 * @param num     if not NULL, receives the number of ids stored in *id
 *                (set to 0 when the allocation fails)
 * @param id      if not NULL, receives the array, or NULL when there are no groups
 *                or the allocation fails. The array is allocated with
 *                taosMemoryMalloc, presumably to be released with taosMemoryFree.
 */
void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
  int32_t size = tsBufGetNumOfGroup(pTSBuf);
  if (num != NULL) {
    *num = size;
  }

  if (id == NULL) {
    return;
  }

  *id = NULL;
  if (size <= 0) {
    return;
  }

  // compute the byte count in size_t to avoid int32 overflow
  int32_t* pIdList = taosMemoryMalloc((size_t)size * sizeof(int32_t));
  if (pIdList == NULL) {
    // report an empty list instead of writing through a NULL pointer
    if (num != NULL) {
      *num = 0;
    }
    return;
  }

  for (int32_t i = 0; i < size; ++i) {
    pIdList[i] = pTSBuf->pData[i].info.id;
  }

  *id = pIdList;
}

/**
 * Read the raw (still compressed) data of group `groupIndex` from the file of
 * pTSBuf into buf.
 *
 * @param pTSBuf       source buffer
 * @param groupIndex   index into pTSBuf->pData; must be in [0, numOfGroups) (asserted)
 * @param buf          destination; must hold at least the group's compLen bytes
 * @param len          out: number of bytes read (0 on failure)
 * @param numOfBlocks  out: number of blocks in the group (0 on failure)
 * @return TSDB_CODE_SUCCESS, or a TAOS_SYSTEM_ERROR code on seek/read failure
 */
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
  ASSERT(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
  STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info;

  *len = 0;
  *numOfBlocks = 0;

  // taosLSeekFile signals failure with a negative value. A successful call may return the new
  // offset (lseek semantics, as the `== -1` check elsewhere in this file assumes), so comparing
  // with 0 would wrongly reject every seek to a non-zero offset.
  if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) < 0) {
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    return code;
  }

  // keep the signed 64-bit result so a -1 error is not reinterpreted as a huge size_t
  int64_t readLen = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen);
  if (readLen != pBlockInfo->compLen) {
    int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile));
    return code;
  }

  *len = pBlockInfo->compLen;
  *numOfBlocks = pBlockInfo->numOfBlocks;

  return TSDB_CODE_SUCCESS;
}

/**
 * Search the groups of pTSBuf in index order for a block whose tag matches pTag.
 *
 * On the first match, the element returned by tsBufGetElemStartPos() is returned
 * (that call also positions the cursor there). Otherwise the result of the last
 * unsuccessful lookup is returned, which normally has id == -1. A NULL buffer
 * yields an element with id == -1.
 */
STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) {
  STSElem el = {.id = -1};
  if (pTSBuf == NULL) {
    return el;
  }

  for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
    el = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[i].info.id, pTag);
    if (el.id == pTSBuf->pData[i].info.id) {
      return el;
    }
  }

  return el;
}

S
Shengliang Guan 已提交
1141
bool tsBufIsValidElem(STSElem* pElem) { return pElem->id >= 0; }