/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
15
#include "indexTfile.h"
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexComm.h"
#include "indexFst.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
20
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
21
#include "taosdef.h"
22
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tcoding.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
25

dengyihao's avatar
dengyihao 已提交
26
// 64-bit magic constant for tfile data (its use is outside this section of the file).
static const uint64_t FILE_MAGIC_NUMBER = 0xdb4775248b80fb57ull;
dengyihao's avatar
dengyihao 已提交
27

dengyihao's avatar
dengyihao 已提交
28
typedef struct TFileFstIter {
dengyihao's avatar
dengyihao 已提交
29 30 31 32
  FStmBuilder* fb;
  FStmSt*      st;
  FAutoCtx*    ctx;
  TFileReader* rdr;
dengyihao's avatar
dengyihao 已提交
33 34
} TFileFstIter;

dengyihao's avatar
dengyihao 已提交
35 36
// Byte size of sizeof(sz) plus `sz` 64-bit entries. The argument is parenthesized so that
// expression arguments such as `n + 1` expand correctly.
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + (sz) * sizeof(uint64_t))

dengyihao's avatar
dengyihao 已提交
37
static int  tfileStrCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
38 39
static int  tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41
static int tfileWriteHeader(TFileWriter* writer);
dengyihao's avatar
dengyihao 已提交
42
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
dengyihao's avatar
dengyihao 已提交
43
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
dengyihao's avatar
dengyihao 已提交
44
static int tfileWriteFooter(TFileWriter* write);
dengyihao's avatar
dengyihao 已提交
45

dengyihao's avatar
dengyihao 已提交
46
// handle file corrupt later
dengyihao's avatar
dengyihao 已提交
47 48
static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
49
static int tfileReaderVerify(TFileReader* reader);
dengyihao's avatar
dengyihao 已提交
50
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
dengyihao's avatar
dengyihao 已提交
51

dengyihao's avatar
dengyihao 已提交
52 53 54 55
static SArray* tfileGetFileList(const char* path);
static int     tfileRmExpireFile(SArray* result);
static void    tfileDestroyFileName(void* elem);
static int     tfileCompare(const void* a, const void* b);
dengyihao's avatar
dengyihao 已提交
56 57 58
static int     tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version);
static void    tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version);
static void    tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version);
dengyihao's avatar
dengyihao 已提交
59 60 61
/*
 * Search a tfile. One handler per query type, for plain columns and for JSON columns.
 */
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr);

static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);

static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);

static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);

/*
 * Dispatch table used by tfileReaderSearch: row 0 serves plain columns, row 1 serves JSON
 * columns, and the column index is the query type. Entry order therefore has to follow the
 * query-type enumeration.
 */
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt* tr) = {
    {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
     tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
    {tfSearchEqual_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
     tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93
/*
 * Build the reader cache for every index file listed by tfileGetFileList(path).
 *
 * Each file is opened read-only, wrapped in a TFileReader and registered in the hash table
 * under the key serialized from (suid, colName) of its header. The cache takes one reference
 * on every registered reader.
 *
 * Files for which no reader can be created are skipped. Failing to open a file context aborts
 * the whole construction.
 *
 * Returns the new cache, or NULL on allocation failure or when a file cannot be opened.
 */
TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
  TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
  if (tcache == NULL) {
    return NULL;
  }

  tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (tcache->tableCache == NULL) {
    // Without the table nothing can be registered; bail out before touching any file.
    taosMemoryFree(tcache);
    return NULL;
  }
  tcache->capacity = 64;

  SArray* files = tfileGetFileList(path);
  for (size_t i = 0; i < taosArrayGetSize(files); i++) {
    char* file = taosArrayGetP(files, i);

    IFileCtx* ctx = idxFileCtxCreate(TFILE, file, true, 1024 * 1024 * 64);
    if (ctx == NULL) {
      indexError("failed to open index:%s", file);
      goto End;
    }
    ctx->lru = idx->lru;

    // NOTE(review): tfileReaderCreate releases ctx when verification or loading fails, but not
    // when its own allocation fails; confirm whether ctx should be released here in that case.
    TFileReader* reader = tfileReaderCreate(ctx);
    if (reader == NULL) {
      indexInfo("skip invalid file: %s", file);
      continue;
    }
    reader->lru = idx->lru;

    TFileHeader* header = &reader->header;
    ICacheKey    key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};

    char    buf[128] = {0};
    int32_t sz = idxSerialCacheKey(&key, buf);
    assert(sz >= 0 && (size_t)sz < sizeof(buf));
    taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
    tfileReaderRef(reader);
  }
  taosArrayDestroyEx(files, tfileDestroyFileName);
  return tcache;

End:
  tfileCacheDestroy(tcache);
  taosArrayDestroyEx(files, tfileDestroyFileName);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
136
/*
 * Drop the cache's reference on every registered reader, then release the hash table and the
 * cache object itself. A NULL cache is ignored.
 */
void tfileCacheDestroy(TFileCache* tcache) {
  if (tcache == NULL) {
    return;
  }

  // Walk the table entry by entry; each entry stores a TFileReader pointer.
  for (TFileReader** it = taosHashIterate(tcache->tableCache, NULL); it != NULL;
       it = taosHashIterate(tcache->tableCache, it)) {
    TFileReader* rd = *it;
    indexInfo("drop table cache suid:%" PRIu64 ", colName:%s, colType:%d", rd->header.suid, rd->header.colName,
              rd->header.colType);
    tfileReaderUnRef(rd);
  }

  taosHashCleanup(tcache->tableCache);
  taosMemoryFree(tcache);
}

dengyihao's avatar
dengyihao 已提交
153 154
/*
 * Look up the reader registered under `key`.
 *
 * On a hit, a reference is taken on the reader before it is returned. Returns NULL when no
 * reader is registered for the key.
 */
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
  char    keyBuf[128] = {0};
  int32_t keyLen = idxSerialCacheKey(key, keyBuf);
  assert(keyLen < sizeof(keyBuf));

  TFileReader** slot = taosHashGet(tcache->tableCache, keyBuf, keyLen);
  if (slot != NULL && *slot != NULL) {
    tfileReaderRef(*slot);
    return *slot;
  }
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
165
/*
 * Register `reader` under `key`, replacing any reader already stored there.
 *
 * A replaced reader is removed from the table, flagged with remove = true and loses the
 * cache's reference. The cache then takes a reference on the new reader.
 */
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
  char    keyBuf[128] = {0};
  int32_t keyLen = idxSerialCacheKey(key, keyBuf);

  TFileReader** slot = taosHashGet(tcache->tableCache, keyBuf, keyLen);
  TFileReader*  stale = (slot != NULL) ? *slot : NULL;
  if (stale != NULL) {
    taosHashRemove(tcache->tableCache, keyBuf, keyLen);
    indexInfo("found %s, should remove file %s", keyBuf, stale->ctx->file.buf);
    stale->remove = true;
    tfileReaderUnRef(stale);
  }

  taosHashPut(tcache->tableCache, keyBuf, keyLen, &reader, sizeof(void*));
  tfileReaderRef(reader);
}
dengyihao's avatar
dengyihao 已提交
180
/*
 * Create a reader on top of an already opened file context.
 *
 * The file is verified, then its header and its FST are loaded, in that order. If any step
 * fails the reader is destroyed (which also destroys `ctx`) and NULL is returned. If the
 * reader itself cannot be allocated, NULL is returned and `ctx` is left untouched.
 */
TFileReader* tfileReaderCreate(IFileCtx* ctx) {
  TFileReader* rd = taosMemoryCalloc(1, sizeof(TFileReader));
  if (rd == NULL) {
    return NULL;
  }
  rd->ctx = ctx;
  rd->remove = false;

  if (tfileReaderVerify(rd) != 0) {
    indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", rd->header.suid, rd->header.colName);
    goto Fail;
  }

  if (tfileReaderLoadHeader(rd) != 0) {
    indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", rd->header.suid,
               rd->header.colName);
    goto Fail;
  }

  if (tfileReaderLoadFst(rd) != 0) {
    indexError("failed to load index fst, suid:%" PRIu64 ", colName:%s, code:0x%x", rd->header.suid,
               rd->header.colName, errno);
    goto Fail;
  }

  return rd;

Fail:
  tfileReaderDestroy(rd);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
210
/*
 * Release a reader: its FST, its file context and the reader object itself.
 * The reader's `remove` flag is forwarded to idxFileCtxDestroy. A NULL reader is ignored.
 */
void tfileReaderDestroy(TFileReader* reader) {
  if (reader == NULL) {
    return;
  }

  fstDestroy(reader->fst);

  if (!reader->remove) {
    indexInfo("%s is not removed", reader->ctx->file.buf);
  } else {
    indexInfo("%s is removed", reader->ctx->file.buf);
  }

  idxFileCtxDestroy(reader->ctx, reader->remove);
  taosMemoryFree(reader);
}
dengyihao's avatar
dengyihao 已提交
224

dengyihao's avatar
dengyihao 已提交
225
/*
 * Exact-match lookup of tem->colVal in the reader's FST.
 *
 * When the key is found, the table ids stored at the FST output offset are appended to
 * tr->total. A missing key is not an error.
 *
 * Returns 0 on success or when the key is absent; otherwise the non-zero result of
 * tfileReaderLoadTableIds (previously this error was dropped).
 */
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  int      ret = 0;
  char*    p = tem->colVal;
  uint64_t sz = tem->nColVal;

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(p, sz);
  uint64_t offset;
  if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
    int64_t et = taosGetTimestampUs();
    int64_t cost = et - st;
    // cost is a signed 64-bit value, so it is printed with PRId64.
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, cost);

    ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total);
    cost = taosGetTimestampUs() - et;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRId64 "us", tem->suid,
              tem->colName, tem->colVal, cost);
  }
  fstSliceDestroy(&key);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
247

dengyihao's avatar
dengyihao 已提交
248
/*
 * Prefix search: collect the FST output offset of every key matched by a prefix automaton
 * built from tem->colVal, then append the table ids stored at each offset to tr->total.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the offset list cannot be allocated, and
 * TSDB_CODE_TDB_FILE_CORRUPTED if any table-id list fails to load. The offset list is
 * released on every path.
 */
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  char* p = tem->colVal;

  SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
  if (offsets == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // NOTE(review): ctx is not released in this function; confirm who owns the automaton context.
  FAutoCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
  FStmBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
  FStmSt*      st = stmBuilderIntoStm(sb);
  FStmStRslt*  rt = NULL;
  while ((rt = stmStNextWith(st, NULL)) != NULL) {
    taosArrayPush(offsets, &(rt->out.out));
    swsResultDestroy(rt);
  }
  stmStDestroy(st);
  stmBuilderDestroy(sb);

  int32_t code = 0;
  size_t  nOffsets = taosArrayGetSize(offsets);
  for (size_t i = 0; i < nOffsets; i++) {
    uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
    if (tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total) != 0) {
      indexError("failed to find target tablelist");
      code = TSDB_CODE_TDB_FILE_CORRUPTED;
      break;
    }
  }

  taosArrayDestroy(offsets);
  return code;
}
dengyihao's avatar
dengyihao 已提交
276
/*
 * Suffix search placeholder: builds and immediately releases a slice over tem->colVal,
 * adds nothing to `tr`, and always returns 0.
 */
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  int      ret = 0;
  char*    val = tem->colVal;
  uint64_t len = tem->nColVal;
  int64_t  st = taosGetTimestampUs();

  FstSlice key = fstSliceCreate(val, len);
  fstSliceDestroy(&key);

  // Kept only so the placeholder retains the shape of the other handlers.
  (void)ret;
  (void)st;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
285
/*
 * Regex search placeholder (not implemented yet).
 *
 * For JSON-typed columns the search key is the packed JSON form of the term, which is
 * allocated here and freed before returning. Nothing is added to `tr`; always returns 0.
 */
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  const bool isJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);

  int      ret = 0;
  char*    val = isJson ? idxPackJsonData(tem) : tem->colVal;
  uint64_t len = isJson ? strlen(val) : tem->nColVal;

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(val, len);
  /* impl later */

  if (isJson) {
    taosMemoryFree(val);
  }
  fstSliceDestroy(&key);

  (void)ret;
  (void)st;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
305

dengyihao's avatar
dengyihao 已提交
306
/*
 * Range-style search (LT / LE / GT / GE) over a plain column.
 *
 * Streams the reader's FST with the range set by stmBuilderSetRange and classifies each key
 * with the comparator returned by idxGetCompare(type):
 *   MATCH    - append the table ids stored at the key's output offset to tr->total
 *   CONTINUE - skip the key
 *   BREAK    - stop streaming
 *
 * Always returns TSDB_CODE_SUCCESS; the result of tfileReaderLoadTableIds is not checked.
 */
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType type) {
  char*                p = tem->colVal;
  int                  skip = 0;
  _cache_range_compare cmpFn = idxGetCompare(type);

  // NOTE(review): ctx is not released in this function; confirm who owns the automaton context.
  FAutoCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
  FStmBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);

  // skip is 0, so the range bound handed to the builder is an empty slice.
  FstSlice h = fstSliceCreate((uint8_t*)p, skip);
  stmBuilderSetRange(sb, &h, type);
  fstSliceDestroy(&h);

  FStmSt*     st = stmBuilderIntoStm(sb);
  FStmStRslt* rt = NULL;
  while ((rt = stmStNextWith(st, NULL)) != NULL) {
    FstSlice* s = &rt->data;
    char*     ch = (char*)fstSliceData(s, NULL);

    TExeCond cond = cmpFn(ch, p, tem->colType);
    if (MATCH == cond) {
      tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
    }

    // Release the result exactly once, whatever the verdict, before deciding to stop.
    swsResultDestroy(rt);
    if (BREAK == cond) {
      break;
    }
  }
  stmStDestroy(st);
  stmBuilderDestroy(sb);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
345
// Plain-column "<" query: delegates to tfSearchCompareFunc with LT.
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc(reader, tem, tr, LT);
}

// Plain-column "<=" query: delegates to tfSearchCompareFunc with LE.
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc(reader, tem, tr, LE);
}

// Plain-column ">" query: delegates to tfSearchCompareFunc with GT.
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc(reader, tem, tr, GT);
}

// Plain-column ">=" query: delegates to tfSearchCompareFunc with GE.
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc(reader, tem, tr, GE);
}
dengyihao's avatar
dengyihao 已提交
357
/*
 * Range search placeholder: builds and immediately releases a slice over tem->colVal,
 * adds nothing to `tr`, and always returns 0.
 */
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  int      ret = 0;
  char*    val = tem->colVal;
  uint64_t len = tem->nColVal;
  int64_t  st = taosGetTimestampUs();

  FstSlice key = fstSliceCreate(val, len);
  fstSliceDestroy(&key);

  // Kept only so the placeholder retains the shape of the other handlers.
  (void)ret;
  (void)st;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
366
/*
 * Exact-match lookup for a JSON column: the key is the packed JSON form of the term.
 *
 * When the key is found, the table ids stored at the FST output offset are appended to
 * tr->total. The packed key is heap-allocated by idxPackJsonData and is released here.
 *
 * Returns 0 on success or when the key is absent, TSDB_CODE_OUT_OF_MEMORY when the key cannot
 * be packed, otherwise the non-zero result of tfileReaderLoadTableIds.
 */
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  int   ret = 0;
  char* p = idxPackJsonData(tem);
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int sz = strlen(p);

  int64_t  st = taosGetTimestampUs();
  FstSlice key = fstSliceCreate(p, sz);
  uint64_t offset;
  if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
    int64_t et = taosGetTimestampUs();
    int64_t cost = et - st;
    // cost is a signed 64-bit value, so it is printed with PRId64.
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, cost);

    ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
    cost = taosGetTimestampUs() - et;
    indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
              ", size: %d, time cost: %" PRId64 "us",
              tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost);
  }
  fstSliceDestroy(&key);
  taosMemoryFree(p);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
389 390
// JSON "=" query: delegates to tfSearchCompareFunc_JSON with EQ.
static int32_t tfSearchEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc_JSON(reader, tem, tr, EQ);
}

// JSON prefix query: handled as a CONTAINS comparison by tfSearchCompareFunc_JSON.
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc_JSON(reader, tem, tr, CONTAINS);
}

// JSON suffix query: not implemented yet, reports success without adding results.
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  // impl later
  return TSDB_CODE_SUCCESS;
}

// JSON regex query: not implemented yet, reports success without adding results.
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  // impl later
  return TSDB_CODE_SUCCESS;
}

// JSON "<" query: delegates to tfSearchCompareFunc_JSON with LT.
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc_JSON(reader, tem, tr, LT);
}

// JSON "<=" query: delegates to tfSearchCompareFunc_JSON with LE.
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc_JSON(reader, tem, tr, LE);
}

// JSON ">" query: delegates to tfSearchCompareFunc_JSON with GT.
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc_JSON(reader, tem, tr, GT);
}

// JSON ">=" query: delegates to tfSearchCompareFunc_JSON with GE.
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  return tfSearchCompareFunc_JSON(reader, tem, tr, GE);
}

// JSON range query: not implemented yet, reports success without adding results.
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
  // impl later
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
420
/*
 * Comparison search over a JSON column.
 *
 * A packed prefix `p` of length `skip` is built from the term (for CONTAINS the term value
 * is packed in place of the column name) and the FST is streamed with a prefix automaton on it.
 *
 *   CONTAINS : a key matches when its first `skip` bytes equal the prefix.
 *   others   : streaming stops at the first key that differs within the first `skip - 1`
 *              bytes; keys that differ only at byte `skip` are skipped; for the rest, the
 *              bytes after the prefix are compared with tem->colVal using the comparator
 *              from idxGetCompare(ctype).
 *
 * Matching keys have their table ids appended to tr->total. The packed prefix and every
 * stream result are released on all paths.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if packing the prefix or copying a
 * key fails. The result of tfileReaderLoadTableIds is not checked.
 */
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype) {
  int32_t code = TSDB_CODE_SUCCESS;
  int     skip = 0;

  char* p = NULL;
  if (ctype == CONTAINS) {
    SIndexTerm tm = {.suid = tem->suid,
                     .operType = tem->operType,
                     .colType = tem->colType,
                     .colName = tem->colVal,
                     .nColName = tem->nColVal};
    p = idxPackJsonDataPrefixNoType(&tm, &skip);
  } else {
    p = idxPackJsonDataPrefix(tem, &skip);
  }
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  _cache_range_compare cmpFn = idxGetCompare(ctype);

  // NOTE(review): ctx is not released in this function; confirm who owns the automaton context.
  FAutoCtx*    ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
  FStmBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);

  FStmSt*     st = stmBuilderIntoStm(sb);
  FStmStRslt* rt = NULL;
  while ((rt = stmStNextWith(st, NULL)) != NULL) {
    FstSlice* s = &rt->data;

    int32_t  sz = 0;
    char*    ch = (char*)fstSliceData(s, &sz);
    TExeCond cond = CONTINUE;

    if (ctype == CONTAINS) {
      if (0 == strncmp(ch, p, skip)) {
        cond = MATCH;
      }
    } else if (0 != strncmp(ch, p, skip - 1)) {
      // Key no longer shares the first skip - 1 bytes of the prefix: stop streaming.
      cond = BREAK;
    } else if (0 == strncmp(ch, p, skip)) {
      // The slice is not NUL-terminated; compare on a terminated copy.
      char* tBuf = taosMemoryCalloc(1, sz + 1);
      if (tBuf == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        cond = BREAK;
      } else {
        memcpy(tBuf, ch, sz);
        cond = cmpFn(tBuf + skip, tem->colVal, IDX_TYPE_GET_TYPE(tem->colType));
        taosMemoryFree(tBuf);
      }
    }
    // Otherwise the key differs only at byte `skip`: cond stays CONTINUE and the key is skipped.

    if (MATCH == cond) {
      tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
    }

    // Release the result exactly once, whatever the verdict, before deciding to stop.
    swsResultDestroy(rt);
    if (BREAK == cond) {
      break;
    }
  }
  stmStDestroy(st);
  stmBuilderDestroy(sb);
  taosMemoryFree(p);
  return code;
}
dengyihao's avatar
dengyihao 已提交
480
/*
 * Run `query` against `reader` and collect the hits in `tr`.
 *
 * The handler is taken from the tfSearch table: row 1 for JSON-typed columns, row 0
 * otherwise, indexed by the query type. One reference on `reader` is dropped before
 * returning, whatever the outcome.
 *
 * Returns the handler's result.
 */
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) {
  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  const int row = IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON) ? 1 : 0;
  int       ret = tfSearch[row][qtype](reader, term, tr);

  tfileReaderUnRef(reader);
  return ret;
}

dengyihao's avatar
dengyihao 已提交
494
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
dengyihao's avatar
dengyihao 已提交
495
  char fullname[256] = {0};
dengyihao's avatar
dengyihao 已提交
496
  tfileGenFileFullName(fullname, path, suid, colName, version);
dengyihao's avatar
dengyihao 已提交
497
  IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64);
dengyihao's avatar
dengyihao 已提交
498 499 500
  if (wcx == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
501 502 503 504 505

  TFileHeader tfh = {0};
  tfh.suid = suid;
  tfh.version = version;
  tfh.colType = colType;
dengyihao's avatar
dengyihao 已提交
506
  memcpy(tfh.colName, colName, strlen(colName));
dengyihao's avatar
dengyihao 已提交
507 508 509

  return tfileWriterCreate(wcx, &tfh);
}
dengyihao's avatar
dengyihao 已提交
510
/*
 * Open a reader on the tfile identified by (idx->path, suid, colName, version).
 *
 * Returns the reader, or NULL when the file cannot be opened (terrno is then set from errno)
 * or when tfileReaderCreate fails.
 */
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName) {
  char fullname[256] = {0};
  tfileGenFileFullName(fullname, idx->path, suid, colName, version);

  IFileCtx* rctx = idxFileCtxCreate(TFILE, fullname, true, 1024 * 1024 * 1024);
  if (rctx == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
    return NULL;
  }

  rctx->lru = idx->lru;
  indexTrace("open read file name:%s, file size: %" PRId64 "", rctx->file.buf, rctx->file.size);

  return tfileReaderCreate(rctx);
}
dengyihao's avatar
dengyihao 已提交
526
/*
 * Allocate a writer bound to `ctx`, copy `header` into it and write the header out.
 *
 * Returns the writer, or NULL if allocation fails (in which case `ctx` is left untouched).
 * NOTE(review): the result of tfileWriteHeader is not checked.
 */
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
  TFileWriter* writer = taosMemoryCalloc(1, sizeof(TFileWriter));
  if (writer != NULL) {
    writer->ctx = ctx;
    writer->header = *header;
    tfileWriteHeader(writer);
    return writer;
  }

  indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
537

dengyihao's avatar
dengyihao 已提交
538
/*
 * Write a complete tfile body from `data`, an array of TFileValue pointers.
 *
 * Steps:
 *   1. unless `order` is true, sort the values with a comparator chosen from the column type;
 *   2. sort and de-duplicate each value's table ids and compute the offset at which the FST
 *      will start, then record it with tfileWriteFstOffset;
 *   3. serialize every table-id list and write it, remembering its offset in v->offset;
 *   4. build the FST entry by entry with tfileWriteData, then write the footer.
 *
 * Returns 0 on success and -1 when a buffer or the FST builder cannot be allocated. A failed
 * tfileWriteData is logged but does not change the return value.
 *
 * NOTE(review): when the FST builder cannot be created this function closes `tw` itself before
 * returning -1, while the allocation-failure paths leave `tw` open; confirm what callers
 * expect on -1.
 */
int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
  SArray* values = (SArray*)data;

  // sort by coltype and write to tindex
  if (order == false) {
    __compar_fn_t fn;

    int8_t colType = tw->header.colType;
    colType = IDX_TYPE_GET_TYPE(colType);
    if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
      fn = tfileStrCompare;
    } else {
      fn = getComparFunc(colType, 0);
    }
    taosArraySortPWithExt(values, tfileValueCompare, &fn);
  }

  int32_t sz = taosArrayGetSize(values);
  int32_t fstOffset = tw->offset;

  // First pass: normalize the id lists and find where the FST section will begin.
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);
    taosArraySort(v->tableId, idxUidCompare);
    taosArrayRemoveDuplicate(v->tableId, idxUidCompare, NULL);
    int32_t tbsz = taosArrayGetSize(v->tableId);
    fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
  }
  tfileWriteFstOffset(tw, fstOffset);

  int32_t cap = 4 * 1024;
  char*   buf = taosMemoryCalloc(1, cap);
  if (buf == NULL) {
    return -1;
  }

  // Second pass: serialize and write each table-id list.
  for (int32_t i = 0; i < sz; i++) {
    TFileValue* v = taosArrayGetP(values, i);

    int32_t tbsz = taosArrayGetSize(v->tableId);
    int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);

    // Grow the scratch buffer when this list does not fit; keep the old one valid on failure.
    if (cap < ttsz) {
      char* grown = (char*)taosMemoryRealloc(buf, ttsz);
      if (grown == NULL) {
        taosMemoryFree(buf);
        return -1;
      }
      buf = grown;
      cap = ttsz;
    }

    tfileSerialTableIdsToBuf(buf, v->tableId);
    tw->ctx->write(tw->ctx, buf, ttsz);
    v->offset = tw->offset;
    tw->offset += ttsz;
    memset(buf, 0, cap);
  }
  taosMemoryFree(buf);

  tw->fb = fstBuilderCreate(tw->ctx, 0);
  if (tw->fb == NULL) {
    tfileWriterClose(tw);
    return -1;
  }

  // write data
  for (int32_t i = 0; i < sz; i++) {
    // TODO, fst batch write later
    TFileValue* v = taosArrayGetP(values, i);
    if (tfileWriteData(tw, v) != 0) {
      indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
                 (int)taosArrayGetSize(v->tableId));
    } else {
      indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
                (int)taosArrayGetSize(v->tableId));
    }
  }
  fstBuilderDestroy(tw->fb);
  tfileWriteFooter(tw);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
611
// Release a writer: destroy its file context (idxFileCtxDestroy is called
// with `false` as its second argument) and then free the writer struct.
// A NULL writer is ignored.
void tfileWriterClose(TFileWriter* tw) {
  if (tw != NULL) {
    idxFileCtxDestroy(tw->ctx, false);
    taosMemoryFree(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
618
// Destroy a writer. Performs the same steps as tfileWriterClose: the file
// context is destroyed (second argument `false`) and the writer is freed.
// A NULL writer is ignored.
void tfileWriterDestroy(TFileWriter* tw) {
  if (tw != NULL) {
    idxFileCtxDestroy(tw->ctx, false);
    taosMemoryFree(tw);
  }
}
dengyihao's avatar
dengyihao 已提交
625

dengyihao's avatar
dengyihao 已提交
626 627
// Create an IndexTFile for `path`.
//
// Builds the reader cache first, then allocates the IndexTFile, initializes
// its mutex and attaches the cache. Returns NULL when the cache cannot be
// created or the allocation fails (the cache is destroyed in the latter case).
IndexTFile* idxTFileCreate(SIndex* idx, const char* path) {
  TFileCache* readerCache = tfileCacheCreate(idx, path);
  if (readerCache == NULL) {
    return NULL;
  }

  IndexTFile* handle = taosMemoryCalloc(1, sizeof(IndexTFile));
  if (handle != NULL) {
    taosThreadMutexInit(&handle->mtx, NULL);
    handle->cache = readerCache;
    return handle;
  }

  tfileCacheDestroy(readerCache);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
641
// Tear down an IndexTFile: destroy its mutex, destroy its reader cache and
// free the struct. A NULL argument is ignored.
void idxTFileDestroy(IndexTFile* tfile) {
  if (tfile != NULL) {
    taosThreadMutexDestroy(&tfile->mtx);
    tfileCacheDestroy(tfile->cache);
    taosMemoryFree(tfile);
  }
}
dengyihao's avatar
dengyihao 已提交
649

dengyihao's avatar
dengyihao 已提交
650
// Run `query` against the tfile reader that matches the query term.
//
// The reader is looked up in the cache under the IndexTFile mutex, using a
// key built from the term's suid, column type and column name.
//
// Returns -1 when `tfile` is NULL, 0 when no reader exists for the key, and
// otherwise whatever tfileReaderSearch returns.
int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
  if (tfile == NULL) {
    return -1;
  }

  int64_t     begin = taosGetTimestampUs();
  IndexTFile* index = tfile;
  SIndexTerm* term = query->term;

  ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};

  taosThreadMutexLock(&index->mtx);
  TFileReader* reader = tfileCacheGet(index->cache, &key);
  taosThreadMutexUnlock(&index->mtx);

  if (reader != NULL) {
    // stage 1 = time spent locating the reader in the cache
    int64_t elapsed = taosGetTimestampUs() - begin;
    indexInfo("index tfile stage 1 cost: %" PRId64 "", elapsed);

    return tfileReaderSearch(reader, query, result);
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
673
// Placeholder: inserting a single term into a tfile is not implemented.
// The arguments are ignored and 0 is always returned.
int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
  (void)tfile;
  (void)term;
  (void)uid;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
679 680 681 682 683 684 685
// Advance the tfile iterator to the next FST entry.
//
// The previous iterate value is released first (iterateValueDestroy with
// `false`). The next key is then pulled from the FST stream, copied into a
// freshly allocated NUL-terminated string, and the table ids stored at the
// entry's output offset are loaded into iv->val.
//
// Returns true when a new value is available in iiter->val. Returns false
// when the stream is exhausted, when the key copy cannot be allocated, or
// when loading the table ids fails; in the failure cases the temporary key
// copy is freed so nothing leaks.
static bool tfileIteratorNext(Iterate* iiter) {
  IterateValue* iv = &iiter->val;
  iterateValueDestroy(iv, false);

  TFileFstIter* tIter = iiter->iter;
  FStmStRslt*   rt = stmStNextWith(tIter->st, NULL);
  if (rt == NULL) {
    return false;
  }

  int32_t sz = 0;
  char*   ch = (char*)fstSliceData(&rt->data, &sz);

  // sz + 1 zeroed bytes: the copy stays NUL-terminated
  char* colVal = taosMemoryCalloc(1, sz + 1);
  if (colVal == NULL) {
    swsResultDestroy(rt);
    return false;
  }
  memcpy(colVal, ch, sz);

  uint64_t offset = (uint64_t)(rt->out.out);
  swsResultDestroy(rt);

  // set up iterate value
  if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
    taosMemoryFree(colVal);
    return false;
  }

  iv->ver = 0;
  iv->type = ADD_VALUE;  // value in tfile always ADD_VALUE
  iv->colVal = colVal;
  return true;
}

710
// Accessor installed as Iterate::getValue: returns the address of the
// iterator's embedded current value.
static IterateValue* tifileIterateGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}
dengyihao's avatar
dengyihao 已提交
711 712

// Build the FST-side state of a tfile iterator for `reader`.
//
// Creates an AUTOMATION_ALWAYS automaton context, starts a search on the
// reader's FST with it, and converts the resulting stream builder into a
// stream. Returns NULL only when the TFileFstIter allocation fails; the
// results of the FST calls are stored as returned, without checks.
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
  TFileFstIter* fstIter = taosMemoryCalloc(1, sizeof(TFileFstIter));
  if (fstIter != NULL) {
    fstIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
    fstIter->fb = fstSearch(reader->fst, fstIter->ctx);
    fstIter->st = stmBuilderIntoStm(fstIter->fb);
    fstIter->rdr = reader;
  }
  return fstIter;
}

// Create an iterator over every entry of `reader`.
//
// The returned Iterate owns a TFileFstIter (in iter->iter) and an array of
// uint64_t table ids (in iter->val.val); release it with
// tfileIteratorDestroy.
//
// Returns NULL when `reader` is NULL or when any allocation fails. On
// failure everything allocated so far is released.
Iterate* tfileIteratorCreate(TFileReader* reader) {
  if (reader == NULL) {
    return NULL;
  }

  Iterate* iter = taosMemoryCalloc(1, sizeof(Iterate));
  if (iter == NULL) {
    return NULL;
  }

  iter->iter = tfileFstIteratorCreate(reader);
  if (iter->iter == NULL) {
    taosMemoryFree(iter);
    return NULL;
  }

  iter->next = tfileIteratorNext;
  iter->getValue = tifileIterateGetValue;
  iter->val.colVal = NULL;
  iter->val.val = taosArrayInit(1, sizeof(uint64_t));
  if (iter->val.val == NULL) {
    // Same FST teardown sequence as tfileIteratorDestroy.
    TFileFstIter* tIter = iter->iter;
    stmStDestroy(tIter->st);
    stmBuilderDestroy(tIter->fb);
    automCtxDestroy(tIter->ctx);
    taosMemoryFree(tIter);
    taosMemoryFree(iter);
    return NULL;
  }
  return iter;
}
// Destroy an iterator created by tfileIteratorCreate.
//
// Releases the current value (iterateValueDestroy with `true`), then the
// FST stream, the stream builder and the automaton context, and finally the
// TFileFstIter and the Iterate itself. A NULL iterator is ignored.
void tfileIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    iterateValueDestroy(&iter->val, true);

    TFileFstIter* fstIter = iter->iter;
    stmStDestroy(fstIter->st);
    stmBuilderDestroy(fstIter->fb);
    automCtxDestroy(fstIter->ctx);
    taosMemoryFree(fstIter);

    taosMemoryFree(iter);
  }
}

dengyihao's avatar
dengyihao 已提交
759
// Look up the cached reader for (suid, colName).
//
// The cache key always uses TSDB_DATA_TYPE_BINARY as the column type. The
// lookup is done while holding tf->mtx. Returns NULL when `tf` is NULL, and
// otherwise the result of tfileCacheGet.
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
  if (tf == NULL) {
    return NULL;
  }

  ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};

  taosThreadMutexLock(&tf->mtx);
  TFileReader* found = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);

  return found;
}
dengyihao's avatar
dengyihao 已提交
771

dengyihao's avatar
dengyihao 已提交
772 773
// Three-way comparison of two NUL-terminated strings.
//
// `a` and `b` are treated as `const char*`. Returns exactly -1, 0 or 1
// (strcmp's result normalized to its sign).
static int tfileStrCompare(const void* a, const void* b) {
  int ret = strcmp((const char*)a, (const char*)b);
  return (ret > 0) - (ret < 0);
}

dengyihao's avatar
dengyihao 已提交
780 781 782 783 784 785 786 787
// Compare two TFileValue objects by their colVal.
//
// `param` points at the comparison function to use; it is called with the
// two colVal pointers and its result is returned unchanged.
static int tfileValueCompare(const void* a, const void* b, const void* param) {
  const TFileValue* lhs = a;
  const TFileValue* rhs = b;
  __compar_fn_t     cmp = *(const __compar_fn_t*)param;

  return cmp(lhs->colVal, rhs->colVal);
}
dengyihao's avatar
dengyihao 已提交
788 789

TFileValue* tfileValueCreate(char* val) {
wafwerar's avatar
wafwerar 已提交
790
  TFileValue* tf = taosMemoryCalloc(1, sizeof(TFileValue));
dengyihao's avatar
dengyihao 已提交
791 792 793
  if (tf == NULL) {
    return NULL;
  }
794
  tf->colVal = tstrdup(val);
dengyihao's avatar
dengyihao 已提交
795 796 797 798
  tf->tableId = taosArrayInit(32, sizeof(uint64_t));
  return tf;
}
// Append table id `val` to tf->tableId.
//
// Returns 0 on success, -1 when `tf` is NULL or the array push fails.
// NOTE(review): relies on taosArrayPush returning NULL on failure - confirm
// against tarray.h.
int tfileValuePush(TFileValue* tf, uint64_t val) {
  if (tf == NULL) {
    return -1;
  }
  if (taosArrayPush(tf->tableId, &val) == NULL) {
    return -1;
  }
  return 0;
}
// Free a TFileValue: its table-id array, its colVal string and the struct
// itself. A NULL argument is ignored, matching the other destroy functions
// in this file.
void tfileValueDestroy(TFileValue* tf) {
  if (tf == NULL) {
    return;
  }
  taosArrayDestroy(tf->tableId);
  taosMemoryFree(tf->colVal);
  taosMemoryFree(tf);
}
dengyihao's avatar
dengyihao 已提交
810 811 812 813 814
// Serialize a table-id array into `buf`: first the element count as an
// int32_t, then each id as a uint64_t, all via SERIALIZE_VAR_TO_BUF.
// The caller must provide a buffer large enough for the count plus all ids;
// no bounds are checked here.
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
  int sz = taosArrayGetSize(ids);
  SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);

  size_t idx = 0;
  while (idx < sz) {
    uint64_t* v = taosArrayGet(ids, idx);
    SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
    idx++;
  }
}

// Record and write the FST offset.
//
// The stored value is `offset` plus the size of the header's fstOffset
// field. It is saved in tw->header.fstOffset and then written through the
// writer's file context. On a complete write tw->offset advances by the
// number of bytes written and 0 is returned; otherwise -1 is returned (the
// header field has already been updated at that point).
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
  int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
  tw->header.fstOffset = fstOffset;

  if (tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset)) == sizeof(fstOffset)) {
    indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx));
    tw->offset += sizeof(fstOffset);
    return 0;
  }
  return -1;
}
// Write the first TFILE_HEADER_NO_FST bytes of the writer's header through
// its file context.
//
// Returns 0 and sets writer->offset to the number of bytes written when the
// whole buffer was written; returns -1 on a short or failed write.
static int tfileWriteHeader(TFileWriter* writer) {
  char raw[TFILE_HEADER_NO_FST] = {0};
  memcpy(raw, (char*)&writer->header, sizeof(raw));

  indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx));

  int written = writer->ctx->write(writer->ctx, raw, sizeof(raw));
  if (written != sizeof(raw)) {
    return -1;
  }

  indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx));
  writer->offset = written;
  return 0;
}
// Insert one column value into the writer's FST builder.
//
// The key is the bytes of tval->colVal (without the terminating NUL) and
// the associated output is tval->offset. The temporary key slice is
// destroyed on both the success and the failure path.
//
// Returns 0 when fstBuilderInsert accepts the key, -1 otherwise.
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
  FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));

  bool inserted = fstBuilderInsert(write->fb, key, tval->offset);
  fstSliceDestroy(&key);

  return inserted ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
858
// Write the file footer: FILE_MAGIC_NUMBER encoded with taosEncodeFixedU64.
//
// The encoded magic number is binary data, so its length is taken from
// sizeof(FILE_MAGIC_NUMBER) rather than strlen, which would truncate at any
// zero byte. Returns the number of bytes written; a short write trips the
// assert in debug builds.
static int tfileWriteFooter(TFileWriter* write) {
  char  buf[sizeof(FILE_MAGIC_NUMBER) + 1] = {0};
  void* pBuf = (void*)buf;
  taosEncodeFixedU64(&pBuf, FILE_MAGIC_NUMBER);

  int nwrite = write->ctx->write(write->ctx, buf, (int32_t)sizeof(FILE_MAGIC_NUMBER));

  indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
  assert(nwrite == sizeof(FILE_MAGIC_NUMBER));
  return nwrite;
}
dengyihao's avatar
dengyihao 已提交
868
// Read TFILE_HEADER_SIZE bytes from offset 0 of the reader's file into
// reader->header.
//
// Returns -1 when the read itself fails (negative result); the header is
// left untouched in that case. A short but non-negative read is still
// accepted, as before: the unread tail of the header is zero-filled.
// Returns 0 otherwise.
static int tfileReaderLoadHeader(TFileReader* reader) {
  // TODO simple tfile header later
  char buf[TFILE_HEADER_SIZE] = {0};

  int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
  if (nread < 0) {
    indexError("actual Read: %d, to read: %d, code:0x%x, filename: %s", (int)(nread), (int)sizeof(buf), errno,
               reader->ctx->file.buf);
    return -1;
  }

  indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);
  memcpy(&reader->header, buf, sizeof(buf));

  return 0;
}
dengyihao's avatar
dengyihao 已提交
885
// Load the FST section of the file into memory and build reader->fst.
//
// The FST occupies the bytes between header.fstOffset and the trailing
// magic number. Returns 0 when the FST was created, -1 when the computed
// FST size is not positive, the buffer cannot be allocated, the read
// returns nothing or more than requested, or fstCreate fails.
static int tfileReaderLoadFst(TFileReader* reader) {
  IFileCtx* ctx = reader->ctx;
  int       size = ctx->size(ctx);

  // current load fst into memory, refactor it later
  int fstSize = size - reader->header.fstOffset - (int)sizeof(FILE_MAGIC_NUMBER);
  if (fstSize <= 0) {
    // truncated or corrupted file: there is no room for an FST section
    indexError("invalid fst size: %d, fst offset=%d, filename: %s, file size: %d", fstSize, reader->header.fstOffset,
               ctx->file.buf, size);
    return -1;
  }

  char* buf = taosMemoryCalloc(1, fstSize);
  if (buf == NULL) {
    return -1;
  }

  int64_t ts = taosGetTimestampUs();
  int32_t nread = ctx->readFrom(ctx, buf, fstSize, reader->header.fstOffset);
  int64_t cost = taosGetTimestampUs() - ts;
  // `size` is an int, so it is widened explicitly to match the PRId64 specifier
  indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64
            "us",
            nread, reader->header.fstOffset, fstSize, ctx->file.buf, (int64_t)size, cost);

  // we assume fst size less than FST_MAX_SIZE
  if (nread <= 0 || nread > fstSize) {
    taosMemoryFree(buf);
    return -1;
  }

  FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
  reader->fst = fstCreate(&st);
  taosMemoryFree(buf);
  fstSliceDestroy(&st);

  return reader->fst != NULL ? 0 : -1;
}
dengyihao's avatar
dengyihao 已提交
912
// Load the table-id list stored at `offset` and append each id to `result`.
//
// On-disk layout read here: an int32_t count followed by that many uint64_t
// ids. The data is read in 4096-byte blocks; an id that straddles a block
// boundary is stitched together from the tail of one block and the head of
// the next.
//
// Returns 0 on success. Returns -1 when the first read fails or yields
// fewer bytes than the count field, or when a refill read does not supply
// the missing bytes of a straddling id; ids pushed before the failure
// remain in `result`.
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
  // TODO(yihao): opt later
  IFileCtx* ctx = reader->ctx;
  // add block cache
  char    block[4096] = {0};
  int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
  // signed comparison on purpose: a negative (error) result must not pass
  if (nread < (int32_t)sizeof(int32_t)) {
    return -1;
  }

  char*   p = block;
  int32_t nid = 0;
  memcpy(&nid, p, sizeof(nid));
  p += sizeof(nid);

  while (nid > 0) {
    size_t left = (size_t)(block + sizeof(block) - p);
    if (left >= sizeof(uint64_t)) {
      taosArrayPush(result, (uint64_t*)p);
      p += sizeof(uint64_t);
    } else {
      // the id is split across two blocks: keep the head, fetch the tail
      char buf[sizeof(uint64_t)] = {0};
      memcpy(buf, p, left);

      memset(block, 0, sizeof(block));
      offset += sizeof(block);
      nread = ctx->readFrom(ctx, block, sizeof(block), offset);
      if (nread < (int32_t)(sizeof(uint64_t) - left)) {
        return -1;
      }
      memcpy(buf + left, block, sizeof(uint64_t) - left);

      taosArrayPush(result, (uint64_t*)buf);
      p = block + sizeof(uint64_t) - left;
    }
    nid -= 1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
945 946
// Check that the file ends with FILE_MAGIC_NUMBER.
//
// Only the footer is validated, after a sanity check that the file is
// larger than the header; corruption elsewhere in the file is not detected.
// Returns 0 when the magic number matches, -1 when the size is negative or
// too small, the footer cannot be read in full, or the value differs.
static int tfileReaderVerify(TFileReader* reader) {
  IFileCtx* ctx = reader->ctx;

  uint64_t tMagicNumber = 0;
  char     buf[sizeof(tMagicNumber) + 1] = {0};
  int      size = ctx->size(ctx);

  // Compare as signed values: against a size_t a negative size would be
  // converted to a huge unsigned number and slip through.
  if (size < (int)sizeof(tMagicNumber) || size <= (int)sizeof(reader->header)) {
    return -1;
  }
  if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) {
    return -1;
  }

  taosDecodeFixedU64(buf, &tMagicNumber);
  return tMagicNumber == FILE_MAGIC_NUMBER ? 0 : -1;
}

dengyihao's avatar
dengyihao 已提交
963 964
// Take one reference on the reader via T_REF_INC. A NULL reader is ignored.
void tfileReaderRef(TFileReader* rd) {
  if (rd != NULL) {
    int refs = T_REF_INC(rd);
    UNUSED(refs);
  }
}

dengyihao's avatar
dengyihao 已提交
971 972
// Drop one reference on the reader via T_REF_DEC and destroy the reader
// when the resulting count is zero. A NULL reader is ignored.
void tfileReaderUnRef(TFileReader* rd) {
  if (rd != NULL) {
    int remaining = T_REF_DEC(rd);
    if (remaining == 0) {
      // last reference released
      tfileReaderDestroy(rd);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
981

dengyihao's avatar
dengyihao 已提交
982
static SArray* tfileGetFileList(const char* path) {
dengyihao's avatar
dengyihao 已提交
983 984
  char     buf[128] = {0};
  uint64_t suid;
dengyihao's avatar
dengyihao 已提交
985
  int64_t  version;
dengyihao's avatar
dengyihao 已提交
986
  SArray*  files = taosArrayInit(4, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
987

wafwerar's avatar
wafwerar 已提交
988 989
  TdDirPtr pDir = taosOpenDir(path);
  if (NULL == pDir) {
dengyihao's avatar
dengyihao 已提交
990 991
    return NULL;
  }
wafwerar's avatar
wafwerar 已提交
992 993 994
  TdDirEntryPtr pDirEntry;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* file = taosGetDirEntryName(pDirEntry);
dengyihao's avatar
dengyihao 已提交
995 996 997
    if (0 != tfileParseFileName(file, &suid, buf, &version)) {
      continue;
    }
dengyihao's avatar
dengyihao 已提交
998 999

    size_t len = strlen(path) + 1 + strlen(file) + 1;
wafwerar's avatar
wafwerar 已提交
1000
    char*  buf = taosMemoryCalloc(1, len);
dengyihao's avatar
dengyihao 已提交
1001
    sprintf(buf, "%s/%s", path, file);
dengyihao's avatar
dengyihao 已提交
1002
    taosArrayPush(files, &buf);
dengyihao's avatar
dengyihao 已提交
1003
  }
wafwerar's avatar
wafwerar 已提交
1004
  taosCloseDir(&pDir);
dengyihao's avatar
dengyihao 已提交
1005 1006 1007 1008 1009

  taosArraySort(files, tfileCompare);
  tfileRmExpireFile(files);

  return files;
dengyihao's avatar
dengyihao 已提交
1010
}
dengyihao's avatar
dengyihao 已提交
1011 1012 1013 1014
// Placeholder for removing expired tindex files from `result`.
// Currently does nothing and always returns 0.
static int tfileRmExpireFile(SArray* result) {
  // TODO(yihao): remove expire tindex after restart
  (void)result;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1015 1016
// Element destructor for an array of char*: `elem` points at the array
// slot, and the string stored in that slot is freed.
static void tfileDestroyFileName(void* elem) {
  char** slot = elem;
  taosMemoryFree(*slot);
}
// Sort comparator for an array of char*: `a` and `b` each point at an
// array slot holding a NUL-terminated string; the strings are ordered with
// strcmp and its result is returned unchanged.
static int tfileCompare(const void* a, const void* b) {
  const char* as = *(const char* const*)a;
  const char* bs = *(const char* const*)b;
  return strcmp(as, bs);
}
dengyihao's avatar
dengyihao 已提交
1024

dengyihao's avatar
dengyihao 已提交
1025 1026
// Parse a tindex file name of the form "<suid>-<col>-<version>.tindex".
//
// On success stores the three components through `suid`, `col` and
// `version` and returns 0. Returns -1 when the name does not have this
// shape, including when it does not end exactly with ".tindex".
//
// `col` must have room for at least 128 bytes: at most 127 characters of
// the column part are stored, and a longer column part makes the parse
// fail instead of overrunning the buffer.
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version) {
  // %n is only reached (and `consumed` only set) once the literal ".tindex"
  // has matched, so it doubles as the suffix check that the conversion
  // count alone cannot provide.
  int consumed = 0;
  if (3 != sscanf(filename, "%" SCNu64 "-%127[^-]-%" SCNd64 ".tindex%n", suid, col, version, &consumed)) {
    return -1;
  }
  if (consumed == 0 || filename[consumed] != '\0') {
    return -1;
  }
  // read suid & colid & version  success
  return 0;
}
dengyihao's avatar
dengyihao 已提交
1032
// Format a tfile name as "<suid>-<col>-<version>.tindex" into `filename`.
// The write is unbounded: the caller must supply a buffer large enough for
// the formatted name.
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version) {
  sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
}
dengyihao's avatar
dengyihao 已提交
1037
// Format "<path>/<suid>-<col>-<version>.tindex" into `fullname`.
//
// The file-name part is built in a 128-byte local buffer with snprintf, so
// an over-long column name is truncated instead of overflowing the stack.
// The final write into `fullname` is unbounded: the caller must supply a
// buffer large enough for the path plus the file name.
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) {
  char filename[128] = {0};
  snprintf(filename, sizeof(filename), "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
  sprintf(fullname, "%s/%s", path, filename);
}