index_tfile.c 20.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
//#include <sys/types.h>
//#include <dirent.h>
dengyihao's avatar
dengyihao 已提交
18
#include "index_tfile.h"
19
#include "index.h"
dengyihao's avatar
dengyihao 已提交
20
#include "index_fst.h"
21
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
22
#include "index_util.h"
dengyihao's avatar
dengyihao 已提交
23
#include "taosdef.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27 28 29 30 31 32
// Per-iterator state used to walk the keys stored in a TFileReader's fst.
// Filled in by tfileFstIteratorCreate() and torn down by tfileIteratorDestroy().
typedef struct TFileFstIter {
  FstStreamBuilder* fb;   // stream builder returned by fstSearch() on the reader's fst
  StreamWithState*  st;   // stream obtained from fb; advanced by tfileIteratorNext()
  AutomationCtx*    ctx;  // automaton context created with AUTOMATION_ALWAYS
  TFileReader*      rdr;  // reader whose table ids are loaded for each key
} TFileFstIter;

dengyihao's avatar
dengyihao 已提交
33 34
// Serialized size in bytes of one table-id list: a count field as wide as `sz`
// itself, followed by `sz` uint64_t ids. The argument is parenthesized so that
// expression arguments such as (a + b) are multiplied as a whole.
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + (sz) * sizeof(uint64_t))

dengyihao's avatar
dengyihao 已提交
35
static int  tfileUidCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
36
static int  tfileStrCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
37 38
static int  tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
dengyihao's avatar
dengyihao 已提交
39

dengyihao's avatar
dengyihao 已提交
40
static int tfileWriteHeader(TFileWriter* writer);
dengyihao's avatar
dengyihao 已提交
41
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
dengyihao's avatar
dengyihao 已提交
42
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
dengyihao's avatar
dengyihao 已提交
43

dengyihao's avatar
dengyihao 已提交
44 45 46
static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
dengyihao's avatar
dengyihao 已提交
47

dengyihao's avatar
dengyihao 已提交
48
static int  tfileGetFileList(const char* path, SArray* result);
dengyihao's avatar
dengyihao 已提交
49
static int  tfileRmExpireFile(SArray* result);
dengyihao's avatar
dengyihao 已提交
50 51 52
static void tfileDestroyFileName(void* elem);
static int  tfileCompare(const void* a, const void* b);
static int  tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
dengyihao's avatar
dengyihao 已提交
53
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
dengyihao's avatar
dengyihao 已提交
54
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
55

dengyihao's avatar
dengyihao 已提交
56 57
TFileCache* tfileCacheCreate(const char* path) {
  TFileCache* tcache = calloc(1, sizeof(TFileCache));
dengyihao's avatar
dengyihao 已提交
58
  if (tcache == NULL) { return NULL; }
59 60 61 62

  tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  tcache->capacity = 64;

dengyihao's avatar
dengyihao 已提交
63
  SArray* files = taosArrayInit(4, sizeof(void*));
64
  tfileGetFileList(path, files);
dengyihao's avatar
dengyihao 已提交
65
  taosArraySort(files, tfileCompare);
dengyihao's avatar
dengyihao 已提交
66 67
  tfileRmExpireFile(files);

dengyihao's avatar
dengyihao 已提交
68 69
  uint64_t suid;
  int32_t  colId, version;
dengyihao's avatar
dengyihao 已提交
70
  for (size_t i = 0; i < taosArrayGetSize(files); i++) {
dengyihao's avatar
dengyihao 已提交
71 72
    char* file = taosArrayGetP(files, i);
    if (0 != tfileParseFileName(file, &suid, (int*)&colId, (int*)&version)) {
dengyihao's avatar
dengyihao 已提交
73
      indexInfo("try parse invalid file:  %s, skip it", file);
74 75
      continue;
    }
dengyihao's avatar
dengyihao 已提交
76

dengyihao's avatar
dengyihao 已提交
77
    WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
78
    if (wc == NULL) {
dengyihao's avatar
dengyihao 已提交
79
      indexError("failed to open index:%s", file);
80
      goto End;
dengyihao's avatar
dengyihao 已提交
81
    }
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83
    char          buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
84
    TFileReader*  reader = tfileReaderCreate(wc);
dengyihao's avatar
dengyihao 已提交
85
    TFileHeader*  header = &reader->header;
86 87 88 89
    TFileCacheKey key = {.suid = header->suid,
                         .colName = header->colName,
                         .nColName = strlen(header->colName),
                         .colType = header->colType};
dengyihao's avatar
dengyihao 已提交
90
    tfileSerialCacheKey(&key, buf);
dengyihao's avatar
dengyihao 已提交
91 92 93

    tfileReaderRef(reader);
    // indexTable
dengyihao's avatar
dengyihao 已提交
94
    taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
95
  }
dengyihao's avatar
dengyihao 已提交
96
  taosArrayDestroyEx(files, tfileDestroyFileName);
dengyihao's avatar
dengyihao 已提交
97
  return tcache;
dengyihao's avatar
dengyihao 已提交
98
End:
99
  tfileCacheDestroy(tcache);
dengyihao's avatar
dengyihao 已提交
100
  taosArrayDestroyEx(files, tfileDestroyFileName);
101
  return NULL;
dengyihao's avatar
dengyihao 已提交
102
}
dengyihao's avatar
dengyihao 已提交
103
// Release a cache built by tfileCacheCreate(): drop the cache's reference on
// every stored reader, then free the hash table and the cache itself.
// A NULL cache is ignored.
void tfileCacheDestroy(TFileCache* tcache) {
  if (tcache == NULL) { return; }

  for (TFileReader** slot = taosHashIterate(tcache->tableCache, NULL); slot != NULL;
       slot = taosHashIterate(tcache->tableCache, slot)) {
    TFileReader* rd = *slot;
    indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", rd->header.suid, rd->header.colName,
              rd->header.colType);

    tfileReaderUnRef(rd);
  }

  taosHashCleanup(tcache->tableCache);
  free(tcache);
}

dengyihao's avatar
dengyihao 已提交
120
// Look up the reader cached for `key`.
// On a hit the reader's reference count is incremented before it is returned;
// on a miss NULL is returned.
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
  char serialized[128] = {0};
  tfileSerialCacheKey(key, serialized);

  TFileReader** slot = taosHashGet(tcache->tableCache, serialized, strlen(serialized));
  if (slot != NULL) {
    tfileReaderRef(*slot);
    return *slot;
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
130
// Store `reader` in the cache under `key`, taking one reference on it.
// A reader already cached under the same key is removed from the table, flagged
// with `remove = true`, and the cache's reference on it is dropped.
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
  char serialized[128] = {0};
  tfileSerialCacheKey(key, serialized);

  // evict the previous version of this index reader, if any
  TFileReader** prev = taosHashGet(tcache->tableCache, serialized, strlen(serialized));
  if (prev != NULL) {
    TFileReader* stale = *prev;
    taosHashRemove(tcache->tableCache, serialized, strlen(serialized));
    stale->remove = true;
    tfileReaderUnRef(stale);
  }

  tfileReaderRef(reader);
  taosHashPut(tcache->tableCache, serialized, strlen(serialized), &reader, sizeof(void*));
}
dengyihao's avatar
dengyihao 已提交
146 147
// Create a reader on top of `ctx` and load the index header and fst from it.
//
// Returns the reader, or NULL on failure. When a load step fails the reader is
// destroyed via tfileReaderDestroy(), which also destroys `ctx`. When the
// initial allocation fails `ctx` is left untouched.
//
// The failure is logged BEFORE the reader is destroyed: the log arguments read
// reader->header, which must not be touched once the reader has been freed.
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
  TFileReader* reader = calloc(1, sizeof(TFileReader));
  if (reader == NULL) { return NULL; }

  reader->ctx = ctx;

  if (0 != tfileReaderLoadHeader(reader)) {
    indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
               reader->header.colName);
    tfileReaderDestroy(reader);
    return NULL;
  }

  if (0 != tfileReaderLoadFst(reader)) {
    indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
    tfileReaderDestroy(reader);
    return NULL;
  }

  return reader;
}
dengyihao's avatar
dengyihao 已提交
167
// Free a reader: destroy its fst, destroy its writer context (passing along the
// reader's `remove` flag), then free the reader itself. NULL is ignored.
void tfileReaderDestroy(TFileReader* reader) {
  if (reader != NULL) {
    fstDestroy(reader->fst);
    writerCtxDestroy(reader->ctx, reader->remove);
    free(reader);
  }
}

dengyihao's avatar
dengyihao 已提交
175
// Run `query` against `reader` and append matching table ids to `result`.
//
// Only QUERY_TERM is implemented: the term value is looked up in the fst and,
// when found, the table ids stored at the returned offset are loaded. Other
// query types are placeholders. Returns the result of the table-id load for a
// hit and -1 otherwise. The reader is un-referenced before returning.
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
  SIndexTerm* term = query->term;
  int         code = -1;

  switch (query->qType) {
    case QUERY_TERM: {
      uint64_t offset;
      FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
      if (!fstGet(reader->fst, &key, &offset)) {
        indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
                  term->colVal);
      } else {
        indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName,
                  term->colVal);
        code = tfileReaderLoadTableIds(reader, offset, result);
      }
      fstSliceDestroy(&key);
      break;
    }
    case QUERY_PREFIX:
      // not implemented yet
      break;
    default:
      // not implemented yet
      break;
  }

  tfileReaderUnRef(reader);
  return code;
}

dengyihao's avatar
dengyihao 已提交
203 204 205 206 207 208 209
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
  char    filename[128] = {0};
  int32_t coldId = 1;
  tfileGenFileName(filename, suid, coldId, version);

  char fullname[256] = {0};
  snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
dengyihao's avatar
dengyihao 已提交
210
  WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
dengyihao's avatar
dengyihao 已提交
211
  if (wcx == NULL) { return NULL; }
dengyihao's avatar
dengyihao 已提交
212 213 214 215 216 217 218 219 220

  TFileHeader tfh = {0};
  tfh.suid = suid;
  tfh.version = version;
  memcpy(tfh.colName, colName, strlen(colName));
  tfh.colType = colType;

  return tfileWriterCreate(wcx, &tfh);
}
dengyihao's avatar
dengyihao 已提交
221 222 223 224 225 226 227
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) {
  char    filename[128] = {0};
  int32_t coldId = 1;
  tfileGenFileName(filename, suid, coldId, version);

  char fullname[256] = {0};
  snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
dengyihao's avatar
dengyihao 已提交
228
  WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
dengyihao's avatar
dengyihao 已提交
229 230
  if (wc == NULL) { return NULL; }

dengyihao's avatar
dengyihao 已提交
231
  TFileReader* reader = tfileReaderCreate(wc);
dengyihao's avatar
dengyihao 已提交
232 233 234 235
  return reader;

  // tfileSerialCacheKey(&key, buf);
}
dengyihao's avatar
dengyihao 已提交
236
// Create a writer bound to `ctx`, copy `header` into it and write the header
// to the file.
//
// Returns the writer, or NULL when the allocation or the header write fails.
// On failure `ctx` is NOT destroyed; it stays owned by the caller.
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
  TFileWriter* tw = calloc(1, sizeof(TFileWriter));
  if (tw == NULL) {
    indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
    return NULL;
  }
  tw->ctx = ctx;
  tw->header = *header;

  // a writer whose header never reached the file would produce an unreadable index
  if (tfileWriteHeader(tw) != 0) {
    indexError("index: %" PRIu64 " failed to write header info", header->suid);
    free(tw);
    return NULL;
  }
  return tw;
}
dengyihao's avatar
dengyihao 已提交
258

dengyihao's avatar
dengyihao 已提交
259
// Write a batch of TFileValue entries (`data` is an SArray of TFileValue*) to
// the index file.
//
// Steps:
//   1. unless `order` is true, sort the values by column value;
//   2. compute where the fst will start (current offset plus the serialized
//      size of every table-id list) and write that offset;
//   3. write each table-id list, recording its file offset in the value;
//   4. build the fst mapping each column value to its recorded offset.
//
// Returns 0 on success and -1 on failure. As with the pre-existing
// fst-builder failure path, the writer is closed and freed on EVERY failure
// path, so callers must not use `tw` after a -1 return.
int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
  SArray* values = (SArray*)data;

  // sort by coltype before writing to tindex
  if (order == false) {
    __compar_fn_t fn;
    int8_t        colType = tw->header.colType;
    if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
      fn = tfileStrCompare;
    } else {
      fn = getComparFunc(colType, 0);
    }
    taosArraySortPWithExt(values, tfileValueCompare, &fn);
  }

  int32_t sz = taosArrayGetSize(values);
  int32_t fstOffset = tw->offset;

  // the fst is placed after all table-id lists
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    int32_t     tbsz = taosArrayGetSize(v->tableId);
    fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
  }
  if (tfileWriteFstOffset(tw, fstOffset) != 0) { goto Fail; }

  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    int32_t     tbsz = taosArrayGetSize(v->tableId);
    int32_t     ttsz = TF_TABLE_TATOAL_SIZE(tbsz);

    char* buf = calloc(1, ttsz * sizeof(char));
    if (buf == NULL) { goto Fail; }

    tfileSerialTableIdsToBuf(buf, v->tableId);
    int nwrite = tw->ctx->write(tw->ctx, buf, ttsz);
    free(buf);
    if (nwrite != ttsz) { goto Fail; }

    // remember where this value's table ids live; the fst stores this offset
    v->offset = tw->offset;
    tw->offset += ttsz;
  }

  tw->fb = fstBuilderCreate(tw->ctx, 0);
  if (tw->fb == NULL) { goto Fail; }

  // write fst
  indexError("--------Begin----------------");
  for (int32_t i = 0; i < sz; i++) {
    // TODO, fst batch write later
    TFileValue* v = taosArrayGetP(values, i);
    // the insert result is deliberately ignored, as before
    (void)tfileWriteData(tw, v);
    indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
  }
  indexError("--------End----------------");
  fstBuilderFinish(tw->fb);
  fstBuilderDestroy(tw->fb);
  tw->fb = NULL;
  return 0;

Fail:
  tfileWriterClose(tw);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
344 345 346
// Close a writer: destroy its writer context (without removing the file) and
// free the writer. NULL is ignored.
void tfileWriterClose(TFileWriter* tw) {
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, false);
    free(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
349
// Destroy a writer. Performs the same steps as tfileWriterClose(): the writer
// context is destroyed (file kept) and the writer is freed. NULL is ignored.
void tfileWriterDestroy(TFileWriter* tw) {
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, false);
    free(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
354

dengyihao's avatar
dengyihao 已提交
355 356
IndexTFile* indexTFileCreate(const char* path) {
  IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
dengyihao's avatar
dengyihao 已提交
357
  if (tfile == NULL) { return NULL; }
358

dengyihao's avatar
dengyihao 已提交
359
  tfile->cache = tfileCacheCreate(path);
dengyihao's avatar
dengyihao 已提交
360 361
  return tfile;
}
dengyihao's avatar
dengyihao 已提交
362 363 364 365
// Destroy a tfile index object together with its reader cache.
// NULL is ignored, matching the other destroy functions in this file.
void IndexTFileDestroy(IndexTFile* tfile) {
  if (tfile == NULL) { return; }
  tfileCacheDestroy(tfile->cache);
  free(tfile);
}
dengyihao's avatar
dengyihao 已提交
366

dengyihao's avatar
dengyihao 已提交
367
// Search the tfile index for `query` and append matching table ids to `result`.
// Returns -1 when `tfile` is NULL, 0 when no reader is cached for the queried
// column, and otherwise whatever tfileReaderSearch() returns.
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
  if (tfile == NULL) { return -1; }

  IndexTFile* idx = (IndexTFile*)tfile;
  SIndexTerm* term = query->term;

  TFileCacheKey key = {
      .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};

  TFileReader* reader = tfileCacheGet(idx->cache, &key);
  if (reader != NULL) {
    return tfileReaderSearch(reader, query, result);
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
380
// Placeholder: inserting a single term into the tfile index is not implemented.
// All arguments are ignored and 0 is returned.
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
  (void)tfile;
  (void)term;
  (void)uid;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
386 387 388 389 390 391 392 393 394 395
// Advance the iterator to the next key of the fst.
//
// The previous value is released first. On success the iterator value holds a
// freshly allocated, NUL-terminated copy of the key in `colVal` and the table
// ids loaded for that key in `val`, and true is returned. Returns false when
// the stream is exhausted, on allocation failure, or when the table ids cannot
// be loaded (the key copy is freed in that case).
static bool tfileIteratorNext(Iterate* iiter) {
  IterateValue* iv = &iiter->val;
  iterateValueDestroy(iv, false);

  TFileFstIter*          tIter = iiter->iter;
  StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
  if (rt == NULL) { return false; }

  int32_t sz = 0;
  char*   ch = (char*)fstSliceData(&rt->data, &sz);
  char*   colVal = calloc(1, sz + 1);
  if (colVal == NULL) {
    swsResultDestroy(rt);
    return false;
  }
  memcpy(colVal, ch, sz);

  // the fst output for a key is the file offset of its table-id list
  uint64_t offset = (uint64_t)(rt->out.out);
  swsResultDestroy(rt);

  // set up iterate value
  if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
    free(colVal);
    return false;
  }

  iv->colVal = colVal;
  return true;
}

413
// Return a pointer to the iterator's current value (owned by the iterator).
static IterateValue* tifileIterateGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}
dengyihao's avatar
dengyihao 已提交
414 415 416

// Create the fst traversal state for `reader`: an AUTOMATION_ALWAYS automaton
// context, a stream builder over the reader's fst, and the stream built from it.
//
// The allocation is sized from the object itself (the previous code allocated
// sizeof(Iterate) for a TFileFstIter). Returns NULL when any step fails; pieces
// created so far are destroyed in the same order tfileIteratorDestroy() uses.
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
  TFileFstIter* tIter = calloc(1, sizeof(*tIter));
  if (tIter == NULL) { return NULL; }

  tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
  if (tIter->ctx == NULL) {
    free(tIter);
    return NULL;
  }

  tIter->fb = fstSearch(reader->fst, tIter->ctx);
  if (tIter->fb == NULL) {
    automCtxDestroy(tIter->ctx);
    free(tIter);
    return NULL;
  }

  tIter->st = streamBuilderIntoStream(tIter->fb);
  if (tIter->st == NULL) {
    fstStreamBuilderDestroy(tIter->fb);
    automCtxDestroy(tIter->ctx);
    free(tIter);
    return NULL;
  }

  tIter->rdr = reader;
  return tIter;
}

// Create an iterator over every key stored in `reader`.
//
// The iterator's `next`/`getValue` callbacks are wired to the tfile
// implementations and its value array is initialised for uint64_t table ids.
// Returns NULL when `reader` is NULL, on allocation failure, or when the fst
// traversal state cannot be created.
Iterate* tfileIteratorCreate(TFileReader* reader) {
  if (reader == NULL) { return NULL; }

  Iterate* iter = calloc(1, sizeof(Iterate));
  if (iter == NULL) { return NULL; }

  iter->iter = tfileFstIteratorCreate(reader);
  if (iter->iter == NULL) {
    free(iter);
    return NULL;
  }
  iter->next = tfileIteratorNext;
  iter->getValue = tifileIterateGetValue;
  iter->val.val = taosArrayInit(1, sizeof(uint64_t));
  return iter;
}
// Destroy an iterator created by tfileIteratorCreate(): release the current
// value, tear down the fst traversal state (stream, stream builder, automaton
// context), free the traversal state object itself, then free the iterator.
// NULL is ignored.
void tfileIteratorDestroy(Iterate* iter) {
  if (iter == NULL) { return; }

  IterateValue* iv = &iter->val;
  iterateValueDestroy(iv, true);

  TFileFstIter* tIter = iter->iter;
  streamWithStateDestroy(tIter->st);
  fstStreamBuilderDestroy(tIter->fb);
  automCtxDestroy(tIter->ctx);
  // the TFileFstIter was allocated by tfileFstIteratorCreate() and is owned here
  free(tIter);

  free(iter);
}

dengyihao's avatar
dengyihao 已提交
454
// Fetch the cached reader for column `colName`, using suid 0 and the BINARY
// column type for the lookup key. Returns NULL when `tf` is NULL or nothing is
// cached; a returned reader carries the reference taken by tfileCacheGet().
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
  if (tf != NULL) {
    TFileCacheKey key = {.suid = 0,
                         .colType = TSDB_DATA_TYPE_BINARY,
                         .colName = colName,
                         .nColName = strlen(colName)};
    return tfileCacheGet(tf->cache, &key);
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
459

dengyihao's avatar
dengyihao 已提交
460 461 462 463 464
// Three-way comparison of two uint64_t uids for sort-style callbacks.
// Returns -1, 0 or 1. The result is derived from comparisons rather than from
// `l - r`: squeezing a 64-bit difference into an int loses the high bits and
// can report the wrong sign (or equality) for uids that are far apart.
static int tfileUidCompare(const void* a, const void* b) {
  uint64_t l = *(uint64_t*)a;
  uint64_t r = *(uint64_t*)b;
  return (l > r) - (l < r);
}
dengyihao's avatar
dengyihao 已提交
465 466
// Compare two NUL-terminated strings and normalise strcmp()'s result to
// exactly -1, 0 or 1.
static int tfileStrCompare(const void* a, const void* b) {
  const int diff = strcmp((char*)a, (char*)b);
  return (diff > 0) - (diff < 0);
}

dengyihao's avatar
dengyihao 已提交
471 472 473 474 475 476 477 478
static int tfileValueCompare(const void* a, const void* b, const void* param) {
  __compar_fn_t fn = *(__compar_fn_t*)param;

  TFileValue* av = (TFileValue*)a;
  TFileValue* bv = (TFileValue*)b;

  return fn(av->colVal, bv->colVal);
}
dengyihao's avatar
dengyihao 已提交
479 480 481

TFileValue* tfileValueCreate(char* val) {
  TFileValue* tf = calloc(1, sizeof(TFileValue));
dengyihao's avatar
dengyihao 已提交
482 483
  if (tf == NULL) { return NULL; }
  tf->colVal = val;
dengyihao's avatar
dengyihao 已提交
484 485 486 487
  tf->tableId = taosArrayInit(32, sizeof(uint64_t));
  return tf;
}
// Append table id `val` to the value's id list.
// Returns 0 on success and -1 when `tf` is NULL.
int tfileValuePush(TFileValue* tf, uint64_t val) {
  if (tf != NULL) {
    taosArrayPush(tf->tableId, &val);
    return 0;
  }
  return -1;
}
// Free a TFileValue and its table-id array. The column value string is not
// freed here. NULL is ignored, matching the other destroy functions in this file.
void tfileValueDestroy(TFileValue* tf) {
  if (tf == NULL) { return; }
  taosArrayDestroy(tf->tableId);
  free(tf);
}
dengyihao's avatar
dengyihao 已提交
496 497 498 499 500
// Serialize a table-id list into `buf`: the element count as an int32_t
// followed by every id as a uint64_t. The caller must provide a buffer large
// enough for the whole list (see TF_TABLE_TATOAL_SIZE).
// NOTE(review): relies on SERIALIZE_VAR_TO_BUF advancing `buf` after each
// write - confirm against the macro definition.
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
  int count = taosArrayGetSize(ids);
  SERIALIZE_VAR_TO_BUF(buf, count, int32_t);

  size_t idx = 0;
  while (idx < count) {
    uint64_t* id = taosArrayGet(ids, idx);
    SERIALIZE_VAR_TO_BUF(buf, *id, uint64_t);
    idx++;
  }
}

// Record where the fst starts. The stored value is `offset` plus the size of
// the offset field itself; it is kept in the writer's header and written to the
// file, after which the writer's running offset is advanced past the field.
// Returns 0 on success and -1 when the write is short.
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
  int32_t absOffset = offset + sizeof(tw->header.fstOffset);
  tw->header.fstOffset = absOffset;

  if (sizeof(absOffset) != tw->ctx->write(tw->ctx, (char*)&absOffset, sizeof(absOffset))) {
    return -1;
  }

  tw->offset += sizeof(absOffset);
  return 0;
}
// Write the first TFILE_HEADER_NO_FST bytes of the writer's header to the file
// and set the writer's running offset to the number of bytes written.
// Returns 0 on success and -1 when the write is short.
static int tfileWriteHeader(TFileWriter* writer) {
  char image[TFILE_HEADER_NO_FST] = {0};
  memcpy(image, (char*)&writer->header, sizeof(image));

  int written = writer->ctx->write(writer->ctx, image, sizeof(image));
  if (sizeof(image) == written) {
    writer->offset = written;
    return 0;
  }
  return -1;
}
// Insert one value into the fst builder, mapping its column value to the file
// offset of its table-id list. Only BINARY/NCHAR columns are handled; for any
// other column type nothing is written and 0 is returned.
// Returns 0 when the insert succeeds and -1 when it is rejected.
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
  uint8_t colType = write->header.colType;
  bool    isString = (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR);
  if (!isString) {
    // other column types are not supported yet
    return 0;
  }

  FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
  int      code = fstBuilderInsert(write->fb, key, tval->offset) ? 0 : -1;
  fstSliceDestroy(&key);
  return code;
}
dengyihao's avatar
dengyihao 已提交
539
// Read TFILE_HEADER_SIZE bytes from the start of the file into reader->header.
//
// Returns 0 on success and -1 when the read fails. Previously a failed read was
// logged but still reported as success, leaving an all-zero header behind.
// NOTE(review): a short (but non-failing) read is still accepted, as before; the
// unread tail of the header stays zero-filled.
static int tfileReaderLoadHeader(TFileReader* reader) {
  // TODO simple tfile header later
  char buf[TFILE_HEADER_SIZE] = {0};

  int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
  if (nread == -1) {
    indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
               errno, reader->ctx->file.fd, reader->ctx->file.buf);
    return -1;
  }

  memcpy(&reader->header, buf, sizeof(buf));
  return 0;
}
dengyihao's avatar
dengyihao 已提交
554
// Load the fst stored at header.fstOffset into memory and build reader->fst.
//
// The whole fst is currently read into one fixed-size buffer, so it must be
// smaller than FST_MAX_SIZE. Returns 0 on success and -1 on allocation failure,
// on a failed/empty read, when the buffer was filled completely (the fst may
// have been truncated), or when the fst cannot be constructed. The read result
// is checked at run time rather than with assert(), which is compiled out in
// NDEBUG builds.
static int tfileReaderLoadFst(TFileReader* reader) {
  // current load fst into memory, refactor it later
  static const int FST_MAX_SIZE = 64 * 1024;

  char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
  if (buf == NULL) { return -1; }

  WriterCtx* ctx = reader->ctx;
  int32_t    nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
  if (nread <= 0 || nread >= FST_MAX_SIZE) {
    indexError("failed to read fst, actual read: %d, limit: %d", (int)nread, FST_MAX_SIZE);
    free(buf);
    return -1;
  }

  FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
  reader->fst = fstCreate(&st);
  free(buf);
  fstSliceDestroy(&st);

  return reader->fst != NULL ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
573
// Read the table-id list stored at `offset` and append every id to `result`.
//
// On-disk layout: an int32_t count followed by that many uint64_t ids.
// Returns 0 on success (including an empty list) and -1 on a short read, a
// negative or oversized count, or allocation failure. All checks are done at
// run time: the count comes from the file, and assert() is compiled out in
// NDEBUG builds.
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
  int32_t    nid = 0;
  WriterCtx* ctx = reader->ctx;

  int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset);
  if (nread != (int32_t)sizeof(nid)) { return -1; }

  // reject counts that are negative or whose byte size would overflow int32_t
  if (nid < 0 || nid > INT32_MAX / (int32_t)sizeof(uint64_t)) { return -1; }
  // calloc(1, 0) may legitimately return NULL; an empty list is not an error
  if (nid == 0) { return 0; }

  int32_t total = sizeof(uint64_t) * nid;
  char*   buf = calloc(1, total);
  if (buf == NULL) { return -1; }

  nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid));
  if (nread != total) {
    free(buf);
    return -1;
  }

  for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
  free(buf);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
591
// Take one reference on `reader` via T_REF_INC. NULL is ignored.
void tfileReaderRef(TFileReader* reader) {
  if (reader != NULL) {
    int ref = T_REF_INC(reader);
    UNUSED(ref);
  }
}

dengyihao's avatar
dengyihao 已提交
597
// Drop one reference on `reader` via T_REF_DEC and destroy the reader when the
// value returned by T_REF_DEC is 0. NULL is ignored.
void tfileReaderUnRef(TFileReader* reader) {
  if (reader == NULL) { return; }

  int remaining = T_REF_DEC(reader);
  if (remaining != 0) { return; }

  tfileReaderDestroy(reader);
}
dengyihao's avatar
dengyihao 已提交
605 606 607

// Collect the name of every entry in directory `path` into `result`.
//
// Each name is a heap-allocated, NUL-terminated copy of the entry's d_name (no
// directory prefix); a pointer to it is pushed onto `result`, and the caller is
// responsible for freeing the names (see tfileDestroyFileName).
// Returns 0 on success and -1 when the directory cannot be opened or a name
// cannot be allocated; names pushed before the failure stay in `result`.
static int tfileGetFileList(const char* path, SArray* result) {
  DIR* dir = opendir(path);
  if (NULL == dir) { return -1; }

  struct dirent* entry;
  while ((entry = readdir(dir)) != NULL) {
    size_t len = strlen(entry->d_name);
    char*  buf = calloc(1, len + 1);
    if (buf == NULL) {
      closedir(dir);
      return -1;
    }
    memcpy(buf, entry->d_name, len);
    taosArrayPush(result, &buf);
  }
  closedir(dir);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
620 621 622 623
// Placeholder for removing expired index files after a restart.
// Currently does nothing with `result` and always returns 0.
static int tfileRmExpireFile(SArray* result) {
  // TODO(yihao): remove expire tindex after restart
  (void)result;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
624 625 626 627 628 629 630 631 632 633 634
// Array-element destructor for the file-name list: `elem` points at a slot
// holding a heap-allocated name, which is freed.
static void tfileDestroyFileName(void* elem) {
  char** slot = (char**)elem;
  free(*slot);
}
// Sort comparator for the file-name list: `a` and `b` each point at a char*
// slot. Names are ordered byte-wise and the result is normalised to -1, 0 or 1.
static int tfileCompare(const void* a, const void* b) {
  const char* lhs = *(char**)a;
  const char* rhs = *(char**)b;

  // comparing up to the longer of the two lengths is a full string comparison
  int order = strcmp(lhs, rhs);
  return (order > 0) - (order < 0);
}
// tfile name suid-colId-version.tindex
dengyihao's avatar
dengyihao 已提交
640 641 642 643
// Format an index file name as "<suid>-<colId>-<version>.tindex" into
// `filename`. The caller must supply a buffer large enough for the result.
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) {
  sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version);
}
dengyihao's avatar
dengyihao 已提交
644 645 646 647 648 649 650 651
// Parse an index file name of the exact form "<suid>-<colId>-<version>.tindex".
//
// Returns 0 and fills suid/colId/version when the WHOLE name matches, -1
// otherwise. sscanf()'s return value only counts the three conversions, so the
// ".tindex" suffix is verified with %n: the consumed length is stored only if
// the suffix matched, and it must land on the terminating NUL.
// SCNu64 is the scanf counterpart of PRIu64.
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
  int consumed = -1;
  if (3 == sscanf(filename, "%" SCNu64 "-%d-%d.tindex%n", suid, colId, version, &consumed) && consumed > 0 &&
      filename[consumed] == '\0') {
    // read suid & colid & version success
    return 0;
  }
  return -1;
}
// Serialize a cache key into `buf`. Only the column name (nColName bytes)
// takes part in the key at the moment; suid and colType are not serialized.
// The caller supplies the buffer.
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
  SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}