indexTfile.c 34.7 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
15
#include "indexTfile.h"
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18 19 20
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
21
#include "taosdef.h"
22
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tcoding.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26 27
// Magic number identifying a tfile. `static` comes first: placing a storage-class
// specifier after a qualifier is an obsolescent feature (C11 6.11.5).
static const uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull;

dengyihao's avatar
dengyihao 已提交
28 29 30 31 32 33 34
/*
 * Bundles the state needed to walk a reader's FST: the stream builder, the
 * stream it produces, the automaton driving the search and the reader searched.
 * NOTE(review): not referenced in the visible part of this file — confirm it is
 * still used before extending it.
 */
typedef struct TFileFstIter {
  FstStreamBuilder* fb;
  StreamWithState*  st;
  AutomationCtx*    ctx;
  TFileReader*      rdr;
} TFileFstIter;

dengyihao's avatar
dengyihao 已提交
35 36
/*
 * Serialized size of a table-id list: sizeof(sz) bytes (presumably the element
 * count) followed by `sz` uint64_t ids. The argument is parenthesized so that
 * expressions such as `n + 1` expand correctly.
 */
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + (sz) * sizeof(uint64_t))

dengyihao's avatar
dengyihao 已提交
37
static int  tfileUidCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
38
static int  tfileStrCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
39 40
static int  tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42
static int tfileWriteHeader(TFileWriter* writer);
dengyihao's avatar
dengyihao 已提交
43
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
dengyihao's avatar
dengyihao 已提交
44
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
dengyihao's avatar
dengyihao 已提交
45
static int tfileWriteFooter(TFileWriter* write);
dengyihao's avatar
dengyihao 已提交
46

dengyihao's avatar
dengyihao 已提交
47
// handle file corrupt later
dengyihao's avatar
dengyihao 已提交
48 49
static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
50
static int tfileReaderVerify(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
51
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
dengyihao's avatar
dengyihao 已提交
52

dengyihao's avatar
dengyihao 已提交
53 54 55 56
static SArray* tfileGetFileList(const char* path);
static int     tfileRmExpireFile(SArray* result);
static void    tfileDestroyFileName(void* elem);
static int     tfileCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
57 58 59
static int     tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version);
static void    tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version);
static void    tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version);
dengyihao's avatar
dengyihao 已提交
60 61 62 63 64 65 66
/*
 * search from  tfile
 */
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
dengyihao's avatar
dengyihao 已提交
67 68 69 70
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
dengyihao's avatar
dengyihao 已提交
71 72
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);

dengyihao's avatar
dengyihao 已提交
73 74
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);

static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);

static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
    {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
     tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
    {tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
     tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93
TFileCache* tfileCacheCreate(const char* path) {
wafwerar's avatar
wafwerar 已提交
94
  TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
dengyihao's avatar
dengyihao 已提交
95 96 97
  if (tcache == NULL) {
    return NULL;
  }
98 99 100 101

  tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  tcache->capacity = 64;

dengyihao's avatar
dengyihao 已提交
102
  SArray* files = tfileGetFileList(path);
dengyihao's avatar
dengyihao 已提交
103
  for (size_t i = 0; i < taosArrayGetSize(files); i++) {
dengyihao's avatar
dengyihao 已提交
104
    char* file = taosArrayGetP(files, i);
dengyihao's avatar
dengyihao 已提交
105

dengyihao's avatar
dengyihao 已提交
106
    WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
107
    if (wc == NULL) {
dengyihao's avatar
dengyihao 已提交
108
      indexError("failed to open index:%s", file);
109
      goto End;
dengyihao's avatar
dengyihao 已提交
110
    }
dengyihao's avatar
dengyihao 已提交
111

dengyihao's avatar
dengyihao 已提交
112
    TFileReader* reader = tfileReaderCreate(wc);
dengyihao's avatar
dengyihao 已提交
113 114 115 116
    if (reader == NULL) {
      indexInfo("skip invalid file: %s", file);
      continue;
    }
dengyihao's avatar
dengyihao 已提交
117
    TFileHeader* header = &reader->header;
dengyihao's avatar
dengyihao 已提交
118
    ICacheKey    key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};
dengyihao's avatar
dengyihao 已提交
119

dengyihao's avatar
dengyihao 已提交
120
    char    buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
121 122 123
    int32_t sz = indexSerialCacheKey(&key, buf);
    assert(sz < sizeof(buf));
    taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
124
    tfileReaderRef(reader);
dengyihao's avatar
dengyihao 已提交
125
  }
dengyihao's avatar
dengyihao 已提交
126
  taosArrayDestroyEx(files, tfileDestroyFileName);
dengyihao's avatar
dengyihao 已提交
127
  return tcache;
dengyihao's avatar
dengyihao 已提交
128
End:
129
  tfileCacheDestroy(tcache);
dengyihao's avatar
dengyihao 已提交
130
  taosArrayDestroyEx(files, tfileDestroyFileName);
131
  return NULL;
dengyihao's avatar
dengyihao 已提交
132
}
dengyihao's avatar
dengyihao 已提交
133
/*
 * Release the cache: drop the cache's reference on every registered reader,
 * then free the hash table and the cache struct. A NULL cache is ignored.
 */
void tfileCacheDestroy(TFileCache* tcache) {
  if (tcache != NULL) {
    for (TFileReader** it = taosHashIterate(tcache->tableCache, NULL); it != NULL;
         it = taosHashIterate(tcache->tableCache, it)) {
      TFileReader* rdr = *it;
      indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", rdr->header.suid, rdr->header.colName,
                rdr->header.colType);
      tfileReaderUnRef(rdr);
    }
    taosHashCleanup(tcache->tableCache);
    taosMemoryFree(tcache);
  }
}

dengyihao's avatar
dengyihao 已提交
150 151 152 153 154
/*
 * Look up the reader registered under `key`.
 * On a hit the reader's reference count is incremented before it is returned
 * (tfileReaderSearch later drops one reference); returns NULL on a miss.
 */
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
  char    keyBuf[128] = {0};
  int32_t keyLen = indexSerialCacheKey(key, keyBuf);
  assert(keyLen < sizeof(keyBuf));

  TFileReader** slot = taosHashGet(tcache->tableCache, keyBuf, keyLen);
  TFileReader*  found = (slot == NULL) ? NULL : *slot;
  if (found != NULL) {
    tfileReaderRef(found);
  }
  return found;
}
dengyihao's avatar
dengyihao 已提交
162 163 164
/*
 * Register `reader` under `key`, replacing whatever reader was cached there.
 *
 * The cache takes its own reference on `reader`. A superseded reader gets its
 * `remove` flag set and the cache's reference on it is released.
 * tfileReaderDestroy forwards that flag to writerCtxDestroy, which presumably
 * deletes the old file once the last reference is gone.
 */
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
  char    buf[128] = {0};
  int32_t sz = indexSerialCacheKey(key, buf);
  assert(sz < sizeof(buf));

  // Take the new reference first: if `reader` is the one already cached, its
  // count can then never reach zero while we swap entries below.
  tfileReaderRef(reader);

  TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
  if (p != NULL && *p != NULL) {
    TFileReader* oldRdr = *p;
    taosHashRemove(tcache->tableCache, buf, sz);
    if (oldRdr != reader) {
      // only a genuinely older version may have its file scheduled for removal
      indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf);
      oldRdr->remove = true;
    }
    tfileReaderUnRef(oldRdr);
  }
  taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
}
dengyihao's avatar
dengyihao 已提交
178
/*
 * Wrap an opened file context in a reader.
 *
 * The function verifies the file and then loads its header and FST. The reader
 * takes ownership of `ctx`. On any validation or load failure the reader is
 * destroyed, which destroys `ctx` too, and NULL is returned. If the reader
 * allocation itself fails, `ctx` is left untouched.
 */
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
  TFileReader* rdr = taosMemoryCalloc(1, sizeof(TFileReader));
  if (rdr == NULL) {
    return NULL;
  }
  rdr->ctx = ctx;

  if (tfileReaderVerify(rdr) != 0) {
    indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", rdr->header.suid, rdr->header.colName);
    goto _fail;
  }
  if (tfileReaderLoadHeader(rdr) != 0) {
    indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", rdr->header.suid,
               rdr->header.colName);
    goto _fail;
  }
  if (tfileReaderLoadFst(rdr) != 0) {
    indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", rdr->header.suid,
               rdr->header.colName, errno);
    goto _fail;
  }

  rdr->remove = false;
  return rdr;

_fail:
  tfileReaderDestroy(rdr);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
208
/*
 * Free a reader.
 *
 * Its FST is destroyed first, then its file context. The reader's `remove` flag
 * is forwarded to writerCtxDestroy, presumably so the file is deleted when the
 * flag is set. A NULL reader is ignored.
 */
void tfileReaderDestroy(TFileReader* reader) {
  if (reader == NULL) {
    return;
  }
  fstDestroy(reader->fst);

  bool        dropFile = reader->remove;
  const char* fileName = reader->ctx->file.buf;
  if (dropFile) {
    indexInfo("%s is removed", fileName);
  } else {
    indexInfo("%s is not removed", fileName);
  }
  writerCtxDestroy(reader->ctx, dropFile);

  taosMemoryFree(reader);
}
dengyihao's avatar
dengyihao 已提交
223 224 225 226
/*
 * Exact-match lookup of tem->colVal in the reader's FST (plain columns).
 *
 * On a hit, the table ids stored at the FST output offset are appended to
 * tr->total. Returns 0 on success, including the "not found" case. When the
 * id list cannot be loaded, the error from tfileReaderLoadTableIds is returned
 * instead of being silently dropped.
 */
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  TFileReader* rdr = (TFileReader*)reader;
  int          ret = 0;
  char*        p = tem->colVal;
  uint64_t     sz = tem->nColVal;

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(p, sz);
  uint64_t offset = 0;
  if (fstGet(rdr->fst, &key, &offset)) {
    int64_t et = taosGetTimestampUs();
    int64_t cost = et - st;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, cost);

    ret = tfileReaderLoadTableIds(rdr, (int32_t)offset, tr->total);
    cost = taosGetTimestampUs() - et;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRId64 "us", tem->suid,
              tem->colName, tem->colVal, cost);
    if (ret != 0) {
      indexError("failed to load table ids at offset %" PRIu64 ", colVal: %s", offset, tem->colVal);
    }
  }
  fstSliceDestroy(&key);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
245

dengyihao's avatar
dengyihao 已提交
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
/*
 * Prefix search (plain columns; JSON values are packed first).
 *
 * The function first collects the FST output offset of every key that starts
 * with the term value. It then loads each offset's table-id list into
 * tr->total. Returns 0 on success, or TSDB_CODE_TDB_FILE_CORRUPTED when a list
 * cannot be loaded. All temporary buffers are released on every path.
 */
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  bool  hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
  char* p = tem->colVal;
  if (hasJson) {
    p = indexPackJsonData(tem);
  }

  SArray* offsets = taosArrayInit(16, sizeof(uint64_t));

  // NOTE(review): ctx is never released here — confirm whether fstSearch or the
  // builder/stream destructors take ownership of it.
  AutomationCtx*         ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
  FstStreamBuilder*      sb = fstSearch(((TFileReader*)reader)->fst, ctx);
  StreamWithState*       st = streamBuilderIntoStream(sb);
  StreamWithStateResult* rt = NULL;
  while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
    taosArrayPush(offsets, &(rt->out.out));
    swsResultDestroy(rt);
  }
  streamWithStateDestroy(st);
  fstStreamBuilderDestroy(sb);

  int32_t code = 0;
  for (size_t i = 0; i < taosArrayGetSize(offsets); i++) {
    uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
    if (tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total) != 0) {
      indexError("failed to find target tablelist");
      code = TSDB_CODE_TDB_FILE_CORRUPTED;
      break;
    }
  }

  taosArrayDestroy(offsets);
  if (hasJson) {
    taosMemoryFree(p);
  }
  return code;
}
/*
 * Suffix search over a tfile — not implemented yet.
 * Adds nothing to `tr` and always reports success (0).
 */
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader;
  (void)tem;
  (void)tr;
  // TODO: implement suffix matching
  return 0;
}
/*
 * Regex search over a tfile — not implemented yet.
 * Adds nothing to `tr` and always reports success (0).
 */
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader;
  (void)tem;
  (void)tr;
  // TODO: implement regex matching
  return 0;
}
dengyihao's avatar
dengyihao 已提交
321 322

/*
 * Range comparison (`type` is LT/LE/GT/GE) over a plain column.
 *
 * FST keys are streamed with the range configured by fstStreamBuilderSetRange.
 * Each key is tested against tem->colVal with indexGetCompare(type):
 *   - MATCH appends the key's table ids to tr->total;
 *   - CONTINUE skips the key;
 *   - BREAK stops the scan.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
  char*                p = tem->colVal;
  int                  skip = 0;
  _cache_range_compare cmpFn = indexGetCompare(type);

  // NOTE(review): ctx is never released here — confirm ownership.
  AutomationCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
  FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);

  // NOTE(review): skip is 0, so the range bound is an empty slice — confirm intent.
  FstSlice h = fstSliceCreate((uint8_t*)p, skip);
  fstStreamBuilderSetRange(sb, &h, type);
  fstSliceDestroy(&h);

  StreamWithState*       st = streamBuilderIntoStream(sb);
  StreamWithStateResult* rt = NULL;
  while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
    char*    ch = (char*)fstSliceData(&rt->data, NULL);
    TExeCond cond = cmpFn(ch, p, tem->colType);
    if (MATCH == cond) {
      tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
    } else if (BREAK == cond) {
      swsResultDestroy(rt);
      break;
    }
    // CONTINUE: nothing to do for this key
    swsResultDestroy(rt);
  }
  streamWithStateDestroy(st);
  fstStreamBuilderDestroy(sb);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
361
/* Adapters binding a RangeType to tfSearchCompareFunc for the dispatch table. */
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = LT;
  return tfSearchCompareFunc(reader, tem, tr, op);
}
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = LE;
  return tfSearchCompareFunc(reader, tem, tr, op);
}
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = GT;
  return tfSearchCompareFunc(reader, tem, tr, op);
}
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = GE;
  return tfSearchCompareFunc(reader, tem, tr, op);
}
dengyihao's avatar
dengyihao 已提交
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
/*
 * Range search over a plain column — not implemented yet.
 * Adds nothing to `tr` and always reports success (0).
 */
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader;
  (void)tem;
  (void)tr;
  // TODO: implement range matching
  return 0;
}
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
/*
 * Exact-match lookup for a JSON column.
 *
 * The term is packed with indexPackJsonData and looked up in the FST. On a hit
 * its table ids are appended to tr->total. Returns 0 on success (including
 * "not found"), or the error from tfileReaderLoadTableIds. The packed key is
 * freed before returning.
 */
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  TFileReader* rdr = (TFileReader*)reader;
  int          ret = 0;
  char*        p = indexPackJsonData(tem);
  int32_t      sz = (int32_t)strlen(p);

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(p, sz);
  uint64_t offset = 0;
  if (fstGet(rdr->fst, &key, &offset)) {
    int64_t et = taosGetTimestampUs();
    int64_t cost = et - st;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, cost);

    ret = tfileReaderLoadTableIds(rdr, (int32_t)offset, tr->total);
    cost = taosGetTimestampUs() - et;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
              ", size: %d, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost);
  }
  fstSliceDestroy(&key);
  taosMemoryFree(p);
  return ret;
}
/* JSON prefix search — not implemented yet; adds nothing and reports success. */
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader, (void)tem, (void)tr;
  return TSDB_CODE_SUCCESS;
}
/* JSON suffix search — not implemented yet; adds nothing and reports success. */
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader, (void)tem, (void)tr;
  return TSDB_CODE_SUCCESS;
}
/* JSON regex search — not implemented yet; adds nothing and reports success. */
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader, (void)tem, (void)tr;
  return TSDB_CODE_SUCCESS;
}
/* Adapters binding a RangeType to tfSearchCompareFunc_JSON for the dispatch table. */
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = LT;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = LE;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = GT;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  const RangeType op = GE;
  return tfSearchCompareFunc_JSON(reader, tem, tr, op);
}
/* JSON range search — not implemented yet; adds nothing and reports success. */
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
  (void)reader, (void)tem, (void)tr;
  return TSDB_CODE_SUCCESS;
}

/*
 * Range comparison (`ctype` is LT/LE/GT/GE) over a JSON column.
 *
 * indexPackJsonDataPrefix builds the key prefix and reports its length in
 * `skip`. FST keys starting with that prefix are streamed. For each key, the
 * part after the prefix is compared with tem->colVal:
 *   - MATCH appends the key's table ids to tr->total;
 *   - BREAK, or leaving the prefix, stops the scan.
 * Every temporary buffer is freed on all paths. Always returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) {
  int   skip = 0;
  char* p = indexPackJsonDataPrefix(tem, &skip);

  _cache_range_compare cmpFn = indexGetCompare(ctype);

  // NOTE(review): ctx is never released here — confirm ownership.
  AutomationCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
  FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);

  StreamWithState*       st = streamBuilderIntoStream(sb);
  StreamWithStateResult* rt = NULL;
  while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
    int32_t sz = 0;
    char*   ch = (char*)fstSliceData(&rt->data, &sz);

    // copy into a NUL-terminated buffer so the key can be handled as a C string
    char* tmp = taosMemoryCalloc(1, sz + 1);
    if (tmp == NULL) {
      swsResultDestroy(rt);
      break;
    }
    memcpy(tmp, ch, sz);

    // stop once keys no longer share the packed JSON prefix
    bool stop = (0 != strncmp(tmp, p, skip));
    if (!stop) {
      TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
      if (MATCH == cond) {
        tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
      } else if (BREAK == cond) {
        stop = true;
      }
    }

    taosMemoryFree(tmp);
    swsResultDestroy(rt);
    if (stop) {
      break;
    }
  }
  streamWithStateDestroy(st);
  fstStreamBuilderDestroy(sb);
  taosMemoryFree(p);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
504 505 506
/*
 * Run one term query against `reader`.
 *
 * The query is dispatched on the column kind (JSON vs plain) and the query
 * type. This call consumes one reference on `reader` and releases it on every
 * path. Returns the search routine's result, or -1 when the query type falls
 * outside the dispatch table.
 */
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;
  int             ret = -1;

  // guard the table index: an out-of-range qtype would call through garbage
  if ((int)qtype < 0 || (int)qtype >= QUERY_MAX) {
    indexError("invalid index query type: %d", (int)qtype);
  } else {
    int row = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON) ? 1 : 0;
    ret = tfSearch[row][qtype](reader, term, tr);
  }

  tfileReaderUnRef(reader);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
518
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
dengyihao's avatar
dengyihao 已提交
519
  char fullname[256] = {0};
dengyihao's avatar
dengyihao 已提交
520
  tfileGenFileFullName(fullname, path, suid, colName, version);
dengyihao's avatar
dengyihao 已提交
521
  // indexInfo("open write file name %s", fullname);
dengyihao's avatar
dengyihao 已提交
522
  WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
dengyihao's avatar
dengyihao 已提交
523 524 525
  if (wcx == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
526 527 528 529 530 531 532 533 534

  TFileHeader tfh = {0};
  tfh.suid = suid;
  tfh.version = version;
  memcpy(tfh.colName, colName, strlen(colName));
  tfh.colType = colType;

  return tfileWriterCreate(wcx, &tfh);
}
dengyihao's avatar
dengyihao 已提交
535
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName) {
dengyihao's avatar
dengyihao 已提交
536
  char fullname[256] = {0};
dengyihao's avatar
dengyihao 已提交
537 538
  tfileGenFileFullName(fullname, path, suid, colName, version);

dengyihao's avatar
dengyihao 已提交
539
  WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
dengyihao's avatar
dengyihao 已提交
540
  if (wc == NULL) {
541 542
    terrno = TAOS_SYSTEM_ERROR(errno);
    indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
dengyihao's avatar
dengyihao 已提交
543 544
    return NULL;
  }
545
  indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
dengyihao's avatar
dengyihao 已提交
546

dengyihao's avatar
dengyihao 已提交
547
  TFileReader* reader = tfileReaderCreate(wc);
dengyihao's avatar
dengyihao 已提交
548 549
  return reader;
}
dengyihao's avatar
dengyihao 已提交
550
/*
 * Allocate a writer over `ctx`, copy `header` into it and write the header.
 *
 * Returns NULL if the allocation fails or the header cannot be written. `ctx`
 * is not released on failure; the caller still owns it.
 */
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
  TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
  if (tw == NULL) {
    indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
    return NULL;
  }
  tw->ctx = ctx;
  tw->header = *header;

  // like tfileWriteData, tfileWriteHeader reports failure with a non-zero result
  if (tfileWriteHeader(tw) != 0) {
    indexError("index: %" PRIu64 " failed to write tfile header", header->suid);
    taosMemoryFree(tw);
    return NULL;
  }
  return tw;
}
dengyihao's avatar
dengyihao 已提交
561

dengyihao's avatar
dengyihao 已提交
562
/*
 * Write one batch of column values into the tfile.
 *
 * `data` is an SArray of TFileValue*. Steps:
 *   1. Unless `order` is true, sort the values by column value. BINARY/NCHAR
 *      columns use a string compare; other types use getComparFunc.
 *   2. Sort and de-duplicate every value's table-id list, then write the
 *      offset at which the FST will start.
 *   3. Serialize every id list, recording its file offset in v->offset.
 *   4. Build the FST (colVal -> offset), finish it, and write the footer.
 * Returns 0 on success and -1 on failure. NOTE: if the FST builder cannot be
 * created, the writer is closed and freed here.
 */
int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
  SArray* values = (SArray*)data;

  if (order == false) {
    __compar_fn_t fn;
    int8_t        colType = tw->header.colType;
    colType = INDEX_TYPE_GET_TYPE(colType);
    if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
      fn = tfileStrCompare;
    } else {
      fn = getComparFunc(colType, 0);
    }
    taosArraySortPWithExt(values, tfileValueCompare, &fn);
  }

  int32_t sz = (int32_t)taosArrayGetSize(values);
  int32_t fstOffset = tw->offset;

  // pass 1: normalise every id list and compute where the FST will begin
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    taosArraySort(v->tableId, tfileUidCompare);
    taosArrayRemoveDuplicate(v->tableId, tfileUidCompare, NULL);
    int32_t tbsz = taosArrayGetSize(v->tableId);
    fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
  }
  tfileWriteFstOffset(tw, fstOffset);

  // pass 2: serialize the id lists, remembering where each one starts
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    int32_t     tbsz = taosArrayGetSize(v->tableId);
    int32_t     ttsz = TF_TABLE_TATOAL_SIZE(tbsz);

    char* buf = taosMemoryCalloc(1, ttsz);
    if (buf == NULL) {
      indexError("failed to alloc %d bytes for table id list of %s", ttsz, v->colVal);
      return -1;
    }
    tfileSerialTableIdsToBuf(buf, v->tableId);
    tw->ctx->write(tw->ctx, buf, ttsz);
    v->offset = tw->offset;
    tw->offset += ttsz;
    taosMemoryFree(buf);
  }

  tw->fb = fstBuilderCreate(tw->ctx, 0);
  if (tw->fb == NULL) {
    tfileWriterClose(tw);
    return -1;
  }

  // pass 3: map every column value to its id-list offset in the FST
  for (int32_t i = 0; i < sz; i++) {
    // TODO: fst batch write
    TFileValue* v = taosArrayGetP(values, i);
    if (tfileWriteData(tw, v) != 0) {
      indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
                 (int)taosArrayGetSize(v->tableId));
    }
  }

  fstBuilderFinish(tw->fb);
  fstBuilderDestroy(tw->fb);
  tw->fb = NULL;

  tfileWriteFooter(tw);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
637
/*
 * Release a TFileWriter: destroy its writer context (second argument to
 * writerCtxDestroy is `false`) and free the writer struct. NULL is a no-op.
 */
void tfileWriterClose(TFileWriter* tw) {
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, false);
    taosMemoryFree(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
644
/*
 * Destroy a TFileWriter. The teardown is exactly the one tfileWriterClose
 * performs (writer context destroyed with `false`, then the struct freed), so
 * this simply delegates to it. NULL is a no-op.
 */
void tfileWriterDestroy(TFileWriter* tw) {
  if (tw == NULL) return;
  tfileWriterClose(tw);
}
dengyihao's avatar
dengyihao 已提交
651

dengyihao's avatar
dengyihao 已提交
652
IndexTFile* indexTFileCreate(const char* path) {
dengyihao's avatar
dengyihao 已提交
653
  TFileCache* cache = tfileCacheCreate(path);
dengyihao's avatar
dengyihao 已提交
654 655 656
  if (cache == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
657

wafwerar's avatar
wafwerar 已提交
658
  IndexTFile* tfile = taosMemoryCalloc(1, sizeof(IndexTFile));
dengyihao's avatar
dengyihao 已提交
659 660 661 662
  if (tfile == NULL) {
    tfileCacheDestroy(cache);
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
663
  taosThreadMutexInit(&tfile->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
664
  tfile->cache = cache;
dengyihao's avatar
dengyihao 已提交
665 666
  return tfile;
}
667
/*
 * Tear down an IndexTFile created by indexTFileCreate: mutex, cache, then the
 * struct itself. NULL is a no-op.
 */
void indexTFileDestroy(IndexTFile* tfile) {
  if (tfile != NULL) {
    taosThreadMutexDestroy(&tfile->mtx);
    tfileCacheDestroy(tfile->cache);
    taosMemoryFree(tfile);
  }
}
dengyihao's avatar
dengyihao 已提交
675

dengyihao's avatar
dengyihao 已提交
676
/*
 * Search the on-disk index for `query`, appending matches to `result`.
 *
 * Looks up the TFileReader for the query term's (suid, colType, colName) under
 * the IndexTFile mutex, then delegates to tfileReaderSearch.
 * Returns -1 if `tfile` is NULL, 0 if no reader is cached for the key (result
 * untouched), otherwise whatever tfileReaderSearch returns.
 */
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) {
  if (tfile == NULL) {
    return -1;
  }

  int64_t     startUs = taosGetTimestampUs();
  IndexTFile* idxTf = (IndexTFile*)tfile;
  SIndexTerm* term = query->term;

  ICacheKey key = {
      .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};

  taosThreadMutexLock(&idxTf->mtx);
  TFileReader* rdr = tfileCacheGet(idxTf->cache, &key);
  taosThreadMutexUnlock(&idxTf->mtx);

  if (rdr == NULL) {
    // no reader cached for this key; result is left untouched
    return 0;
  }

  indexInfo("index tfile stage 1 cost: %" PRId64 "", taosGetTimestampUs() - startUs);
  return tfileReaderSearch(rdr, query, result);
}
dengyihao's avatar
dengyihao 已提交
699
/*
 * Placeholder: putting a single term into a tfile is not implemented.
 * All arguments are ignored and 0 is returned.
 */
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
  (void)tfile;
  (void)term;
  (void)uid;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
705 706 707 708 709 710 711 712 713
/*
 * Advance a tfile iterator (installed as Iterate::next).
 *
 * Pulls the next key from the FST stream, copies it into a NUL-terminated
 * colVal, and loads the table-id list stored at the key's FST output (an
 * offset into the tfile) into iiter->val.val.
 * Returns true and fills iiter->val on success; false at end of stream or on
 * failure. No memory is leaked on the failure paths.
 */
static bool tfileIteratorNext(Iterate* iiter) {
  IterateValue* iv = &iiter->val;
  iterateValueDestroy(iv, false);

  TFileFstIter*          tIter = iiter->iter;
  StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
  if (rt == NULL) {
    return false;
  }

  int32_t sz = 0;
  char*   ch = (char*)fstSliceData(&rt->data, &sz);
  // +1 byte so the copied key is NUL-terminated (calloc zero-fills)
  char* colVal = taosMemoryCalloc(1, sz + 1);
  if (colVal == NULL) {
    swsResultDestroy(rt);
    return false;
  }
  memcpy(colVal, ch, sz);

  uint64_t offset = (uint64_t)(rt->out.out);
  swsResultDestroy(rt);

  // set up iterate value
  if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
    // colVal has not been handed to iv yet, so it must be released here
    taosMemoryFree(colVal);
    return false;
  }

  iv->ver = 0;
  iv->type = ADD_VALUE;  // value in tfile always ADD_VALUE
  iv->colVal = colVal;
  return true;
}

737
/* Iterate::getValue hook: hands back the iterator's embedded current value. */
static IterateValue* tifileIterateGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}
dengyihao's avatar
dengyihao 已提交
738 739

/*
 * Build the FST scan state used by a tfile Iterate.
 *
 * Creates an AUTOMATION_ALWAYS automaton (by its name, one that accepts every
 * key, i.e. a full scan — TODO confirm), a stream builder over reader->fst,
 * and the stream itself.
 * Returns NULL if any step fails; pieces created before the failure are
 * destroyed.
 */
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
  TFileFstIter* iter = taosMemoryCalloc(1, sizeof(TFileFstIter));
  if (iter == NULL) {
    return NULL;
  }

  iter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
  if (iter->ctx == NULL) {
    goto _err;
  }
  iter->fb = fstSearch(reader->fst, iter->ctx);
  if (iter->fb == NULL) {
    goto _err;
  }
  iter->st = streamBuilderIntoStream(iter->fb);
  if (iter->st == NULL) {
    goto _err;
  }
  iter->rdr = reader;
  return iter;

_err:
  // release in reverse order of construction; only what was actually created
  if (iter->fb != NULL) {
    fstStreamBuilderDestroy(iter->fb);
  }
  if (iter->ctx != NULL) {
    automCtxDestroy(iter->ctx);
  }
  taosMemoryFree(iter);
  return NULL;
}

/*
 * Create an Iterate that walks every key of `reader`'s FST in order.
 *
 * The returned iterator owns its FST scan state and a uint64 array for the
 * current value's table ids; release it with tfileIteratorDestroy.
 * Returns NULL if `reader` is NULL or an allocation fails.
 */
Iterate* tfileIteratorCreate(TFileReader* reader) {
  if (reader == NULL) {
    return NULL;
  }

  Iterate* iter = taosMemoryCalloc(1, sizeof(Iterate));
  if (iter == NULL) {
    return NULL;
  }

  iter->iter = tfileFstIteratorCreate(reader);
  if (iter->iter == NULL) {
    taosMemoryFree(iter);
    return NULL;
  }

  iter->next = tfileIteratorNext;
  iter->getValue = tifileIterateGetValue;
  iter->val.val = taosArrayInit(1, sizeof(uint64_t));
  iter->val.colVal = NULL;
  return iter;
}
/*
 * Free an Iterate produced by tfileIteratorCreate: its current value, the FST
 * scan state (stream, builder, automaton) and the iterator itself.
 * NULL is a no-op.
 */
void tfileIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    iterateValueDestroy(&iter->val, true);

    TFileFstIter* fstIter = iter->iter;
    // reverse order of construction in tfileFstIteratorCreate: stream, builder, automaton
    streamWithStateDestroy(fstIter->st);
    fstStreamBuilderDestroy(fstIter->fb);
    automCtxDestroy(fstIter->ctx);
    taosMemoryFree(fstIter);

    taosMemoryFree(iter);
  }
}

dengyihao's avatar
dengyihao 已提交
786
/*
 * Look up the cached TFileReader for (suid, colName). The cache key always
 * uses TSDB_DATA_TYPE_BINARY as its column type.
 * Returns NULL if `tf` or `colName` is NULL or no reader is cached.
 */
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
  // colName feeds strlen below, so a NULL name must be rejected up front
  if (tf == NULL || colName == NULL) {
    return NULL;
  }

  ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};

  taosThreadMutexLock(&tf->mtx);
  TFileReader* rd = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);
  return rd;
}
dengyihao's avatar
dengyihao 已提交
798

dengyihao's avatar
dengyihao 已提交
799 800 801 802 803
/*
 * qsort-style comparator for two uint64_t uids.
 * Returns -1, 0 or 1 as *a is less than, equal to or greater than *b.
 */
static int tfileUidCompare(const void* a, const void* b) {
  uint64_t l = *(const uint64_t*)a;
  uint64_t r = *(const uint64_t*)b;
  // Do not return l - r: the unsigned 64-bit difference is truncated to int,
  // which can flip the sign or report distinct uids (e.g. 0 vs 1<<32) as equal.
  if (l == r) {
    return 0;
  }
  return l < r ? -1 : 1;
}
dengyihao's avatar
dengyihao 已提交
804 805
/*
 * Compare two NUL-terminated strings, normalising strcmp's result to
 * exactly -1, 0 or 1.
 */
static int tfileStrCompare(const void* a, const void* b) {
  int cmp = strcmp((const char*)a, (const char*)b);
  return (cmp > 0) - (cmp < 0);
}

dengyihao's avatar
dengyihao 已提交
812 813 814 815 816 817 818 819
static int tfileValueCompare(const void* a, const void* b, const void* param) {
  __compar_fn_t fn = *(__compar_fn_t*)param;

  TFileValue* av = (TFileValue*)a;
  TFileValue* bv = (TFileValue*)b;

  return fn(av->colVal, bv->colVal);
}
dengyihao's avatar
dengyihao 已提交
820 821

TFileValue* tfileValueCreate(char* val) {
wafwerar's avatar
wafwerar 已提交
822
  TFileValue* tf = taosMemoryCalloc(1, sizeof(TFileValue));
dengyihao's avatar
dengyihao 已提交
823 824 825
  if (tf == NULL) {
    return NULL;
  }
826
  tf->colVal = tstrdup(val);
dengyihao's avatar
dengyihao 已提交
827 828 829 830
  tf->tableId = taosArrayInit(32, sizeof(uint64_t));
  return tf;
}
/*
 * Append table id `val` to tf->tableId.
 * Returns 0 on success, -1 if `tf` is NULL or the append fails.
 */
int tfileValuePush(TFileValue* tf, uint64_t val) {
  if (tf == NULL) {
    return -1;
  }
  // taosArrayPush returns NULL when it cannot grow the array
  if (taosArrayPush(tf->tableId, &val) == NULL) {
    return -1;
  }
  return 0;
}
/*
 * Free a TFileValue created by tfileValueCreate: its table-id array, its
 * colVal copy and the struct itself. NULL is a no-op, matching the other
 * destroy helpers in this file.
 */
void tfileValueDestroy(TFileValue* tf) {
  if (tf == NULL) {
    return;
  }
  taosArrayDestroy(tf->tableId);
  taosMemoryFree(tf->colVal);
  taosMemoryFree(tf);
}
dengyihao's avatar
dengyihao 已提交
842 843 844 845 846
/*
 * Serialize a table-id list into `buf`: an int32 count followed by each
 * uint64 id. The caller must size `buf` for 4 + 8 * count bytes
 * (see TF_TABLE_TATOAL_SIZE).
 */
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
  int count = taosArrayGetSize(ids);
  SERIALIZE_VAR_TO_BUF(buf, count, int32_t);
  for (int idx = 0; idx < count; ++idx) {
    uint64_t id = *(uint64_t*)taosArrayGet(ids, idx);
    SERIALIZE_VAR_TO_BUF(buf, id, uint64_t);
  }
}

/*
 * Record and write the FST offset field.
 *
 * The stored value is `offset` plus the width of the field itself, i.e. the
 * position just past this field. It is saved in tw->header.fstOffset and
 * written to the context, and tw->offset advances by the field width.
 * Returns 0 on success, -1 on a short write.
 */
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
  int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
  tw->header.fstOffset = fstOffset;

  int nwrite = tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset));
  if (nwrite != (int)sizeof(fstOffset)) {
    return -1;
  }

  indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx));
  tw->offset += sizeof(fstOffset);
  return 0;
}
/*
 * Write the leading TFILE_HEADER_NO_FST bytes of writer->header to the
 * context. By its name this is the header without the fst offset, which
 * tfileWriteFstOffset writes separately (TODO confirm).
 * On success writer->offset is set to the number of bytes written and 0 is
 * returned; a short write returns -1.
 */
static int tfileWriteHeader(TFileWriter* writer) {
  char hdr[TFILE_HEADER_NO_FST] = {0};
  memcpy(hdr, (char*)&writer->header, sizeof(hdr));

  indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx));
  int nwrite = writer->ctx->write(writer->ctx, hdr, sizeof(hdr));
  if (nwrite != sizeof(hdr)) {
    return -1;
  }
  indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx));

  writer->offset = nwrite;
  return 0;
}
/*
 * Insert one entry into the writer's FST: key = the bytes of tval->colVal
 * (without the terminator), output = tval->offset, the position at which
 * this value's table-id list was written.
 * Returns 0 on success, -1 if the builder rejects the key.
 *
 * TODO: every column type is currently keyed by its string form; type-specific
 * key encoding (based on the header's colType) is not implemented yet.
 */
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
  FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
  bool     ok = fstBuilderInsert(write->fb, key, tval->offset);
  // release the slice on both paths so a rejected key is not leaked
  fstSliceDestroy(&key);
  return ok ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
902 903 904 905
/*
 * Append the footer (tfileMagicNumber encoded with taosEncodeFixedU64) to
 * the writer's context. Returns the number of bytes written.
 */
static int tfileWriteFooter(TFileWriter* write) {
  char  buf[sizeof(tfileMagicNumber)] = {0};
  void* pBuf = (void*)buf;
  taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);

  // The encoded magic number is binary: write its fixed width, not strlen(buf),
  // which would stop early at any zero byte.
  int nwrite = write->ctx->write(write->ctx, buf, (int32_t)sizeof(buf));

  indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
  assert(nwrite == sizeof(tfileMagicNumber));
  return nwrite;
}
dengyihao's avatar
dengyihao 已提交
912
/*
 * Read the first TFILE_HEADER_SIZE bytes of the file into reader->header.
 * Returns 0 on success, -1 if the read fails. Previously a failed read was
 * logged but still reported as success, with an all-zero header.
 */
static int tfileReaderLoadHeader(TFileReader* reader) {
  // TODO simple tfile header later
  char buf[TFILE_HEADER_SIZE] = {0};

  int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
  if (nread < 0) {
    indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
               reader->ctx->file.buf);
    return -1;
  }
  indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);

  memcpy(&reader->header, buf, sizeof(buf));
  return 0;
}
dengyihao's avatar
dengyihao 已提交
928
/*
 * Load the serialized FST into memory and build reader->fst from it.
 *
 * The FST occupies [header.fstOffset, fileSize - sizeof(tfileMagicNumber)),
 * i.e. everything between the recorded offset and the magic-number footer.
 * Returns 0 on success, -1 on an invalid layout, a read failure, or if
 * fstCreate fails.
 */
static int tfileReaderLoadFst(TFileReader* reader) {
  WriterCtx* ctx = reader->ctx;
  int        size = ctx->size(ctx);

  // current load fst into memory, refactor it later
  // Compute in 64-bit signed arithmetic so a corrupt fstOffset yields a
  // non-positive size instead of wrapping through size_t.
  int64_t fstSize64 = (int64_t)size - (int64_t)reader->header.fstOffset - (int64_t)sizeof(tfileMagicNumber);
  if (fstSize64 <= 0 || fstSize64 > INT32_MAX) {
    indexError("invalid fst size: %" PRId64 ", fst offset: %d, file size: %d", fstSize64, reader->header.fstOffset,
               size);
    return -1;
  }
  int   fstSize = (int)fstSize64;
  char* buf = taosMemoryCalloc(1, fstSize);
  if (buf == NULL) {
    return -1;
  }

  int64_t ts = taosGetTimestampUs();
  int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
  int64_t cost = taosGetTimestampUs() - ts;
  indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %" PRId64 "us", nread,
            reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
  // we assuse fst size less than FST_MAX_SIZE
  assert(nread > 0 && nread <= fstSize);
  // assert() is compiled out in release builds; never build an FST from a failed read
  if (nread <= 0 || nread > fstSize) {
    taosMemoryFree(buf);
    return -1;
  }

  FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
  reader->fst = fstCreate(&st);
  taosMemoryFree(buf);
  fstSliceDestroy(&st);

  return reader->fst != NULL ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
954
/*
 * Append the table-id list stored at `offset` in the tfile to `result`.
 *
 * Layout (as written by tfileSerialTableIdsToBuf): an int32 count followed by
 * that many uint64 ids. The file is read in 4096-byte blocks; an id that
 * straddles a block boundary is reassembled from the tail of the current block
 * and the head of the next.
 * Returns 0 on success, -1 if a read fails. Ids appended before the failure
 * remain in `result`.
 */
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
  // TODO(yihao): opt later
  WriterCtx* ctx = reader->ctx;
  // add block cache
  char    block[4096] = {0};
  int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
  // Compare as signed: with `nread >= sizeof(...)` a failed read (-1) was
  // promoted to a huge unsigned value and accepted.
  if (nread < (int32_t)sizeof(int32_t)) {
    indexError("failed to read table ids, offset: %d, nread: %d", offset, nread);
    return -1;
  }

  char*   p = block;
  int32_t nid = 0;
  memcpy(&nid, p, sizeof(nid));
  p += sizeof(nid);

  while (nid > 0) {
    int32_t left = block + sizeof(block) - p;
    if (left >= sizeof(uint64_t)) {
      taosArrayPush(result, (uint64_t*)p);
      p += sizeof(uint64_t);
    } else {
      // the id straddles the block boundary: keep its first `left` bytes,
      // load the next block, then take the remaining bytes from its start
      char buf[sizeof(uint64_t)] = {0};
      memcpy(buf, p, left);

      memset(block, 0, sizeof(block));
      offset += sizeof(block);
      nread = ctx->readFrom(ctx, block, sizeof(block), offset);
      if (nread < 0) {
        indexError("failed to read table ids, offset: %d, nread: %d", offset, nread);
        return -1;
      }
      memcpy(buf + left, block, sizeof(uint64_t) - left);

      taosArrayPush(result, (uint64_t*)buf);
      p = block + sizeof(uint64_t) - left;
    }
    nid -= 1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
/*
 * Minimal integrity check of a tfile: the file must be larger than the header
 * and end with tfileMagicNumber (decoded with taosDecodeFixedU64).
 * Returns 0 if the check passes, -1 otherwise. Deeper corruption checks are
 * not done yet.
 */
static int tfileReaderVerify(TFileReader* reader) {
  WriterCtx* ctx = reader->ctx;

  uint64_t tMagicNumber = 0;

  char buf[sizeof(tMagicNumber) + 1] = {0};
  int  size = ctx->size(ctx);

  // size is signed: reject a negative (error) size before comparing it with
  // sizeof(), which would otherwise promote it to a huge unsigned value.
  if (size < 0 || (size_t)size < sizeof(tMagicNumber) || (size_t)size <= sizeof(reader->header)) {
    return -1;
  } else if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) {
    return -1;
  }

  taosDecodeFixedU64(buf, &tMagicNumber);
  return tMagicNumber == tfileMagicNumber ? 0 : -1;
}

dengyihao's avatar
dengyihao 已提交
1006
/* Take a reference on `reader` (T_REF_INC). NULL is a no-op. */
void tfileReaderRef(TFileReader* reader) {
  if (reader != NULL) {
    int newRef = T_REF_INC(reader);
    UNUSED(newRef);
  }
}

dengyihao's avatar
dengyihao 已提交
1014
/*
 * Drop a reference on `reader` (T_REF_DEC). When the count reaches zero the
 * reader is destroyed with tfileReaderDestroy. NULL is a no-op.
 */
void tfileReaderUnRef(TFileReader* reader) {
  if (reader == NULL) return;

  int remaining = T_REF_DEC(reader);
  if (remaining == 0) {
    // last reference released: free the reader
    tfileReaderDestroy(reader);
  }
}
dengyihao's avatar
dengyihao 已提交
1024

dengyihao's avatar
dengyihao 已提交
1025
static SArray* tfileGetFileList(const char* path) {
dengyihao's avatar
dengyihao 已提交
1026 1027
  char     buf[128] = {0};
  uint64_t suid;
dengyihao's avatar
dengyihao 已提交
1028
  int64_t  version;
dengyihao's avatar
dengyihao 已提交
1029
  SArray*  files = taosArrayInit(4, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
1030

wafwerar's avatar
wafwerar 已提交
1031 1032
  TdDirPtr pDir = taosOpenDir(path);
  if (NULL == pDir) {
dengyihao's avatar
dengyihao 已提交
1033 1034
    return NULL;
  }
wafwerar's avatar
wafwerar 已提交
1035 1036 1037
  TdDirEntryPtr pDirEntry;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* file = taosGetDirEntryName(pDirEntry);
dengyihao's avatar
dengyihao 已提交
1038 1039 1040
    if (0 != tfileParseFileName(file, &suid, buf, &version)) {
      continue;
    }
dengyihao's avatar
dengyihao 已提交
1041 1042

    size_t len = strlen(path) + 1 + strlen(file) + 1;
wafwerar's avatar
wafwerar 已提交
1043
    char*  buf = taosMemoryCalloc(1, len);
dengyihao's avatar
dengyihao 已提交
1044
    sprintf(buf, "%s/%s", path, file);
dengyihao's avatar
dengyihao 已提交
1045
    taosArrayPush(files, &buf);
dengyihao's avatar
dengyihao 已提交
1046
  }
wafwerar's avatar
wafwerar 已提交
1047
  taosCloseDir(&pDir);
dengyihao's avatar
dengyihao 已提交
1048 1049 1050 1051 1052

  taosArraySort(files, tfileCompare);
  tfileRmExpireFile(files);

  return files;
dengyihao's avatar
dengyihao 已提交
1053
}
dengyihao's avatar
dengyihao 已提交
1054 1055 1056 1057
/*
 * Placeholder for pruning expired tfiles from `result`; it currently leaves the
 * list unchanged and always returns 0.
 */
static int tfileRmExpireFile(SArray* result) {
  // TODO(yihao): remove expire tindex after restart
  (void)result;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1058 1059
/*
 * Element destructor for arrays of heap-allocated file names: `elem` points
 * at an array slot holding a char*, and the string it references is freed.
 */
static void tfileDestroyFileName(void* elem) {
  char** slot = (char**)elem;
  taosMemoryFree(*slot);
}
/*
 * Sort comparator for arrays of char*: `a` and `b` point at array slots, and
 * the strings they hold are compared with strcmp.
 */
static int tfileCompare(const void* a, const void* b) {
  char* const* lhs = (char* const*)a;
  char* const* rhs = (char* const*)b;
  return strcmp(*lhs, *rhs);
}
dengyihao's avatar
dengyihao 已提交
1067

dengyihao's avatar
dengyihao 已提交
1068 1069
/*
 * Parse a tfile name of the form "<suid>-<col>-<version>.tindex".
 *
 * `col` must have room for at least 128 bytes; the column name is capped at
 * 127 characters plus the terminator. Without a field width, a long name
 * found in the directory would overflow the caller's buffer.
 * Returns 0 if suid, col and version were all parsed, -1 otherwise.
 */
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version) {
  // scanf needs the SCN* macros (the PRI* ones are for printf)
  if (3 == sscanf(filename, "%" SCNu64 "-%127[^-]-%" SCNd64 ".tindex", suid, col, version)) {
    // read suid & colName & version success
    return 0;
  }
  return -1;
}
dengyihao's avatar
dengyihao 已提交
1075
/*
 * Format a tfile name as "<suid>-<col>-<version>.tindex" into `filename`,
 * where col is the column name string. The caller must supply a buffer large
 * enough for the result.
 */
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version) {
  sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
}
dengyihao's avatar
dengyihao 已提交
1080
/*
 * Build the full path "<path>/<suid>-<col>-<version>.tindex" into `fullname`.
 * The bare file name is produced by tfileGenFileName into a 128-byte scratch
 * buffer; the caller must size `fullname` for the joined result.
 */
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) {
  char leafName[128] = {0};
  tfileGenFileName(leafName, suid, col, version);
  sprintf(fullname, "%s/%s", path, leafName);
}