indexFstFile.c 7.5 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
dengyihao's avatar
dengyihao 已提交
7
 * * This program is distributed in the hope that it will be useful, but WITHOUT
dengyihao's avatar
dengyihao 已提交
8 9 10 11 12 13
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
14

dengyihao's avatar
dengyihao 已提交
15
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
20
#include "os.h"
21
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32
static int32_t kBlockSize = 4096;

typedef struct {
  int32_t blockId;
  int32_t nread;
  char    buf[0];
} SDataBlock;

static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }

dengyihao's avatar
dengyihao 已提交
33
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
dengyihao's avatar
dengyihao 已提交
34 35 36 37 38 39
  char* p = buf;
  SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path));
  SERIALIZE_VAR_TO_BUF(p, '_', char);
  idxInt2str(blockId, p, 0);
  return;
}
dengyihao's avatar
dengyihao 已提交
40
static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
dengyihao's avatar
dengyihao 已提交
41
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
42 43
    int nwr = taosWriteFile(ctx->file.pFile, buf, len);
    assert(nwr == len);
dengyihao's avatar
dengyihao 已提交
44
  } else {
45 46
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
dengyihao's avatar
dengyihao 已提交
47 48 49
  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
50
static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
51
  int nRead = 0;
dengyihao's avatar
dengyihao 已提交
52
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
53 54 55 56
#ifdef USE_MMAP
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
57
    nRead = taosReadFile(ctx->file.pFile, buf, len);
dengyihao's avatar
dengyihao 已提交
58
#endif
dengyihao's avatar
dengyihao 已提交
59
  } else {
dengyihao's avatar
dengyihao 已提交
60
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
dengyihao's avatar
dengyihao 已提交
61
  }
dengyihao's avatar
dengyihao 已提交
62
  ctx->offset += nRead;
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64
  return nRead;
65
}
dengyihao's avatar
dengyihao 已提交
66
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
dengyihao's avatar
dengyihao 已提交
67 68 69 70 71
  int32_t total = 0, nread = 0;
  int32_t blkId = offset / kBlockSize;
  int32_t blkOffset = offset % kBlockSize;
  int32_t blkLeft = kBlockSize - blkOffset;

dengyihao's avatar
dengyihao 已提交
72 73
  if (offset >= ctx->file.size) return 0;

dengyihao's avatar
dengyihao 已提交
74
  do {
dengyihao's avatar
dengyihao 已提交
75 76
    char key[1024] = {0};
    assert(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
dengyihao's avatar
dengyihao 已提交
77 78 79 80 81
    idxGenLRUKey(key, ctx->file.buf, blkId);
    LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));

    if (h) {
      SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
dengyihao's avatar
dengyihao 已提交
82
      nread = TMIN(blkLeft, len);
dengyihao's avatar
dengyihao 已提交
83 84 85
      memcpy(buf + total, blk->buf + blkOffset, nread);
      taosLRUCacheRelease(ctx->lru, h, false);
    } else {
dengyihao's avatar
dengyihao 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
      int32_t left = ctx->file.size - offset;
      if (left < kBlockSize) {
        nread = TMIN(left, len);
        int32_t bytes = taosPReadFile(ctx->file.pFile, buf + total, nread, offset);
        assert(bytes == nread);

        total += bytes;
        return total;
      } else {
        int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;

        SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
        blk->blockId = blkId;
        blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
        assert(blk->nread <= kBlockSize);

        if (blk->nread < kBlockSize && blk->nread < len) {
dengyihao's avatar
dengyihao 已提交
103
          taosMemoryFree(blk);
dengyihao's avatar
dengyihao 已提交
104 105 106 107 108 109 110 111 112 113 114
          break;
        }

        nread = TMIN(blkLeft, len);
        memcpy(buf + total, blk->buf + blkOffset, nread);

        LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
                                         TAOS_LRU_PRIORITY_LOW);
        if (s != TAOS_LRU_STATUS_OK) {
          return -1;
        }
dengyihao's avatar
dengyihao 已提交
115 116 117 118 119 120 121 122 123 124 125 126
      }
    }
    total += nread;
    len -= nread;
    offset += nread;

    blkId = offset / kBlockSize;
    blkOffset = offset % kBlockSize;
    blkLeft = kBlockSize - blkOffset;

  } while (len > 0);
  return total;
dengyihao's avatar
dengyihao 已提交
127
}
dengyihao's avatar
dengyihao 已提交
128
static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
129
  if (ctx->type == TFILE) {
130 131 132
    int64_t file_size = 0;
    taosStatFile(ctx->file.buf, &file_size, NULL);
    return (int)file_size;
dengyihao's avatar
dengyihao 已提交
133 134 135
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
136
static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
137
  if (ctx->type == TFILE) {
138
    taosFsyncFile(ctx->file.pFile);
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144
  } else {
    // do nothing
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
145 146
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
dengyihao's avatar
dengyihao 已提交
147 148 149
  if (ctx == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
150

dengyihao's avatar
dengyihao 已提交
151
  ctx->type = type;
dengyihao's avatar
dengyihao 已提交
152
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
153
    // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
154
    ctx->file.readOnly = readOnly;
dengyihao's avatar
dengyihao 已提交
155
    memcpy(ctx->file.buf, path, strlen(path));
dengyihao's avatar
dengyihao 已提交
156
    if (readOnly == false) {
157
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
158
      taosFtruncateFile(ctx->file.pFile, 0);
dengyihao's avatar
dengyihao 已提交
159
      taosStatFile(path, &ctx->file.size, NULL);
dengyihao's avatar
dengyihao 已提交
160
    } else {
161
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
dengyihao's avatar
dengyihao 已提交
162

dengyihao's avatar
dengyihao 已提交
163
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
dengyihao's avatar
dengyihao 已提交
164
#ifdef USE_MMAP
165
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
dengyihao's avatar
dengyihao 已提交
166
#endif
167
    }
168
    if (ctx->file.pFile == NULL) {
dengyihao's avatar
dengyihao 已提交
169
      indexError("failed to open file, error %d", errno);
dengyihao's avatar
dengyihao 已提交
170
      goto END;
dengyihao's avatar
dengyihao 已提交
171
    }
dengyihao's avatar
dengyihao 已提交
172
  } else if (ctx->type == TMEMORY) {
wafwerar's avatar
wafwerar 已提交
173
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
dengyihao's avatar
dengyihao 已提交
174
    ctx->mem.cap = capacity;
175
  }
dengyihao's avatar
dengyihao 已提交
176

dengyihao's avatar
dengyihao 已提交
177 178 179 180
  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
dengyihao's avatar
dengyihao 已提交
181
  ctx->size = idxFileCtxGetSize;
dengyihao's avatar
dengyihao 已提交
182 183

  ctx->offset = 0;
184
  ctx->limit = capacity;
dengyihao's avatar
dengyihao 已提交
185 186

  return ctx;
dengyihao's avatar
dengyihao 已提交
187
END:
dengyihao's avatar
dengyihao 已提交
188
  if (ctx->type == TMEMORY) {
dengyihao's avatar
dengyihao 已提交
189 190
    taosMemoryFree(ctx->mem.buf);
  }
wafwerar's avatar
wafwerar 已提交
191
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
192
  return NULL;
dengyihao's avatar
dengyihao 已提交
193
}
dengyihao's avatar
dengyihao 已提交
194
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
dengyihao's avatar
dengyihao 已提交
195
  if (ctx->type == TMEMORY) {
wafwerar's avatar
wafwerar 已提交
196
    taosMemoryFree(ctx->mem.buf);
dengyihao's avatar
dengyihao 已提交
197
  } else {
dengyihao's avatar
dengyihao 已提交
198
    ctx->flush(ctx);
199
    taosCloseFile(&ctx->file.pFile);
dengyihao's avatar
dengyihao 已提交
200 201 202 203 204
    if (ctx->file.readOnly) {
#ifdef USE_MMAP
      munmap(ctx->file.ptr, ctx->file.size);
#endif
    }
dengyihao's avatar
dengyihao 已提交
205
    if (ctx->file.readOnly == false) {
206 207
      int64_t file_size = 0;
      taosStatFile(ctx->file.buf, &file_size, NULL);
dengyihao's avatar
dengyihao 已提交
208
    }
dengyihao's avatar
dengyihao 已提交
209 210 211
    if (remove) {
      unlink(ctx->file.buf);
    }
dengyihao's avatar
dengyihao 已提交
212
  }
wafwerar's avatar
wafwerar 已提交
213
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
214 215
}

dengyihao's avatar
dengyihao 已提交
216 217
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
218 219 220
  if (cw == NULL) {
    return NULL;
  }
221

dengyihao's avatar
dengyihao 已提交
222
  cw->wrt = wrt;
223
  return cw;
dengyihao's avatar
dengyihao 已提交
224
}
dengyihao's avatar
dengyihao 已提交
225 226
void idxFileDestroy(IdxFstFile* cw) {
  idxFileFlush(cw);
wafwerar's avatar
wafwerar 已提交
227
  taosMemoryFree(cw);
dengyihao's avatar
dengyihao 已提交
228 229
}

dengyihao's avatar
dengyihao 已提交
230
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
231 232 233
  if (write == NULL) {
    return 0;
  }
234
  // update checksum
dengyihao's avatar
dengyihao 已提交
235
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
236
  int       nWrite = ctx->write(ctx, buf, len);
dengyihao's avatar
dengyihao 已提交
237 238
  assert(nWrite == len);
  write->count += len;
dengyihao's avatar
dengyihao 已提交
239 240

  write->summer = taosCalcChecksum(write->summer, buf, len);
241 242
  return len;
}
dengyihao's avatar
dengyihao 已提交
243

dengyihao's avatar
dengyihao 已提交
244
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
245 246 247
  if (write == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
248
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
249
  return ctx->read(ctx, buf, len);
dengyihao's avatar
dengyihao 已提交
250
}
251

dengyihao's avatar
dengyihao 已提交
252
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
dengyihao's avatar
dengyihao 已提交
253
  //////
dengyihao's avatar
dengyihao 已提交
254 255
  return write->summer;
}
dengyihao's avatar
dengyihao 已提交
256

dengyihao's avatar
dengyihao 已提交
257 258
int idxFileFlush(IdxFstFile* write) {
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
259
  ctx->flush(ctx);
dengyihao's avatar
dengyihao 已提交
260 261 262
  return 1;
}

dengyihao's avatar
dengyihao 已提交
263
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
dengyihao's avatar
dengyihao 已提交
264
  assert(1 <= nBytes && nBytes <= 8);
wafwerar's avatar
wafwerar 已提交
265
  uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
dengyihao's avatar
dengyihao 已提交
266
  for (uint8_t i = 0; i < nBytes; i++) {
267
    buf[i] = (uint8_t)n;
dengyihao's avatar
dengyihao 已提交
268 269
    n = n >> 8;
  }
dengyihao's avatar
dengyihao 已提交
270
  idxFileWrite(writer, buf, nBytes);
wafwerar's avatar
wafwerar 已提交
271
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
272 273
  return;
}
dengyihao's avatar
dengyihao 已提交
274

dengyihao's avatar
dengyihao 已提交
275
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
dengyihao's avatar
dengyihao 已提交
276
  uint8_t nBytes = packSize(n);
dengyihao's avatar
dengyihao 已提交
277
  idxFilePackUintIn(writer, n, nBytes);
278 279
  return nBytes;
}