index_tfile.c 6.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
//#include <sys/types.h>
//#include <dirent.h>
dengyihao's avatar
dengyihao 已提交
18
#include "index_tfile.h"
19
#include "index.h"
dengyihao's avatar
dengyihao 已提交
20
#include "index_fst.h"
21
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
22
#include "index_util.h"
dengyihao's avatar
dengyihao 已提交
23
#include "taosdef.h"
dengyihao's avatar
dengyihao 已提交
24

25 26 27 28 29 30
/*
 * Read the index-file header through reader->ctx into reader->header.
 *
 * Bytes are consumed in this order:
 *   1. a fixed prefix of TFILE_HADER_PRE_SIZE bytes, from which suid, version and
 *      an int32_t column-name length are copied (in that order);
 *   2. the column name itself (that many bytes);
 *   3. the column type.
 *
 * Returns 0 on success and -1 on a NULL reader/context, a short read, or a
 * column-name length that does not fit header->colName. Failures used to be
 * assert()s, which vanish under NDEBUG; the caller already checks the result.
 */
static FORCE_INLINE int tfileReadLoadHeader(TFileReader *reader) {
  if (reader == NULL || reader->ctx == NULL) {
    return -1;
  }

  // TODO simple tfile header later
  char             buf[TFILE_HADER_PRE_SIZE];
  char *           p = buf;
  TFileReadHeader *header = &reader->header;
  int32_t          colLen = 0;

  // The three fixed fields are copied out of buf below; refuse to run past it.
  if (sizeof(header->suid) + sizeof(header->version) + sizeof(colLen) > sizeof(buf)) {
    return -1;
  }

  int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HADER_PRE_SIZE);
  if (nread != TFILE_HADER_PRE_SIZE) {
    return -1;
  }

  memcpy(&header->suid, p, sizeof(header->suid));
  p += sizeof(header->suid);

  memcpy(&header->version, p, sizeof(header->version));
  p += sizeof(header->version);

  memcpy(&colLen, p, sizeof(colLen));
  // colLen comes from the file: reject negative values explicitly and keep one
  // byte free for the terminator.
  if (colLen < 0 || (size_t)colLen >= sizeof(header->colName)) {
    return -1;
  }

  nread = reader->ctx->read(reader->ctx, header->colName, colLen);
  if (nread != colLen) {
    return -1;
  }
  header->colName[colLen] = '\0';

  nread = reader->ctx->read(reader->ctx, &header->colType, sizeof(header->colType));
  if (nread != (int64_t)sizeof(header->colType)) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
48
/*
 * Append the name of every entry in directory `path` to `result`.
 *
 * Each pushed element is a heap-allocated, NUL-terminated copy of d_name
 * (a char*); the array owner releases them (see tfileDestroyFileName).
 * Entries are not filtered, so "." and ".." are included when readdir reports them.
 *
 * Returns 0 on success, -1 if the arguments are NULL, the directory cannot be
 * opened, or an allocation/push fails. On a mid-way failure the names already
 * pushed stay in `result`.
 */
static int tfileGetFileList(const char *path, SArray *result) {
  if (path == NULL || result == NULL) {
    return -1;
  }

  DIR *dir = opendir(path);
  if (NULL == dir) {
    return -1;
  }

  int            code = 0;
  struct dirent *entry;
  while ((entry = readdir(dir)) != NULL) {
    size_t len = strlen(entry->d_name);
    char * name = calloc(1, len + 1);
    if (name == NULL) {
      // previously memcpy() would have written through NULL here
      code = -1;
      break;
    }
    memcpy(name, entry->d_name, len);

    if (taosArrayPush(result, &name) == NULL) {
      // the array did not take the pointer, so it is still ours to free
      free(name);
      code = -1;
      break;
    }
  }
  closedir(dir);
  return code;
}
dengyihao's avatar
dengyihao 已提交
64 65 66
/*
 * Array-element destructor for file-name lists: `elem` points at a slot that
 * holds a heap-allocated char*, which is freed.
 */
static void tfileDestroyFileName(void *elem) {
  char **slot = (char **)elem;
  free(*slot);
}
dengyihao's avatar
dengyihao 已提交
68 69 70
/*
 * Sort comparator for arrays of char*: `a` and `b` each point at a string
 * pointer, and the strings are ordered byte-wise (plain lexicographic order).
 */
static int tfileCompare(const void *a, const void *b) {
  const char *lhs = *(const char *const *)a;
  const char *rhs = *(const char *const *)b;
  // Comparing up to the longer length is a full-string comparison.
  return strcmp(lhs, rhs);
}
dengyihao's avatar
dengyihao 已提交
75
/*
 * Parse an index file name of the form "<suid>-<colId>-<version>.tindex".
 *
 * The whole name must match: the ".tindex" suffix is required and nothing may
 * follow it. (Checking only sscanf's conversion count would accept names such
 * as "1-2-3.txt", because the trailing literal does not affect that count.)
 *
 * On success stores the three values and returns 0. On failure returns -1 and
 * leaves *suid, *colId and *version untouched.
 */
static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) {
  if (filename == NULL || suid == NULL || colId == NULL || version == NULL) {
    return -1;
  }

  uint64_t parsedSuid = 0;
  int      parsedColId = 0;
  int      parsedVersion = 0;
  int      consumed = 0;  // set by %n only if the ".tindex" literal matched

  int nfields = sscanf(filename, "%" SCNu64 "-%d-%d.tindex%n", &parsedSuid, &parsedColId, &parsedVersion, &consumed);
  if (nfields != 3 || consumed <= 0 || filename[consumed] != '\0') {
    return -1;
  }

  // read suid & colid & version  success
  *suid = parsedSuid;
  *colId = parsedColId;
  *version = parsedVersion;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
83
/*
 * Serialize a cache key into `buf` in this field order:
 *   suid, '_', colType, '_', version, '_', colName (key->nColName bytes).
 *
 * No length is returned and no bound is checked here; the caller must supply
 * a buffer large enough for all fields.
 * NOTE(review): the SERIALIZE_* macros are defined elsewhere; presumably they
 * copy the raw bytes and advance the pointer passed as first argument - confirm.
 */
static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
  char *cursor = buf;

  SERIALIZE_MEM_TO_BUF(cursor, key, suid);
  SERIALIZE_VAR_TO_BUF(cursor, '_', char);

  SERIALIZE_MEM_TO_BUF(cursor, key, colType);
  SERIALIZE_VAR_TO_BUF(cursor, '_', char);

  SERIALIZE_MEM_TO_BUF(cursor, key, version);
  SERIALIZE_VAR_TO_BUF(cursor, '_', char);

  SERIALIZE_STR_MEM_TO_BUF(cursor, key, colName, key->nColName);
}

dengyihao's avatar
dengyihao 已提交
93
TFileCache *tfileCacheCreate(const char *path) {
94 95 96 97 98 99 100 101 102 103
  TFileCache *tcache = calloc(1, sizeof(TFileCache));
  if (tcache == NULL) {
    return NULL;
  }

  tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  tcache->capacity = 64;

  SArray *files = taosArrayInit(4, sizeof(void *));
  tfileGetFileList(path, files);
dengyihao's avatar
dengyihao 已提交
104 105
  taosArraySort(files, tfileCompare);
  for (size_t i = 0; i < taosArrayGetSize(files); i++) {
106 107 108
    char *   file = taosArrayGetP(files, i);
    uint64_t suid;
    int      colId, version;
dengyihao's avatar
dengyihao 已提交
109
    if (0 != tfileParseFileName(file, &suid, &colId, &version)) {
dengyihao's avatar
dengyihao 已提交
110
      goto End;
111 112
      continue;
    }
dengyihao's avatar
dengyihao 已提交
113

114 115
    WriterCtx *wc = writerCtxCreate(TFile, file, true, 1024 * 64);
    if (wc == NULL) {
dengyihao's avatar
dengyihao 已提交
116
      indexError("failed to open index:  %s", file);
117
      goto End;
dengyihao's avatar
dengyihao 已提交
118
    }
119 120 121 122 123
    TFileReader *reader = tfileReaderCreate(wc);
    if (0 != tfileReadLoadHeader(reader)) {
      TFileReaderDestroy(reader);
      indexError("failed to load index header, index Id: %s", file);
      goto End;
dengyihao's avatar
dengyihao 已提交
124
    }
dengyihao's avatar
dengyihao 已提交
125
  }
dengyihao's avatar
dengyihao 已提交
126
  taosArrayDestroyEx(files, tfileDestroyFileName);
dengyihao's avatar
dengyihao 已提交
127
  return tcache;
dengyihao's avatar
dengyihao 已提交
128
End:
129
  tfileCacheDestroy(tcache);
dengyihao's avatar
dengyihao 已提交
130
  taosArrayDestroyEx(files, tfileDestroyFileName);
131
  return NULL;
dengyihao's avatar
dengyihao 已提交
132 133
}
/*
 * Tear down a cache: destroy every reader stored in tableCache, then the hash
 * table, then the cache object itself. A NULL cache is ignored.
 */
void tfileCacheDestroy(TFileCache *tcache) {
  if (tcache == NULL) {
    return;
  }

  // free table cache: each hash value is a TFileReader* slot
  for (TFileReader **it = taosHashIterate(tcache->tableCache, NULL); it != NULL;
       it = taosHashIterate(tcache->tableCache, it)) {
    TFileReader *cached = *it;
    indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", cached->header.suid,
              cached->header.colName, cached->header.colType);
    TFileReaderDestroy(cached);
  }

  taosHashCleanup(tcache->tableCache);
  free(tcache);
}

/*
 * Look up the reader cached under `key`.
 *
 * The hash table stores TFileReader* values (tfileCachePut inserts &reader with
 * sizeof(void *)), so the lookup yields a pointer to that stored pointer and
 * must be dereferenced; the old code returned the slot address itself.
 *
 * Returns the cached reader, or NULL if there is no entry or an argument is NULL.
 *
 * NOTE(review): the key length is taken with strlen() on the serialized buffer;
 * if the serialized fields are raw binary, an embedded zero byte would shorten
 * the key - confirm against the SERIALIZE_* macros.
 */
TFileReader *tfileCacheGet(TFileCache *tcache, TFileCacheKey *key) {
  if (tcache == NULL || key == NULL) {
    return NULL;
  }

  char buf[128] = {0};
  tfileSerialCacheKey(key, buf);

  TFileReader **slot = taosHashGet(tcache->tableCache, buf, strlen(buf));
  return (slot != NULL) ? *slot : NULL;
}
/*
 * Store `reader` in the cache under the serialized form of `key`.
 * The table keeps a copy of the pointer value (sizeof(void *) bytes), not of
 * the reader object.
 */
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader) {
  char serialized[128] = {0};
  tfileSerialCacheKey(key, serialized);

  size_t keyLen = strlen(serialized);
  taosHashPut(tcache->tableCache, serialized, keyLen, &reader, sizeof(void *));
}
dengyihao's avatar
dengyihao 已提交
163

164 165 166 167 168 169 170 171
/*
 * Allocate a zero-initialized reader bound to `ctx`.
 * Returns NULL if the allocation fails; in that case `ctx` is left untouched.
 * TFileReaderDestroy() later destroys the context together with the reader.
 */
TFileReader *tfileReaderCreate(WriterCtx *ctx) {
  TFileReader *reader = calloc(1, sizeof(TFileReader));
  if (reader != NULL) {
    reader->ctx = ctx;
    // T_REF_INC(reader);
  }
  return reader;
}
/*
 * Destroy a reader: its context is destroyed first, then the reader is freed.
 * A NULL reader is ignored.
 */
void TFileReaderDestroy(TFileReader *reader) {
  if (reader != NULL) {
    // T_REF_INC(reader);
    writerCtxDestroy(reader->ctx);
    free(reader);
  }
}

// Declaration only: no definition of tfileWriterCreate is visible in this part of the file.
// NOTE(review): presumably builds a writer for the given suid/column name - confirm.
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
183
// Declaration only: no definition of tfileWriterDestroy is visible in this part of the file.
void         tfileWriterDestroy(TFileWriter *tw);
dengyihao's avatar
dengyihao 已提交
184

dengyihao's avatar
dengyihao 已提交
185
IndexTFile *indexTFileCreate(const char *path) {
186 187 188
  IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
  tfile->cache = tfileCacheCreate(path);

dengyihao's avatar
dengyihao 已提交
189 190
  return tfile;
}
191
/*
 * Destroy an IndexTFile together with the reader cache created for it in
 * indexTFileCreate(); previously only the struct was freed and the cache leaked.
 * A NULL argument is ignored.
 */
void IndexTFileDestroy(IndexTFile *tfile) {
  if (tfile == NULL) {
    return;
  }
  tfileCacheDestroy(tfile->cache);  // accepts a NULL cache
  free(tfile);
}
dengyihao's avatar
dengyihao 已提交
192

dengyihao's avatar
dengyihao 已提交
193
/*
 * Look up the cached reader for the term of `query`.
 *
 * Only the cache lookup is implemented so far: the reader is fetched but not
 * searched, and `result` is left unmodified.
 *
 * Returns 0 after the lookup, or -1 when `tfile`, its cache, `query` or the
 * query's term is NULL. The cache can be NULL because indexTFileCreate() keeps
 * the object even when the cache could not be built.
 */
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) {
  IndexTFile *pTfile = (IndexTFile *)tfile;
  if (pTfile == NULL || pTfile->cache == NULL || query == NULL || query->term == NULL) {
    return -1;
  }

  SIndexTerm *  term = query->term;
  TFileCacheKey key = {
      .suid = term->suid, .colType = term->colType, .version = 0, .colName = term->colName, .nColName = term->nColName};
  TFileReader *reader = tfileCacheGet(pTfile->cache, &key);

  // TODO: search the reader and fill `result`
  (void)reader;
  (void)result;
  return 0;
}
202 203 204
/*
 * Placeholder for writing a term into a tfile: the writer options are filled
 * in from `term` (with version fixed to 1) but nothing is written yet.
 * `tfile` and `uid` are currently unused. Always returns 0.
 */
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) {
  TFileWriterOpt wOpt = {
      .suid = term->suid,
      .colType = term->colType,
      .colName = term->colName,
      .nColName = term->nColName,
      .version = 1,
  };
  (void)wOpt;

  return 0;
}