indexTfile.c 34.9 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
15
#include "indexTfile.h"
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18 19 20
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
21
#include "taosdef.h"
22
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tcoding.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27
/* Magic number identifying a tfile; presumably written by the footer writer and checked by tfileReaderVerify. */
static const uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull;

dengyihao's avatar
dengyihao 已提交
28 29 30 31 32 33 34
/*
 * Iterator state for walking the FST of a tfile.
 * NOTE(review): field roles below are inferred from their types and from how the
 * search functions in this file chain fstSearch() -> stream builder -> stream;
 * confirm against the iterator implementation.
 */
typedef struct TFileFstIter {
  FstStreamBuilder* fb;   // stream builder (presumably from fstSearch)
  StreamWithState*  st;   // stream produced from fb
  AutomationCtx*    ctx;  // automaton that filters FST keys
  TFileReader*      rdr;  // reader whose FST is iterated
} TFileFstIter;

dengyihao's avatar
dengyihao 已提交
35 36
/*
 * Serialized byte size of one table-id list: sizeof(sz) bytes (presumably the
 * element count) followed by `sz` uint64_t ids. The argument is parenthesized
 * so expression arguments such as `n + 1` expand correctly.
 */
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + (sz) * sizeof(uint64_t))

dengyihao's avatar
dengyihao 已提交
37
static int  tfileUidCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
38
static int  tfileStrCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
39 40
static int  tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42
static int tfileWriteHeader(TFileWriter* writer);
dengyihao's avatar
dengyihao 已提交
43
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
dengyihao's avatar
dengyihao 已提交
44
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
dengyihao's avatar
dengyihao 已提交
45
static int tfileWriteFooter(TFileWriter* write);
dengyihao's avatar
dengyihao 已提交
46

dengyihao's avatar
dengyihao 已提交
47
// TODO: handle file corruption
dengyihao's avatar
dengyihao 已提交
48 49
static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
50
static int tfileReaderVerify(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
51
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
dengyihao's avatar
dengyihao 已提交
52

dengyihao's avatar
dengyihao 已提交
53 54 55 56
static SArray* tfileGetFileList(const char* path);
static int     tfileRmExpireFile(SArray* result);
static void    tfileDestroyFileName(void* elem);
static int     tfileCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
57 58 59
static int     tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version);
static void    tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version);
static void    tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version);
dengyihao's avatar
dengyihao 已提交
60 61 62
/*
 * Search functions over a single tfile (an FST-backed index file).
 */
dengyihao's avatar
dengyihao 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr);

static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);

static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
dengyihao's avatar
dengyihao 已提交
76
static int32_t tfSearchEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
dengyihao's avatar
dengyihao 已提交
77 78 79 80 81 82 83 84 85 86 87 88
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);

static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);

/*
 * Search dispatch table, indexed as tfSearch[row][qtype] by tfileReaderSearch.
 * Row 0 serves ordinary columns; row 1 serves JSON columns.
 * Within a row the slots are term, prefix, suffix, regex, <, <=, >, >=, range.
 * NOTE(review): this presumably mirrors the EIndexQueryType order; confirm.
 */
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt* tr) = {
    /* row 0: non-JSON columns */
    {
        tfSearchTerm,
        tfSearchPrefix,
        tfSearchSuffix,
        tfSearchRegex,
        tfSearchLessThan,
        tfSearchLessEqual,
        tfSearchGreaterThan,
        tfSearchGreaterEqual,
        tfSearchRange,
    },
    /* row 1: JSON columns (the term slot uses the EQ comparator) */
    {
        tfSearchEqual_JSON,
        tfSearchPrefix_JSON,
        tfSearchSuffix_JSON,
        tfSearchRegex_JSON,
        tfSearchLessThan_JSON,
        tfSearchLessEqual_JSON,
        tfSearchGreaterThan_JSON,
        tfSearchGreaterEqual_JSON,
        tfSearchRange_JSON,
    },
};
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94
TFileCache* tfileCacheCreate(const char* path) {
wafwerar's avatar
wafwerar 已提交
95
  TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
dengyihao's avatar
dengyihao 已提交
96 97 98
  if (tcache == NULL) {
    return NULL;
  }
99 100 101 102

  tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  tcache->capacity = 64;

dengyihao's avatar
dengyihao 已提交
103
  SArray* files = tfileGetFileList(path);
dengyihao's avatar
dengyihao 已提交
104
  for (size_t i = 0; i < taosArrayGetSize(files); i++) {
dengyihao's avatar
dengyihao 已提交
105
    char* file = taosArrayGetP(files, i);
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107
    WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
108
    if (wc == NULL) {
dengyihao's avatar
dengyihao 已提交
109
      indexError("failed to open index:%s", file);
110
      goto End;
dengyihao's avatar
dengyihao 已提交
111
    }
dengyihao's avatar
dengyihao 已提交
112

dengyihao's avatar
dengyihao 已提交
113
    TFileReader* reader = tfileReaderCreate(wc);
dengyihao's avatar
dengyihao 已提交
114 115 116 117
    if (reader == NULL) {
      indexInfo("skip invalid file: %s", file);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
118
    TFileHeader* header = &reader->header;
dengyihao's avatar
dengyihao 已提交
119
    ICacheKey    key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};
dengyihao's avatar
dengyihao 已提交
120

dengyihao's avatar
dengyihao 已提交
121
    char    buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
122 123 124
    int32_t sz = indexSerialCacheKey(&key, buf);
    assert(sz < sizeof(buf));
    taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
125
    tfileReaderRef(reader);
dengyihao's avatar
dengyihao 已提交
126
  }
dengyihao's avatar
dengyihao 已提交
127
  taosArrayDestroyEx(files, tfileDestroyFileName);
dengyihao's avatar
dengyihao 已提交
128
  return tcache;
dengyihao's avatar
dengyihao 已提交
129
End:
130
  tfileCacheDestroy(tcache);
dengyihao's avatar
dengyihao 已提交
131
  taosArrayDestroyEx(files, tfileDestroyFileName);
132
  return NULL;
dengyihao's avatar
dengyihao 已提交
133
}
dengyihao's avatar
dengyihao 已提交
134
/*
 * Tear down a cache from tfileCacheCreate.
 * Drops the cache's reference on every stored reader, then frees the hash table
 * and the cache itself. NULL is accepted and ignored.
 */
void tfileCacheDestroy(TFileCache* tcache) {
  if (tcache == NULL) {
    return;
  }

  for (TFileReader** it = taosHashIterate(tcache->tableCache, NULL); it != NULL;
       it = taosHashIterate(tcache->tableCache, it)) {
    TFileReader* rdr = *it;
    indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", rdr->header.suid, rdr->header.colName,
              rdr->header.colType);
    tfileReaderUnRef(rdr);
  }

  taosHashCleanup(tcache->tableCache);
  taosMemoryFree(tcache);
}

dengyihao's avatar
dengyihao 已提交
151 152 153 154 155
/*
 * Look up the cached reader for `key`.
 * On a hit, a reference is taken on the reader before it is returned. The
 * caller owns that reference; tfileReaderSearch, for instance, drops one when it
 * finishes. Returns NULL on a miss.
 */
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
  char    keyBuf[128] = {0};
  int32_t keyLen = indexSerialCacheKey(key, keyBuf);
  assert(keyLen < sizeof(keyBuf));

  TFileReader** slot = taosHashGet(tcache->tableCache, keyBuf, keyLen);
  TFileReader*  found = (slot != NULL) ? *slot : NULL;
  if (found != NULL) {
    tfileReaderRef(found);
  }
  return found;
}
dengyihao's avatar
dengyihao 已提交
163 164 165
/*
 * Install `reader` as the cached reader for `key` and take a reference on it.
 *
 * An older reader cached under the same key is evicted. It is flagged `remove`,
 * which tfileReaderDestroy forwards to writerCtxDestroy. The cache's reference
 * on it is then dropped.
 */
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
  char    buf[128] = {0};
  int32_t sz = indexSerialCacheKey(key, buf);
  // Same guard as tfileCacheCreate/tfileCacheGet: the serialized key must fit in buf.
  assert(sz < sizeof(buf));

  // remove last version index reader
  TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
  if (p != NULL && *p != NULL) {
    TFileReader* oldRdr = *p;
    taosHashRemove(tcache->tableCache, buf, sz);
    indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf);
    oldRdr->remove = true;
    tfileReaderUnRef(oldRdr);
  }
  taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
  tfileReaderRef(reader);
}
dengyihao's avatar
dengyihao 已提交
179
/*
 * Wrap an opened file context in a TFileReader.
 *
 * The file is verified, then its header and FST are loaded. On success the
 * reader owns `ctx` and `remove` is false.
 *
 * If any of those steps fails, the partial reader is passed to
 * tfileReaderDestroy, which also passes `ctx` to writerCtxDestroy, and NULL is
 * returned. If allocating the reader fails, NULL is returned and `ctx` is
 * untouched.
 */
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
  TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
  if (reader == NULL) {
    return NULL;
  }
  reader->ctx = ctx;

  if (0 != tfileReaderVerify(reader)) {
    // tfileReaderLoadHeader has not run yet, so header.suid/colName are still
    // zeroed; identify the file by its name instead.
    indexError("invalid tfile: %s", ctx->file.buf);
    tfileReaderDestroy(reader);
    return NULL;
  }

  if (0 != tfileReaderLoadHeader(reader)) {
    indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
               reader->header.colName);
    tfileReaderDestroy(reader);
    return NULL;
  }

  if (0 != tfileReaderLoadFst(reader)) {
    indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid,
               reader->header.colName, errno);
    tfileReaderDestroy(reader);
    return NULL;
  }

  reader->remove = false;
  return reader;
}
dengyihao's avatar
dengyihao 已提交
209
/*
 * Free a reader.
 * Destroys its FST, then hands its file context to writerCtxDestroy together
 * with the reader's `remove` flag (set e.g. by tfileCachePut on eviction), and
 * finally frees the reader. NULL is accepted and ignored.
 */
void tfileReaderDestroy(TFileReader* reader) {
  if (reader == NULL) {
    return;
  }

  fstDestroy(reader->fst);

  const char* fileName = reader->ctx->file.buf;
  if (!reader->remove) {
    indexInfo("%s is not removed", fileName);
  } else {
    indexInfo("%s is removed", fileName);
  }
  writerCtxDestroy(reader->ctx, reader->remove);

  taosMemoryFree(reader);
}
dengyihao's avatar
dengyihao 已提交
224
/*
 * Exact-match search for a non-JSON term.
 *
 * Looks the value up in the reader's FST. If it is found, the table ids stored
 * at the returned offset are appended to tr->total.
 *
 * Returns 0 on success, including "not found". Returns
 * TSDB_CODE_TDB_FILE_CORRUPTED if the table-id list cannot be loaded, matching
 * tfSearchPrefix.
 */
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  int      ret = 0;
  char*    p = tem->colVal;
  uint64_t sz = tem->nColVal;

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(p, sz);
  uint64_t offset = 0;
  if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
    int64_t et = taosGetTimestampUs();
    int64_t cost = et - st;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, cost);

    ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total);
    cost = taosGetTimestampUs() - et;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRId64 "us", tem->suid,
              tem->colName, tem->colVal, cost);
    if (ret != 0) {
      // Previously swallowed: the caller got 0 with a partially loaded result.
      indexError("failed to find target tablelist");
      ret = TSDB_CODE_TDB_FILE_CORRUPTED;
    }
  }
  fstSliceDestroy(&key);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
246

dengyihao's avatar
dengyihao 已提交
247
/*
 * Prefix search.
 *
 * Collects the FST output (table-id list offset) of every key that starts with
 * the term value, then loads each list into tr->total. For JSON-typed terms the
 * value is first packed with indexPackJsonData; the packed buffer is freed
 * before returning.
 *
 * Returns 0 on success or TSDB_CODE_TDB_FILE_CORRUPTED if a list cannot be
 * loaded. The temporary offset array and the packed value are released on both
 * paths.
 */
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  bool  hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
  char* p = tem->colVal;
  if (hasJson) {
    p = indexPackJsonData(tem);
  }

  SArray* offsets = taosArrayInit(16, sizeof(uint64_t));

  AutomationCtx*         ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
  FstStreamBuilder*      sb = fstSearch(((TFileReader*)reader)->fst, ctx);
  StreamWithState*       st = streamBuilderIntoStream(sb);
  StreamWithStateResult* rt = NULL;
  while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
    taosArrayPush(offsets, &(rt->out.out));
    swsResultDestroy(rt);
  }
  streamWithStateDestroy(st);
  fstStreamBuilderDestroy(sb);

  int32_t code = 0;
  for (size_t i = 0; i < taosArrayGetSize(offsets); i++) {
    uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
    if (tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total) != 0) {
      indexError("failed to find target tablelist");
      code = TSDB_CODE_TDB_FILE_CORRUPTED;
      break;
    }
  }

  // Both were previously leaked (offsets always, p on the error path).
  taosArrayDestroy(offsets);
  if (hasJson) {
    taosMemoryFree(p);
  }
  return code;
}
dengyihao's avatar
dengyihao 已提交
283
/*
 * Suffix search over a tfile: not implemented yet.
 * Leaves `tr` untouched and returns 0, like the *_JSON stubs. The previous
 * scaffolding packed the value and built an FST key that was never used; it
 * also left `ret` and `st` unused.
 */
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
302
/*
 * Regex search over a tfile: not implemented yet.
 * Leaves `tr` untouched and returns 0, like the *_JSON stubs. The previous
 * scaffolding allocated a packed value and an FST key only to free them again.
 */
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
322

dengyihao's avatar
dengyihao 已提交
323
/*
 * Range search (<, <=, >, >=) over a non-JSON column.
 *
 * Streams the FST with an AUTOMATION_ALWAYS automaton, with a range set via
 * fstStreamBuilderSetRange. Each key is classified by the comparator for `type`
 * against tem->colVal:
 *   - MATCH loads the key's table ids into tr->total;
 *   - CONTINUE skips the key;
 *   - BREAK ends the scan.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType type) {
  char*                p = tem->colVal;
  int                  skip = 0;
  _cache_range_compare cmpFn = indexGetCompare(type);

  AutomationCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
  FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);

  // NOTE(review): skip is always 0, so the range bound is an empty slice; confirm intended.
  FstSlice h = fstSliceCreate((uint8_t*)p, skip);
  fstStreamBuilderSetRange(sb, &h, type);
  fstSliceDestroy(&h);

  StreamWithState*       st = streamBuilderIntoStream(sb);
  StreamWithStateResult* rt = NULL;
  while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
    char*    ch = (char*)fstSliceData(&rt->data, NULL);
    TExeCond cond = cmpFn(ch, p, tem->colType);
    if (MATCH == cond) {
      tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
    } else if (BREAK == cond) {
      swsResultDestroy(rt);
      break;
    }
    swsResultDestroy(rt);
  }
  streamWithStateDestroy(st);
  fstStreamBuilderDestroy(sb);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
362
/*
 * Range entry points for non-JSON columns.
 * Each wrapper only picks the RangeType and forwards to tfSearchCompareFunc.
 */
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = LT;
  return tfSearchCompareFunc(reader, tem, tr, op);
}

static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = LE;
  return tfSearchCompareFunc(reader, tem, tr, op);
}

static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = GT;
  return tfSearchCompareFunc(reader, tem, tr, op);
}

static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = GE;
  return tfSearchCompareFunc(reader, tem, tr, op);
}
dengyihao's avatar
dengyihao 已提交
374
/*
 * Range search over a tfile: not implemented yet.
 * Leaves `tr` untouched and returns 0, like tfSearchRange_JSON. The previous
 * body allocated a packed value and an FST key only to free them. Its real work
 * was commented out, and it left `ret`/`st` unused.
 */
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
403
/*
 * Exact-match search for a JSON term (not wired into tfSearch; kept as a
 * deprecated API).
 *
 * Packs the term with indexPackJsonData, looks it up in the FST, and appends
 * the table ids at the found offset to tr->total. The packed buffer is freed
 * before returning; it was previously leaked.
 *
 * Returns 0 on success or TSDB_CODE_TDB_FILE_CORRUPTED if the table-id list
 * cannot be loaded.
 */
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  int   ret = 0;
  char* p = indexPackJsonData(tem);
  int   sz = strlen(p);

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(p, sz);
  uint64_t offset = 0;
  if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
    int64_t et = taosGetTimestampUs();
    int64_t cost = et - st;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, cost);

    ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
    cost = taosGetTimestampUs() - et;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
              ", size: %d, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost);
    if (ret != 0) {
      indexError("failed to find target tablelist");
      ret = TSDB_CODE_TDB_FILE_CORRUPTED;
    }
  }
  fstSliceDestroy(&key);
  taosMemoryFree(p);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
428 429 430
/*
 * JSON-column entry points for the tfSearch table.
 * The implemented ones select a RangeType and forward to
 * tfSearchCompareFunc_JSON; prefix search is expressed as CONTAINS. Suffix,
 * regex and range are not implemented: they leave `tr` untouched and report
 * success.
 */
static int32_t tfSearchEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = EQ;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}

static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = CONTAINS;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}

static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  /* not implemented yet */
  return TSDB_CODE_SUCCESS;
}

static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  /* not implemented yet */
  return TSDB_CODE_SUCCESS;
}

static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = LT;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}

static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = LE;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}

static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = GT;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}

static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const RangeType op = GE;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}

static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  /* not implemented yet */
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
459
/*
 * Comparison / containment search over a JSON column.
 *
 * Builds a packed key prefix. For CONTAINS it comes from
 * indexPackJsonDataPrefixNoType, using the term's value as the key name;
 * otherwise it comes from indexPackJsonDataPrefix. `skip` receives the prefix
 * length. The FST is then streamed with an AUTOMATION_PREFIX automaton:
 *   - CONTAINS: any key whose first `skip` bytes equal the prefix is a MATCH.
 *   - Other types: the scan stops at the first key outside the prefix.
 *     Otherwise the comparator for `ctype` compares the remainder of the key
 *     with tem->colVal.
 * MATCH loads the key's table ids into tr->total and BREAK ends the scan.
 *
 * The packed prefix buffer is freed before returning; it was previously leaked.
 * The unused offset array, which also leaked, is gone. Always returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype) {
  int   skip = 0;
  char* p = NULL;
  if (ctype == CONTAINS) {
    SIndexTerm tm = {.suid = tem->suid,
                     .operType = tem->operType,
                     .colType = tem->colType,
                     .colName = tem->colVal,
                     .nColName = tem->nColVal};
    p = indexPackJsonDataPrefixNoType(&tm, &skip);
  } else {
    p = indexPackJsonDataPrefix(tem, &skip);
  }

  _cache_range_compare cmpFn = indexGetCompare(ctype);

  AutomationCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
  FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);

  StreamWithState*       st = streamBuilderIntoStream(sb);
  StreamWithStateResult* rt = NULL;
  while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
    char*    ch = (char*)fstSliceData(&rt->data, NULL);
    TExeCond cond = CONTINUE;
    if (ctype == CONTAINS) {
      if (0 == strncmp(ch, p, skip)) {
        cond = MATCH;
      }
    } else {
      if (0 != strncmp(ch, p, skip)) {
        // Left the key prefix: no later key can match.
        swsResultDestroy(rt);
        break;
      }
      cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
    }

    if (MATCH == cond) {
      tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
    } else if (BREAK == cond) {
      swsResultDestroy(rt);
      break;
    }
    swsResultDestroy(rt);
  }
  streamWithStateDestroy(st);
  fstStreamBuilderDestroy(sb);
  taosMemoryFree(p);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
514
/*
 * Run `query` against `reader`, appending matching table ids to `tr`.
 *
 * Dispatches through tfSearch: row 1 for JSON-typed terms, row 0 otherwise,
 * indexed by the query type. An out-of-range query type is rejected with -1
 * instead of indexing past the table.
 *
 * On every path this consumes one reference on `reader` (tfileReaderUnRef), as
 * returned e.g. by tfileCacheGet.
 */
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) {
  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;
  int             ret = -1;

  if ((int)qtype < 0 || (int)qtype >= QUERY_MAX) {
    indexError("invalid index query type: %d", (int)qtype);
  } else if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    ret = tfSearch[1][qtype](reader, term, tr);
  } else {
    ret = tfSearch[0][qtype](reader, term, tr);
  }

  tfileReaderUnRef(reader);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
528
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
dengyihao's avatar
dengyihao 已提交
529
  char fullname[256] = {0};
dengyihao's avatar
dengyihao 已提交
530
  tfileGenFileFullName(fullname, path, suid, colName, version);
dengyihao's avatar
dengyihao 已提交
531
  // indexInfo("open write file name %s", fullname);
dengyihao's avatar
dengyihao 已提交
532
  WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
dengyihao's avatar
dengyihao 已提交
533 534 535
  if (wcx == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
536 537 538 539 540 541 542 543 544

  TFileHeader tfh = {0};
  tfh.suid = suid;
  tfh.version = version;
  memcpy(tfh.colName, colName, strlen(colName));
  tfh.colType = colType;

  return tfileWriterCreate(wcx, &tfh);
}
dengyihao's avatar
dengyihao 已提交
545
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName) {
dengyihao's avatar
dengyihao 已提交
546
  char fullname[256] = {0};
dengyihao's avatar
dengyihao 已提交
547 548
  tfileGenFileFullName(fullname, path, suid, colName, version);

dengyihao's avatar
dengyihao 已提交
549
  WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
dengyihao's avatar
dengyihao 已提交
550
  if (wc == NULL) {
551 552
    terrno = TAOS_SYSTEM_ERROR(errno);
    indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
dengyihao's avatar
dengyihao 已提交
553 554
    return NULL;
  }
555
  indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
dengyihao's avatar
dengyihao 已提交
556

dengyihao's avatar
dengyihao 已提交
557
  TFileReader* reader = tfileReaderCreate(wc);
dengyihao's avatar
dengyihao 已提交
558 559
  return reader;
}
dengyihao's avatar
dengyihao 已提交
560
/*
 * Allocate a writer over `ctx`, copy `header` into it and write the header.
 *
 * Returns NULL if allocation fails or the header cannot be written; previously
 * a failed header write was ignored. `ctx` is NOT destroyed on failure, so the
 * caller still owns it.
 */
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
  TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
  if (tw == NULL) {
    indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
    return NULL;
  }
  tw->ctx = ctx;
  tw->header = *header;

  if (tfileWriteHeader(tw) < 0) {
    indexError("index: %" PRIu64 " failed to write tfile header", header->suid);
    taosMemoryFree(tw);
    return NULL;
  }
  return tw;
}
dengyihao's avatar
dengyihao 已提交
571

dengyihao's avatar
dengyihao 已提交
572
/*
 * Write a batch of values (`data` is an SArray of TFileValue*) into the tfile.
 *
 * If `order` is false, the values are first sorted by column value. The
 * comparator depends on the column type. Then, in order:
 *   1. every value's table-id list is sorted and de-duplicated, and the start of
 *      the FST is computed and written via tfileWriteFstOffset;
 *   2. each table-id list is serialized and written, and its file offset is
 *      recorded in TFileValue::offset;
 *   3. an FST mapping each value to that offset is built; a failed insert is
 *      logged and skipped;
 *   4. the footer is written.
 *
 * Returns 0 on success and -1 on failure. On failure `tw` is left allocated and
 * the caller, which owns it, must still close it. Previously a failing
 * fstBuilderCreate freed `tw` here, leaving the caller with a dangling pointer.
 */
int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
  SArray* values = (SArray*)data;

  // sort by coltype and write to tindex
  if (order == false) {
    __compar_fn_t fn;

    int8_t colType = tw->header.colType;
    colType = INDEX_TYPE_GET_TYPE(colType);
    if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
      fn = tfileStrCompare;
    } else {
      fn = getComparFunc(colType, 0);
    }
    taosArraySortPWithExt(values, tfileValueCompare, &fn);
  }

  int32_t sz = taosArrayGetSize(values);
  int32_t fstOffset = tw->offset;

  // Pass 1: normalise every table-id list and compute where the FST will start.
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    taosArraySort(v->tableId, tfileUidCompare);
    taosArrayRemoveDuplicate(v->tableId, tfileUidCompare, NULL);
    int32_t tbsz = taosArrayGetSize(v->tableId);
    fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
  }
  if (tfileWriteFstOffset(tw, fstOffset) < 0) {
    indexError("failed to write fst offset: %d", fstOffset);
    return -1;
  }

  // Pass 2: write every table-id list and remember its offset for the FST.
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    int32_t     tbsz = taosArrayGetSize(v->tableId);
    int32_t     ttsz = TF_TABLE_TATOAL_SIZE(tbsz);

    char* buf = taosMemoryCalloc(1, ttsz * sizeof(char));
    if (buf == NULL) {
      indexError("failed to alloc %d bytes for table id list", ttsz);
      return -1;
    }
    tfileSerialTableIdsToBuf(buf, v->tableId);
    int nwrite = tw->ctx->write(tw->ctx, buf, ttsz);
    taosMemoryFree(buf);
    if (nwrite != ttsz) {
      indexError("failed to write table id list, expect: %d, actual: %d", ttsz, nwrite);
      return -1;
    }

    v->offset = tw->offset;
    tw->offset += ttsz;
  }

  tw->fb = fstBuilderCreate(tw->ctx, 0);
  if (tw->fb == NULL) {
    indexError("failed to create fst builder");
    return -1;
  }

  // write data
  for (int32_t i = 0; i < sz; i++) {
    // TODO, fst batch write later
    TFileValue* v = taosArrayGetP(values, i);
    if (tfileWriteData(tw, v) != 0) {
      indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
                 (int)taosArrayGetSize(v->tableId));
    }
  }

  fstBuilderFinish(tw->fb);
  fstBuilderDestroy(tw->fb);
  tw->fb = NULL;

  if (tfileWriteFooter(tw) < 0) {
    indexError("failed to write tfile footer");
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
647
void tfileWriterClose(TFileWriter* tw) {
  // Release the writer's context and then the writer itself; a NULL writer is ignored.
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, false);
    taosMemoryFree(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
654
void tfileWriterDestroy(TFileWriter* tw) {
  // Same teardown as tfileWriterClose: destroy the context, then free the writer.
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, false);
    taosMemoryFree(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
661

dengyihao's avatar
dengyihao 已提交
662
IndexTFile* indexTFileCreate(const char* path) {
dengyihao's avatar
dengyihao 已提交
663
  TFileCache* cache = tfileCacheCreate(path);
dengyihao's avatar
dengyihao 已提交
664 665 666
  if (cache == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
667

wafwerar's avatar
wafwerar 已提交
668
  IndexTFile* tfile = taosMemoryCalloc(1, sizeof(IndexTFile));
dengyihao's avatar
dengyihao 已提交
669 670 671 672
  if (tfile == NULL) {
    tfileCacheDestroy(cache);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
673
  taosThreadMutexInit(&tfile->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
674
  tfile->cache = cache;
dengyihao's avatar
dengyihao 已提交
675 676
  return tfile;
}
677
void indexTFileDestroy(IndexTFile* tfile) {
  // Tear down in order: mutex, reader cache, then the IndexTFile itself. NULL is ignored.
  if (tfile != NULL) {
    taosThreadMutexDestroy(&tfile->mtx);
    tfileCacheDestroy(tfile->cache);
    taosMemoryFree(tfile);
  }
}
dengyihao's avatar
dengyihao 已提交
685

dengyihao's avatar
dengyihao 已提交
686
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
  // Look up the reader for the queried column and delegate the search to it.
  // Returns -1 for a NULL tfile, 0 when no reader exists for the column (result untouched),
  // otherwise whatever tfileReaderSearch returns.
  if (tfile == NULL) {
    return -1;
  }

  int64_t     startUs = taosGetTimestampUs();
  IndexTFile* pTfile = (IndexTFile*)tfile;
  SIndexTerm* pTerm = query->term;
  ICacheKey   key = {
        .suid = pTerm->suid, .colType = pTerm->colType, .colName = pTerm->colName, .nColName = pTerm->nColName};

  // cache access is serialized by the per-IndexTFile mutex
  taosThreadMutexLock(&pTfile->mtx);
  TFileReader* pReader = tfileCacheGet(pTfile->cache, &key);
  taosThreadMutexUnlock(&pTfile->mtx);

  if (pReader == NULL) {
    return 0;
  }

  int64_t elapsedUs = taosGetTimestampUs() - startUs;
  indexInfo("index tfile stage 1 cost: %" PRId64 "", elapsedUs);

  return tfileReaderSearch(pReader, query, result);
}
dengyihao's avatar
dengyihao 已提交
709
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
  // Not implemented: all arguments are ignored and the call always reports success.
  (void)tfile;
  (void)term;
  (void)uid;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
715 716 717 718 719 720 721 722 723
/*
 * Advance the iterator to the next fst entry.
 * On success, iiter->val.colVal receives a heap copy of the entry's key (NUL-terminated).
 * The table ids stored at the entry's output offset are appended to iiter->val.val.
 * Returns false at end of stream or on any failure.
 */
static bool tfileIteratorNext(Iterate* iiter) {
  IterateValue* iv = &iiter->val;
  // release what the previous step produced (false here, true in tfileIteratorDestroy)
  iterateValueDestroy(iv, false);

  TFileFstIter*          tIter = iiter->iter;
  StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
  if (rt == NULL) {
    return false;
  }

  int32_t sz = 0;
  char*   ch = (char*)fstSliceData(&rt->data, &sz);
  // +1 leaves room for the terminating NUL (calloc zero-fills)
  char* colVal = taosMemoryCalloc(1, sz + 1);
  if (colVal == NULL) {
    swsResultDestroy(rt);
    return false;
  }
  // copy the key before rt (which owns ch's storage) is destroyed
  memcpy(colVal, ch, sz);

  uint64_t offset = (uint64_t)(rt->out.out);
  swsResultDestroy(rt);

  if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
    // colVal was never handed to iv, so it must be released here
    taosMemoryFree(colVal);
    return false;
  }

  iv->ver = 0;
  iv->type = ADD_VALUE;  // value in tfile always ADD_VALUE
  iv->colVal = colVal;
  return true;
}

747
static IterateValue* tifileIterateGetValue(Iterate* iter) {
  // The current value is stored inline in the Iterate; hand out a borrowed pointer to it.
  IterateValue* current = &iter->val;
  return current;
}
dengyihao's avatar
dengyihao 已提交
748 749

/*
 * Build a full-scan stream over reader->fst (AUTOMATION_ALWAYS matcher).
 * Returns NULL if any stage fails; partially built stages are released.
 * The returned object is torn down by tfileIteratorDestroy.
 */
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
  TFileFstIter* iter = taosMemoryCalloc(1, sizeof(TFileFstIter));
  if (iter == NULL) {
    return NULL;
  }

  iter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
  if (iter->ctx == NULL) {
    goto _err;
  }
  iter->fb = fstSearch(reader->fst, iter->ctx);
  if (iter->fb == NULL) {
    goto _err;
  }
  iter->st = streamBuilderIntoStream(iter->fb);
  if (iter->st == NULL) {
    goto _err;
  }
  iter->rdr = reader;
  return iter;

_err:
  // ctx and fb are destroyed independently (as in tfileIteratorDestroy), so free each one that was created
  if (iter->fb != NULL) {
    fstStreamBuilderDestroy(iter->fb);
  }
  if (iter->ctx != NULL) {
    automCtxDestroy(iter->ctx);
  }
  taosMemoryFree(iter);
  return NULL;
}

/*
 * Create an iterator over every entry of the reader's fst.
 * Returns NULL for a NULL reader or on allocation failure.
 * Release the iterator with tfileIteratorDestroy.
 */
Iterate* tfileIteratorCreate(TFileReader* reader) {
  if (reader == NULL) {
    return NULL;
  }

  Iterate* iter = taosMemoryCalloc(1, sizeof(Iterate));
  if (iter == NULL) {
    // previously dereferenced unconditionally
    return NULL;
  }
  iter->iter = tfileFstIteratorCreate(reader);
  if (iter->iter == NULL) {
    taosMemoryFree(iter);
    return NULL;
  }
  iter->next = tfileIteratorNext;
  iter->getValue = tifileIterateGetValue;
  // holds the uint64_t table ids of the current entry
  iter->val.val = taosArrayInit(1, sizeof(uint64_t));
  iter->val.colVal = NULL;
  return iter;
}
void tfileIteratorDestroy(Iterate* iter) {
  // Release everything tfileIteratorCreate built; NULL is ignored.
  if (iter == NULL) {
    return;
  }

  // current value first (true here, unlike the per-step call in tfileIteratorNext)
  iterateValueDestroy(&iter->val, true);

  // then the fst scan state: stream, builder, automaton context, wrapper
  TFileFstIter* fstIter = iter->iter;
  streamWithStateDestroy(fstIter->st);
  fstStreamBuilderDestroy(fstIter->fb);
  automCtxDestroy(fstIter->ctx);
  taosMemoryFree(fstIter);

  taosMemoryFree(iter);
}

dengyihao's avatar
dengyihao 已提交
796
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
  // Fetch the cached reader for (suid, colName); the key's column type is always BINARY here.
  if (tf == NULL) {
    return NULL;
  }

  ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};

  taosThreadMutexLock(&tf->mtx);
  TFileReader* found = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);

  return found;
}
dengyihao's avatar
dengyihao 已提交
808

dengyihao's avatar
dengyihao 已提交
809 810 811 812 813
/*
 * qsort-style comparator for uint64_t uids.
 * Returns <0, 0 or >0 as *a is less than, equal to or greater than *b.
 */
static int tfileUidCompare(const void* a, const void* b) {
  uint64_t l = *(const uint64_t*)a;
  uint64_t r = *(const uint64_t*)b;
  // `l - r` wraps for unsigned operands and is then truncated to int, so its sign
  // does not reflect the ordering (e.g. UINT64_MAX vs 0, or 1<<32 vs 0 -> 0).
  if (l == r) {
    return 0;
  }
  return l < r ? -1 : 1;
}
dengyihao's avatar
dengyihao 已提交
814 815
static int tfileStrCompare(const void* a, const void* b) {
  // strcmp on the two C strings, normalized to exactly -1, 0 or 1.
  int cmp = strcmp((const char*)a, (const char*)b);
  return (cmp > 0) - (cmp < 0);
}

dengyihao's avatar
dengyihao 已提交
822 823 824 825 826 827 828 829
static int tfileValueCompare(const void* a, const void* b, const void* param) {
  // param points at the comparator to apply to the two values' colVal fields.
  __compar_fn_t cmp = *(__compar_fn_t*)param;
  return cmp(((TFileValue*)a)->colVal, ((TFileValue*)b)->colVal);
}
dengyihao's avatar
dengyihao 已提交
830 831

TFileValue* tfileValueCreate(char* val) {
wafwerar's avatar
wafwerar 已提交
832
  TFileValue* tf = taosMemoryCalloc(1, sizeof(TFileValue));
dengyihao's avatar
dengyihao 已提交
833 834 835
  if (tf == NULL) {
    return NULL;
  }
836
  tf->colVal = tstrdup(val);
dengyihao's avatar
dengyihao 已提交
837 838 839 840
  tf->tableId = taosArrayInit(32, sizeof(uint64_t));
  return tf;
}
/*
 * Append one table id to tf->tableId.
 * Returns 0 on success, -1 for a NULL value or when the push fails.
 */
int tfileValuePush(TFileValue* tf, uint64_t val) {
  if (tf == NULL) {
    return -1;
  }
  // assumes taosArrayPush returns NULL on failure (e.g. OOM while growing) — previously ignored
  if (taosArrayPush(tf->tableId, &val) == NULL) {
    return -1;
  }
  return 0;
}
/*
 * Free a TFileValue created by tfileValueCreate: its id array, its colVal copy and itself.
 * NULL is ignored, matching the other destroy helpers in this file.
 */
void tfileValueDestroy(TFileValue* tf) {
  if (tf == NULL) {
    return;
  }
  taosArrayDestroy(tf->tableId);
  taosMemoryFree(tf->colVal);
  taosMemoryFree(tf);
}
dengyihao's avatar
dengyihao 已提交
852 853 854 855 856
/*
 * Serialize `ids` into buf: the count as int32_t, then every id as uint64_t.
 * buf must hold TF_TABLE_TATOAL_SIZE(count) bytes.
 */
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
  int32_t count = (int32_t)taosArrayGetSize(ids);
  SERIALIZE_VAR_TO_BUF(buf, count, int32_t);
  for (int32_t idx = 0; idx < count; ++idx) {
    uint64_t* pId = taosArrayGet(ids, idx);
    SERIALIZE_VAR_TO_BUF(buf, *pId, uint64_t);
  }
}

/*
 * Write the fst start position and record it in tw->header.fstOffset.
 * The stored value is `offset` plus the width of the field itself.
 * `offset` is presumably the current write position (TODO confirm with the caller).
 * Returns 0 on success, -1 on a short or failed write.
 */
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
  int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
  tw->header.fstOffset = fstOffset;

  if (tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset)) != sizeof(fstOffset)) {
    return -1;
  }
  indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx));

  // advance the running write position past the field just written
  tw->offset += sizeof(fstOffset);
  return 0;
}
/*
 * Write the first TFILE_HEADER_NO_FST bytes of writer->header to the context.
 * On success, reset writer->offset to the number of bytes written.
 * Returns 0 on success, -1 on a short or failed write.
 */
static int tfileWriteHeader(TFileWriter* writer) {
  char hdr[TFILE_HEADER_NO_FST] = {0};
  memcpy(hdr, (char*)&writer->header, sizeof(hdr));

  indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx));
  int nwrite = writer->ctx->write(writer->ctx, hdr, sizeof(hdr));
  if (nwrite != sizeof(hdr)) {
    return -1;
  }
  indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx));

  writer->offset = nwrite;
  return 0;
}
/*
 * Insert one value into the fst builder.
 * The key is the raw bytes of tval->colVal (up to its NUL), and the output is tval->offset.
 * The column type is not consulted yet; all values go through this string path.
 * Returns 0 on success, -1 if the builder rejects the key.
 */
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
  FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
  bool     inserted = fstBuilderInsert(write->fb, key, tval->offset);
  // the slice must be released on both paths; the failure path used to leak it
  fstSliceDestroy(&key);
  return inserted ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
912 913 914 915
/*
 * Append the encoded tfileMagicNumber footer (checked by tfileReaderVerify).
 * Returns the number of bytes written.
 */
static int tfileWriteFooter(TFileWriter* write) {
  char  buf[sizeof(tfileMagicNumber) + 1] = {0};
  void* pBuf = (void*)buf;
  taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);

  // The encoding is binary: write exactly sizeof(tfileMagicNumber) bytes.
  // strlen(buf) would stop early at any zero byte in the encoding.
  int nwrite = write->ctx->write(write->ctx, buf, (int32_t)sizeof(tfileMagicNumber));

  indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
  assert(nwrite == sizeof(tfileMagicNumber));
  return nwrite;
}
dengyihao's avatar
dengyihao 已提交
922
/*
 * Read up to TFILE_HEADER_SIZE bytes from offset 0 into reader->header.
 * Returns -1 when the read fails, 0 otherwise. A short read is tolerated, and bytes
 * not read stay zero.
 */
static int tfileReaderLoadHeader(TFileReader* reader) {
  // TODO simple tfile header later
  char buf[TFILE_HEADER_SIZE] = {0};

  int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
  if (nread == -1) {
    indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
               reader->ctx->file.buf);
    // do not report success with an all-zero header after a failed read
    return -1;
  }
  indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);

  memcpy(&reader->header, buf, sizeof(buf));
  return 0;
}
dengyihao's avatar
dengyihao 已提交
938
/*
 * Load the fst that lies between header.fstOffset and the magic-number footer,
 * and build reader->fst from it. The whole fst is read into memory at once.
 * Returns 0 on success, -1 on a bad layout, allocation failure, read failure, or fst build failure.
 */
static int tfileReaderLoadFst(TFileReader* reader) {
  WriterCtx* ctx = reader->ctx;
  int        size = ctx->size(ctx);

  // Computed in signed int so that an inconsistent layout yields a value <= 0
  // instead of a wrapped size_t.
  int fstSize = size - reader->header.fstOffset - (int)sizeof(tfileMagicNumber);
  if (reader->header.fstOffset < 0 || fstSize <= 0) {
    indexError("invalid fst size: %d, fst offset: %d, file size: %d, filename: %s", fstSize,
               reader->header.fstOffset, size, ctx->file.buf);
    return -1;
  }

  char* buf = taosMemoryCalloc(1, fstSize);
  if (buf == NULL) {
    return -1;
  }

  int64_t ts = taosGetTimestampUs();
  int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
  int64_t cost = taosGetTimestampUs() - ts;
  indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
            reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
  // we assume fst size less than FST_MAX_SIZE; check at runtime instead of an assert
  // that disappears under NDEBUG
  if (nread <= 0 || nread > fstSize) {
    taosMemoryFree(buf);
    return -1;
  }

  FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
  reader->fst = fstCreate(&st);
  taosMemoryFree(buf);
  fstSliceDestroy(&st);

  return reader->fst != NULL ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
964
/*
 * Append the table ids stored at `offset` to `result`.
 * On-disk layout: an int32 count followed by that many uint64 ids. It is read in
 * 4096-byte blocks, and an id may straddle two blocks.
 * Returns 0 on success, -1 on a failed or short read.
 */
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
  // TODO(yihao): opt later, add block cache
  WriterCtx* ctx = reader->ctx;
  char       block[4096] = {0};

  int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
  // Compare as signed. The old `assert(nread >= sizeof(uint32_t))` converted a -1
  // error to SIZE_MAX and passed.
  if (nread < (int32_t)sizeof(int32_t)) {
    return -1;
  }

  char*   p = block;
  int32_t nid = 0;
  memcpy(&nid, p, sizeof(nid));  // memcpy: the char buffer is not guaranteed int32-aligned
  p += sizeof(nid);

  while (nid > 0) {
    int32_t left = block + sizeof(block) - p;
    if (left >= (int32_t)sizeof(uint64_t)) {
      taosArrayPush(result, (uint64_t*)p);
      p += sizeof(uint64_t);
    } else {
      // the id straddles the block boundary: stitch this block's tail to the next block's head
      char buf[sizeof(uint64_t)] = {0};
      memcpy(buf, p, left);

      memset(block, 0, sizeof(block));
      offset += sizeof(block);
      nread = ctx->readFrom(ctx, block, sizeof(block), offset);
      if (nread < (int32_t)(sizeof(uint64_t) - left)) {
        // without this check the missing bytes would silently become zeros in the id
        return -1;
      }
      memcpy(buf + left, block, sizeof(uint64_t) - left);

      taosArrayPush(result, (uint64_t*)buf);
      p = block + sizeof(uint64_t) - left;
    }
    nid -= 1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015
/*
 * Minimal integrity check. The file must be larger than the header, and its last
 * sizeof(uint64_t) bytes must decode to tfileMagicNumber.
 * Returns 0 if valid, -1 otherwise.
 */
static int tfileReaderVerify(TFileReader* reader) {
  // just validate header and footer; file corruption should also be verified later
  WriterCtx* ctx = reader->ctx;

  uint64_t tMagicNumber = 0;
  char     buf[sizeof(tMagicNumber) + 1] = {0};
  int      size = ctx->size(ctx);

  // `size` is signed. Compared directly with sizeof, a negative value would be
  // converted to a huge unsigned number and pass both checks, so reject it first.
  if (size < 0 || (size_t)size < sizeof(tMagicNumber) || (size_t)size <= sizeof(reader->header)) {
    return -1;
  }
  if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) {
    return -1;
  }

  taosDecodeFixedU64(buf, &tMagicNumber);
  return tMagicNumber == tfileMagicNumber ? 0 : -1;
}

dengyihao's avatar
dengyihao 已提交
1016
void tfileReaderRef(TFileReader* reader) {
  // Take one reference on the reader; the resulting count is not needed. NULL is ignored.
  if (reader != NULL) {
    (void)T_REF_INC(reader);
  }
}

dengyihao's avatar
dengyihao 已提交
1024
void tfileReaderUnRef(TFileReader* reader) {
  // Drop one reference; whoever releases the last one destroys the reader. NULL is ignored.
  if (reader == NULL) {
    return;
  }
  int remaining = T_REF_DEC(reader);
  if (remaining == 0) {
    tfileReaderDestroy(reader);
  }
}
dengyihao's avatar
dengyihao 已提交
1034

dengyihao's avatar
dengyihao 已提交
1035
static SArray* tfileGetFileList(const char* path) {
dengyihao's avatar
dengyihao 已提交
1036 1037
  char     buf[128] = {0};
  uint64_t suid;
dengyihao's avatar
dengyihao 已提交
1038
  int64_t  version;
dengyihao's avatar
dengyihao 已提交
1039
  SArray*  files = taosArrayInit(4, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1040

wafwerar's avatar
wafwerar 已提交
1041 1042
  TdDirPtr pDir = taosOpenDir(path);
  if (NULL == pDir) {
dengyihao's avatar
dengyihao 已提交
1043 1044
    return NULL;
  }
wafwerar's avatar
wafwerar 已提交
1045 1046 1047
  TdDirEntryPtr pDirEntry;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* file = taosGetDirEntryName(pDirEntry);
dengyihao's avatar
dengyihao 已提交
1048 1049 1050
    if (0 != tfileParseFileName(file, &suid, buf, &version)) {
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1051 1052

    size_t len = strlen(path) + 1 + strlen(file) + 1;
wafwerar's avatar
wafwerar 已提交
1053
    char*  buf = taosMemoryCalloc(1, len);
dengyihao's avatar
dengyihao 已提交
1054
    sprintf(buf, "%s/%s", path, file);
dengyihao's avatar
dengyihao 已提交
1055
    taosArrayPush(files, &buf);
dengyihao's avatar
dengyihao 已提交
1056
  }
wafwerar's avatar
wafwerar 已提交
1057
  taosCloseDir(&pDir);
dengyihao's avatar
dengyihao 已提交
1058 1059 1060 1061 1062

  taosArraySort(files, tfileCompare);
  tfileRmExpireFile(files);

  return files;
dengyihao's avatar
dengyihao 已提交
1063
}
dengyihao's avatar
dengyihao 已提交
1064 1065 1066 1067
static int tfileRmExpireFile(SArray* result) {
  // Placeholder: no files are removed yet and success is always reported.
  // TODO(yihao): remove expire tindex after restart
  (void)result;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1068 1069
static void tfileDestroyFileName(void* elem) {
  // elem points at an array slot holding a heap-allocated char*; free the string it holds.
  char** slot = (char**)elem;
  taosMemoryFree(*slot);
}
static int tfileCompare(const void* a, const void* b) {
  // Elements are char* slots; order them by the strings they point to.
  return strcmp(*(char* const*)a, *(char* const*)b);
}
dengyihao's avatar
dengyihao 已提交
1077

dengyihao's avatar
dengyihao 已提交
1078 1079
/*
 * Parse a tindex file name of the form "<suid>-<col>-<version>.tindex".
 * `col` must point to at least 128 bytes: the column part is capped at 127 characters,
 * so a longer name fails to parse instead of overflowing the buffer.
 * Returns 0 when all three fields are read, -1 otherwise.
 * Note: the ".tindex" suffix itself is not reflected in sscanf's count, so it is not enforced.
 */
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version) {
  // SCN* are the scanf-side conversion macros (PRI* are for printf)
  if (3 == sscanf(filename, "%" SCNu64 "-%127[^-]-%" SCNd64 ".tindex", suid, col, version)) {
    return 0;
  }
  return -1;
}
dengyihao's avatar
dengyihao 已提交
1085
// tfile name: <suid>-<colName>-<version>.tindex
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version) {
  // filename must be large enough for the formatted name; no bound is enforced here
  sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
}
dengyihao's avatar
dengyihao 已提交
1090
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) {
  // Produces "<path>/<suid>-<col>-<version>.tindex"; fullname must be large enough (not checked).
  char baseName[128] = {0};
  tfileGenFileName(baseName, suid, col, version);
  sprintf(fullname, "%s/%s", path, baseName);
}