index_tfile.c 14.9 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
//#include <sys/types.h>
//#include <dirent.h>
dengyihao's avatar
dengyihao 已提交
18
#include "index_tfile.h"
19
#include "index.h"
dengyihao's avatar
dengyihao 已提交
20
#include "index_fst.h"
21
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
22
#include "index_util.h"
dengyihao's avatar
dengyihao 已提交
23
#include "taosdef.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27
// Serialized size of one table-id list: a count field with the type of `sz`
// followed by `sz` 64-bit ids. The argument is parenthesized so that
// expressions such as `n + 1` expand with the intended precedence.
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + (sz) * sizeof(uint64_t))

dengyihao's avatar
dengyihao 已提交
28
static int  tfileStrCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
29 30
static int  tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
dengyihao's avatar
dengyihao 已提交
31

dengyihao's avatar
dengyihao 已提交
32
static int tfileWriteHeader(TFileWriter* writer);
dengyihao's avatar
dengyihao 已提交
33
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
dengyihao's avatar
dengyihao 已提交
34
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
dengyihao's avatar
dengyihao 已提交
35

dengyihao's avatar
dengyihao 已提交
36 37 38 39 40
static int  tfileReadLoadHeader(TFileReader* reader);
static int  tfileReadLoadFst(TFileReader* reader);
static int  tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static void tfileReadRef(TFileReader* reader);
static void tfileReadUnRef(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42
static int  tfileGetFileList(const char* path, SArray* result);
dengyihao's avatar
dengyihao 已提交
43
static int  tfileRmExpireFile(SArray* result);
dengyihao's avatar
dengyihao 已提交
44 45 46
static void tfileDestroyFileName(void* elem);
static int  tfileCompare(const void* a, const void* b);
static int  tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
dengyihao's avatar
dengyihao 已提交
47
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
dengyihao's avatar
dengyihao 已提交
48
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
49

dengyihao's avatar
dengyihao 已提交
50 51
TFileCache* tfileCacheCreate(const char* path) {
  TFileCache* tcache = calloc(1, sizeof(TFileCache));
dengyihao's avatar
dengyihao 已提交
52
  if (tcache == NULL) { return NULL; }
53 54 55 56

  tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  tcache->capacity = 64;

dengyihao's avatar
dengyihao 已提交
57
  SArray* files = taosArrayInit(4, sizeof(void*));
58
  tfileGetFileList(path, files);
dengyihao's avatar
dengyihao 已提交
59
  taosArraySort(files, tfileCompare);
dengyihao's avatar
dengyihao 已提交
60 61
  tfileRmExpireFile(files);

dengyihao's avatar
dengyihao 已提交
62 63
  uint64_t suid;
  int32_t  colId, version;
dengyihao's avatar
dengyihao 已提交
64
  for (size_t i = 0; i < taosArrayGetSize(files); i++) {
dengyihao's avatar
dengyihao 已提交
65 66
    char* file = taosArrayGetP(files, i);
    if (0 != tfileParseFileName(file, &suid, (int*)&colId, (int*)&version)) {
dengyihao's avatar
dengyihao 已提交
67
      indexInfo("try parse invalid file:  %s, skip it", file);
68 69
      continue;
    }
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71
    WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64);
72
    if (wc == NULL) {
dengyihao's avatar
dengyihao 已提交
73
      indexError("failed to open index:  %s", file);
74
      goto End;
dengyihao's avatar
dengyihao 已提交
75
    }
dengyihao's avatar
dengyihao 已提交
76

dengyihao's avatar
dengyihao 已提交
77
    TFileReader* reader = tfileReaderCreate(wc);
78
    if (0 != tfileReadLoadHeader(reader)) {
dengyihao's avatar
dengyihao 已提交
79
      tfileReaderDestroy(reader);
dengyihao's avatar
dengyihao 已提交
80
      indexError("failed to load index header, index file: %s", file);
81
      goto End;
dengyihao's avatar
dengyihao 已提交
82
    }
dengyihao's avatar
dengyihao 已提交
83

dengyihao's avatar
dengyihao 已提交
84 85 86
    if (0 != tfileReadLoadFst(reader)) {
      tfileReaderDestroy(reader);
      indexError("failed to load index fst, index file: %s", file);
dengyihao's avatar
dengyihao 已提交
87
      goto End;
dengyihao's avatar
dengyihao 已提交
88
    }
dengyihao's avatar
dengyihao 已提交
89
    tfileReadRef(reader);
dengyihao's avatar
dengyihao 已提交
90
    // loader fst and validate it
dengyihao's avatar
dengyihao 已提交
91
    TFileHeader*  header = &reader->header;
dengyihao's avatar
dengyihao 已提交
92
    TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94 95
    char buf[128] = {0};
    tfileSerialCacheKey(&key, buf);
dengyihao's avatar
dengyihao 已提交
96
    taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
97
  }
dengyihao's avatar
dengyihao 已提交
98
  taosArrayDestroyEx(files, tfileDestroyFileName);
dengyihao's avatar
dengyihao 已提交
99
  return tcache;
dengyihao's avatar
dengyihao 已提交
100
End:
101
  tfileCacheDestroy(tcache);
dengyihao's avatar
dengyihao 已提交
102
  taosArrayDestroyEx(files, tfileDestroyFileName);
103
  return NULL;
dengyihao's avatar
dengyihao 已提交
104
}
dengyihao's avatar
dengyihao 已提交
105
// Releases the cache: drops the cache's reference on every stored reader,
// then tears down the hash table and the cache object itself.
// A NULL cache is ignored.
void tfileCacheDestroy(TFileCache* tcache) {
  if (tcache == NULL) { return; }

  // each hash slot stores a TFileReader*, so the iterator is a pointer to that slot
  for (TFileReader** slot = taosHashIterate(tcache->tableCache, NULL); slot != NULL;
       slot = taosHashIterate(tcache->tableCache, slot)) {
    TFileReader* cached = *slot;
    indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", cached->header.suid, cached->header.colName, cached->header.colType);

    tfileReadUnRef(cached);
  }

  taosHashCleanup(tcache->tableCache);
  free(tcache);
}

dengyihao's avatar
dengyihao 已提交
121
// Looks up the reader cached for `key`.
//
// On a hit the reader's reference count is incremented and the reader is
// returned. On a miss NULL is returned and nothing is referenced.
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
  char buf[128] = {0};
  tfileSerialCacheKey(key, buf);

  // the table stores TFileReader* values (see tfileCachePut), so the lookup
  // yields a pointer to the stored pointer, or NULL when the key is absent
  TFileReader** slot = taosHashGet(tcache->tableCache, buf, strlen(buf));
  if (slot == NULL || *slot == NULL) { return NULL; }

  TFileReader* reader = *slot;
  tfileReadRef(reader);

  return reader;
}
dengyihao's avatar
dengyihao 已提交
130
// Stores `reader` in the cache under `key`.
//
// If a reader is already cached under the same key it is removed from the
// table and the cache's reference on it is dropped. The new reader gets one
// reference on behalf of the cache.
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
  char buf[128] = {0};
  tfileSerialCacheKey(key, buf);

  // remove last version index reader; the lookup returns NULL when the key
  // has never been stored, so the slot itself must be checked before use
  TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf));
  if (p != NULL && *p != NULL) {
    TFileReader* oldReader = *p;
    taosHashRemove(tcache->tableCache, buf, strlen(buf));
    tfileReadUnRef(oldReader);
  }

  tfileReadRef(reader);
  taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
  return;
}
dengyihao's avatar
dengyihao 已提交
145

dengyihao's avatar
dengyihao 已提交
146 147
// Allocates a zero-initialized reader bound to `ctx`.
// No reference is taken here; callers that keep the reader call tfileReadRef.
// Returns NULL when the allocation fails (ctx is left untouched in that case).
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
  TFileReader* created = calloc(1, sizeof(TFileReader));
  if (created != NULL) {
    created->ctx = ctx;
  }
  return created;
}
dengyihao's avatar
dengyihao 已提交
154
// Frees a reader together with the fst and the context it holds.
// A NULL reader is ignored.
void tfileReaderDestroy(TFileReader* reader) {
  if (reader != NULL) {
    fstDestroy(reader->fst);
    writerCtxDestroy(reader->ctx);
    free(reader);
  }
}

dengyihao's avatar
dengyihao 已提交
162
// Searches the fst of `reader` for the term carried by `query` and appends the
// matching table ids to `result`.
//
// Only QUERY_TERM is implemented; the other query types currently report -1.
// One reference on `reader` is released before returning.
// Returns the result of loading the table ids on a hit, and -1 otherwise
// (term not found, unsupported query type, or NULL reader).
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
  // a cache miss hands us a NULL reader: nothing to search and nothing to unref
  if (reader == NULL) { return -1; }

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  int ret = -1;
  // refactor to callback later
  if (qtype == QUERY_TERM) {
    uint64_t offset = 0;
    FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
    if (fstGet(reader->fst, &key, &offset)) {
      indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
      // the fst value is the file offset of the serialized table-id list
      ret = tfileReadLoadTableIds(reader, offset, result);
    } else {
      indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
    }
    fstSliceDestroy(&key);
  } else if (qtype == QUERY_PREFIX) {
    // handle later
  } else {
    // handle later
  }
  tfileReadUnRef(reader);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
188
// Creates a writer on `ctx`, writes the fixed part of `header` to it and
// prepares an fst builder on the same context.
//
// Returns NULL on failure. If the allocation fails `ctx` is left untouched;
// if writing the header or creating the fst builder fails the writer is torn
// down with tfileWriterDestroy, which also destroys `ctx`.
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
  TFileWriter* tw = calloc(1, sizeof(TFileWriter));
  if (tw == NULL) {
    indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
    return NULL;
  }
  tw->ctx = ctx;
  tw->header = *header;

  // a file without a complete header is unreadable, so do not go on silently
  if (0 != tfileWriteHeader(tw)) {
    indexError("index: %" PRIu64 " failed to write header info", header->suid);
    tfileWriterDestroy(tw);
    return NULL;
  }

  tw->fb = fstBuilderCreate(ctx, 0);
  if (tw->fb == NULL) {
    tfileWriterDestroy(tw);
    return NULL;
  }
  return tw;
}
dengyihao's avatar
dengyihao 已提交
215

dengyihao's avatar
dengyihao 已提交
216
// Writes the whole index body: `data` is an SArray of TFileValue* entries.
//
// Steps:
//   1. sort the values by colVal with the comparator matching the column type;
//   2. write the fst offset (the position right after all table-id lists);
//   3. serialize every value's table-id list, batching writes through a
//      buffer, and record each list's offset in the value;
//   4. insert every (colVal, offset) pair into the fst and finish the builder.
//
// Returns 0 on success and -1 when a buffer cannot be allocated or the fst
// offset cannot be written. On those early failures tw->fb is left untouched.
int tfileWriterPut(TFileWriter* tw, void* data) {
  SArray* values = (SArray*)data;

  // sort by coltype and write to tindex
  __compar_fn_t fn;
  int8_t        colType = tw->header.colType;
  if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
    fn = tfileStrCompare;
  } else {
    fn = getComparFunc(colType, 0);
  }
  taosArraySortPWithExt(values, tfileValueCompare, &fn);

  int32_t sz = taosArrayGetSize(values);

  // the fst is placed right behind all serialized table-id lists
  int32_t fstOffset = tw->offset;
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    int32_t     tbsz = taosArrayGetSize(v->tableId);
    fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
  }
  if (0 != tfileWriteFstOffset(tw, fstOffset)) {
    indexError("index: %" PRIu64 " failed to write fst offset", tw->header.suid);
    return -1;
  }

  int32_t bufLimit = 4096, offset = 0;
  char*   buf = calloc(1, sizeof(char) * bufLimit);
  if (buf == NULL) { return -1; }
  char* p = buf;

  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);

    int32_t tbsz = taosArrayGetSize(v->tableId);
    // check buf has enough space or not
    int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
    if (offset + ttsz > bufLimit) {
      // batch write what has been collected so far
      if (offset > 0) { tw->ctx->write(tw->ctx, buf, offset); }
      offset = 0;
      if (ttsz > bufLimit) {
        // a single id list bigger than the buffer would overflow it: grow first
        char* grown = realloc(buf, ttsz);
        if (grown == NULL) {
          free(buf);
          return -1;
        }
        buf = grown;
        bufLimit = ttsz;
      }
      memset(buf, 0, bufLimit);
      p = buf;
    }

    tfileSerialTableIdsToBuf(p, v->tableId);
    offset += ttsz;
    p = buf + offset;
    // set up value offset
    v->offset = tw->offset;
    tw->offset += ttsz;
  }
  if (offset != 0) {
    // write the data still pending in buf to tindex
    tw->ctx->write(tw->ctx, buf, offset);
  }
  free(buf);

  // write fst
  for (int32_t i = 0; i < sz; i++) {
    // TODO, fst batch write later
    TFileValue* v = taosArrayGetP(values, i);
    if (tfileWriteData(tw, v) != 0) {
      // keep going with the remaining values, but do not hide the failure
      indexError("index: %" PRIu64 " failed to write data to fst", tw->header.suid);
    }
  }
  fstBuilderFinish(tw->fb);
  fstBuilderDestroy(tw->fb);
  tw->fb = NULL;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
283
// Destroys the writer's context and frees the writer. A NULL writer is ignored.
void tfileWriterDestroy(TFileWriter* tw) {
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx);
    free(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
289

dengyihao's avatar
dengyihao 已提交
290 291
IndexTFile* indexTFileCreate(const char* path) {
  IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
dengyihao's avatar
dengyihao 已提交
292
  if (tfile == NULL) { return NULL; }
293

dengyihao's avatar
dengyihao 已提交
294
  tfile->cache = tfileCacheCreate(path);
dengyihao's avatar
dengyihao 已提交
295 296
  return tfile;
}
dengyihao's avatar
dengyihao 已提交
297 298 299
// Releases the tfile index object together with its reader cache.
// A NULL object is ignored.
void IndexTFileDestroy(IndexTFile* tfile) {
  if (tfile == NULL) { return; }

  // the cache is created by indexTFileCreate and owned by this object
  tfileCacheDestroy(tfile->cache);
  free(tfile);
}
dengyihao's avatar
dengyihao 已提交
300

dengyihao's avatar
dengyihao 已提交
301
// Searches the cached index reader that matches the term of `query` and
// appends the matching table ids to `result`.
//
// Returns -1 when `tfile`, its cache, or `query` is missing, when no reader is
// cached for the term's (suid, colType, colName), or when the search itself
// reports a miss; otherwise returns the result of tfileReaderSearch.
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
  int ret = -1;
  if (tfile == NULL || query == NULL) { return ret; }

  IndexTFile* pTfile = (IndexTFile*)tfile;
  if (pTfile->cache == NULL) { return ret; }

  SIndexTerm*   term = query->term;
  TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
  TFileReader*  reader = tfileCacheGet(pTfile->cache, &key);
  // no index file has been loaded for this column
  if (reader == NULL) { return ret; }

  return tfileReaderSearch(reader, query, result);
}
dengyihao's avatar
dengyihao 已提交
312
// Placeholder: inserting a single term into a tfile is not implemented yet.
// All arguments are ignored and 0 is returned.
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
  (void)tfile;
  (void)term;
  (void)uid;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
318

dengyihao's avatar
dengyihao 已提交
319 320 321 322 323 324
// Compares two NUL-terminated strings and normalizes the result to -1, 0 or 1.
static int tfileStrCompare(const void* a, const void* b) {
  const int diff = strcmp((char*)a, (char*)b);
  if (diff < 0) { return -1; }
  if (diff > 0) { return 1; }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
325 326 327 328 329 330 331 332
static int tfileValueCompare(const void* a, const void* b, const void* param) {
  __compar_fn_t fn = *(__compar_fn_t*)param;

  TFileValue* av = (TFileValue*)a;
  TFileValue* bv = (TFileValue*)b;

  return fn(av->colVal, bv->colVal);
}
dengyihao's avatar
dengyihao 已提交
333 334 335 336 337
// Serializes `ids` into `buf`: first the element count as int32_t, then every
// id as uint64_t. The caller provides a buffer large enough for all of it
// (see TF_TABLE_TATOAL_SIZE).
// NOTE(review): relies on SERIALIZE_VAR_TO_BUF advancing `buf` after each
// write - confirm against the macro definition.
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
  int count = taosArrayGetSize(ids);
  SERIALIZE_VAR_TO_BUF(buf, count, int32_t);

  for (int idx = 0; idx < count; idx++) {
    uint64_t* id = taosArrayGet(ids, idx);
    SERIALIZE_VAR_TO_BUF(buf, *id, uint64_t);
  }
}

// Writes the fst offset field that follows the fixed header.
//
// `offset` is the position the fst would have without this field; the size of
// the field itself is added before it is stored in the header and written.
// On success the writer's offset is advanced past the field, so that the
// table-id offsets recorded afterwards are real file positions.
// Returns 0 on success, -1 when the field cannot be written completely.
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
  int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
  tw->header.fstOffset = fstOffset;

  int nwrite = tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset));
  if (nwrite != (int)sizeof(fstOffset)) { return -1; }

  // the field occupies file space too; without this every table-id offset
  // would point sizeof(fstOffset) bytes before its data
  tw->offset += sizeof(fstOffset);
  return 0;
}
// Writes the first TFILE_HEADER_NO_FST bytes of the writer's header to the
// context and records the number of bytes written as the writer's offset.
// Returns 0 on success, -1 when the write is incomplete.
static int tfileWriteHeader(TFileWriter* writer) {
  char image[TFILE_HEADER_NO_FST] = {0};
  memcpy(image, (char*)&writer->header, sizeof(image));

  int written = writer->ctx->write(writer->ctx, image, sizeof(image));
  if (sizeof(image) != written) { return -1; }

  writer->offset = written;
  return 0;
}
// Inserts one value into the fst builder, keyed by its colVal and mapped to
// the offset of its table-id list.
// Only BINARY and NCHAR columns are handled; other types are accepted and
// ignored for now. Returns 0 on success (or ignored type), -1 when the fst
// insert fails.
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
  uint8_t colType = write->header.colType;
  if (colType != TSDB_DATA_TYPE_BINARY && colType != TSDB_DATA_TYPE_NCHAR) {
    // handle other type later
    return 0;
  }

  FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
  int      code = fstBuilderInsert(write->fb, key, tval->offset) ? 0 : -1;
  fstSliceDestroy(&key);
  return code;
}
// Reads TFILE_HEADER_SIZE bytes from the reader's context into reader->header.
// Returns 0 on success and -1 when fewer bytes than a full header could be
// read, e.g. for a truncated or foreign file.
static int tfileReadLoadHeader(TFileReader* reader) {
  // TODO simple tfile header later
  char buf[TFILE_HEADER_SIZE] = {0};

  int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf));
  // report a short read to the caller instead of relying on assert, which is
  // compiled out in release builds
  if (nread != (int64_t)sizeof(buf)) { return -1; }

  memcpy(&reader->header, buf, sizeof(buf));
  return 0;
}
dengyihao's avatar
dengyihao 已提交
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
// Loads the fst stored at header.fstOffset into memory and attaches it to the
// reader. The fst is expected to be smaller than FST_MAX_SIZE.
// Returns 0 when the fst was created, -1 on allocation failure, read failure,
// an fst that does not fit the buffer, or a failed fstCreate.
static int tfileReadLoadFst(TFileReader* reader) {
  // current load fst into memory, refactor it later
  static int FST_MAX_SIZE = 16 * 1024;

  char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
  if (buf == NULL) { return -1; }

  WriterCtx* ctx = reader->ctx;
  int32_t    nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
  // we assume fst size less than FST_MAX_SIZE; a full buffer may mean truncation
  if (nread <= 0 || nread >= FST_MAX_SIZE) {
    free(buf);
    return -1;
  }

  FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
  reader->fst = fstCreate(&st);
  fstSliceDestroy(&st);
  free(buf);

  // success means an fst object exists (the condition used to be inverted)
  return reader->fst != NULL ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
403 404 405
// Reads the table-id list stored at `offset` and appends every id to `result`.
//
// Layout at `offset`: an int32_t count followed by that many uint64_t ids.
// Returns 0 on success (including an empty list) and -1 on a short read, an
// invalid count, or allocation failure.
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
  int32_t    nid = 0;
  WriterCtx* ctx = reader->ctx;

  int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset);
  if (nread != (int32_t)sizeof(nid)) { return -1; }
  // reject corrupt counts before they are turned into an allocation size
  if (nid < 0 || nid > (int32_t)(INT32_MAX / sizeof(uint64_t))) { return -1; }
  if (nid == 0) { return 0; }

  int32_t total = sizeof(uint64_t) * nid;
  char*   buf = calloc(1, total);
  if (buf == NULL) { return -1; }

  // read the ids from their explicit position rather than from wherever the
  // context's cursor happens to be
  nread = ctx->readFrom(ctx, buf, total, offset + (int32_t)sizeof(nid));
  if (nread != total) {
    free(buf);
    return -1;
  }

  for (int32_t i = 0; i < nid; i++) {
    taosArrayPush(result, (uint64_t*)buf + i);
  }
  free(buf);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
423 424 425 426 427 428 429 430 431
// Takes one reference on the reader; the resulting count is not needed.
static void tfileReadRef(TFileReader* reader) {
  int refCount = T_REF_INC(reader);
  UNUSED(refCount);
}

// Drops one reference on the reader and destroys it once the count reaches 0.
static void tfileReadUnRef(TFileReader* reader) {
  int remaining = T_REF_DEC(reader);
  if (remaining != 0) { return; }

  tfileReaderDestroy(reader);
}
dengyihao's avatar
dengyihao 已提交
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446

// Collects the names of all entries of directory `path` into `result`.
//
// Every name is a heap-allocated, NUL-terminated copy of the bare entry name
// (without the directory), pushed as a char*; the caller frees them.
// Returns 0 on success, -1 when the directory cannot be opened or a name
// cannot be allocated (names pushed before the failure stay in `result`).
static int tfileGetFileList(const char* path, SArray* result) {
  DIR* dir = opendir(path);
  if (NULL == dir) { return -1; }

  int            code = 0;
  struct dirent* entry;
  while ((entry = readdir(dir)) != NULL) {
    size_t len = strlen(entry->d_name);
    char*  buf = calloc(1, len + 1);
    if (buf == NULL) {
      // out of memory: stop instead of copying into a NULL buffer
      code = -1;
      break;
    }
    memcpy(buf, entry->d_name, len);
    taosArrayPush(result, &buf);
  }
  closedir(dir);
  return code;
}
dengyihao's avatar
dengyihao 已提交
447 448 449 450
// Placeholder for removing expired tindex files after a restart.
// Currently leaves `result` untouched and returns 0.
static int tfileRmExpireFile(SArray* result) {
  // TODO(yihao): remove expire tindex after restart
  (void)result;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
451 452 453 454 455 456 457 458 459 460 461 462 463 464
// Array-element destructor: `elem` points to a stored char*, which is freed.
static void tfileDestroyFileName(void* elem) {
  char** slot = (char**)elem;
  free(*slot);
}
// Sort comparator for an array of file names (elements are char*).
// Compares the two names over the length of the longer one.
static int tfileCompare(const void* a, const void* b) {
  const char* lhs = *(char**)a;
  const char* rhs = *(char**)b;

  size_t lhsLen = strlen(lhs);
  size_t rhsLen = strlen(rhs);
  size_t span = lhsLen;
  if (rhsLen >= lhsLen) { span = rhsLen; }

  return strncmp(lhs, rhs, span);
}
// tfile name suid-colId-version.tindex
dengyihao's avatar
dengyihao 已提交
465 466 467 468
// Formats the index file name "<suid>-<colId>-<version>.tindex" into
// `filename`; the caller provides a buffer large enough for the result.
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) {
  sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version);
}
dengyihao's avatar
dengyihao 已提交
469 470 471 472 473 474 475 476 477 478 479 480 481 482
// Parses an index file name of the form "<suid>-<colId>-<version>.tindex".
//
// On success stores the three numbers through `suid`, `colId` and `version`
// and returns 0. Returns -1 when the name does not match the pattern,
// including a missing or different suffix or trailing characters.
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
  // %n is only reached when the literal ".tindex" matched, so `consumed` stays
  // 0 for names such as "1-2-3.tmp" even though three fields were converted
  int consumed = 0;
  if (3 != sscanf(filename, "%" SCNu64 "-%d-%d.tindex%n", suid, colId, version, &consumed)) { return -1; }
  if (consumed <= 0 || filename[consumed] != '\0') { return -1; }

  // read suid & colid & version success
  return 0;
}
// Serializes a cache key into `buf` as: suid, '_', colType, '_', colName
// (nColName bytes). Callers in this file pass a zeroed 128-byte buffer and
// use strlen(buf) as the key length.
// NOTE(review): presumably each SERIALIZE_* macro copies the raw bytes of the
// field and advances `buf` - confirm against the macro definitions. If suid is
// written in binary form, embedded zero bytes would shorten strlen(buf).
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
  SERIALIZE_MEM_TO_BUF(buf, key, suid);
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  SERIALIZE_MEM_TO_BUF(buf, key, colType);
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}