indexFstFile.c 7.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
14

dengyihao's avatar
dengyihao 已提交
15
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
20
#include "os.h"
21
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32
// Size in bytes of one cacheable file block; idxFileCtxDoReadFrom divides file
// offsets by this value to obtain block ids and sizes its cache entries with it.
static int32_t kBlockSize = 4096;

/*
 * One block of file data as kept in the LRU cache by idxFileCtxDoReadFrom.
 * Instances are allocated as sizeof(SDataBlock) + kBlockSize so that `buf`
 * can hold a full block of payload.
 */
typedef struct {
  int32_t blockId;  // block index within the file (offset / kBlockSize)
  int32_t nread;    // byte count returned by the read that filled buf
  char    buf[];    // payload; C99 flexible array member instead of the GNU buf[0]
} SDataBlock;

// Deleter passed to taosLRUCacheInsert: frees the cached value; the key and
// its length are not used.
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) {
  taosMemoryFree(value);
}

dengyihao's avatar
dengyihao 已提交
33
/*
 * Build the LRU cache key for one block of a file into `buf`.
 * The key is assembled from `path`, a '_' separator and `blockId` (rendered by
 * idxInt2str); presumably each SERIALIZE_* macro advances the cursor past what
 * it wrote -- confirm against the macro definitions.
 * NOTE(review): nothing here bounds the write, so the caller's buffer must be
 * large enough for the path plus the suffix.
 */
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
  char* cursor = buf;
  SERIALIZE_STR_VAR_TO_BUF(cursor, path, strlen(path));
  SERIALIZE_VAR_TO_BUF(cursor, '_', char);
  idxInt2str(blockId, cursor, 0);
}
dengyihao's avatar
dengyihao 已提交
40
/*
 * Write `len` bytes from `buf` through the context and advance ctx->offset.
 * File contexts (TFILE) write via taosWriteFile; every other type copies into
 * ctx->mem.buf at the current offset.
 * NOTE(review): the in-memory copy is not checked against ctx->mem.cap.
 * Returns len.
 */
static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
  if (ctx->type != TFILE) {
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  } else {
    int written = taosWriteFile(ctx->file.pFile, buf, len);
    assert(written == len);
  }

  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
50
/*
 * Read up to `len` bytes into `buf` from the context's current position.
 *
 * TFILE contexts read through taosReadFile (or, with USE_MMAP, copy from the
 * start of the mapping -- NOTE(review): that branch ignores ctx->offset).
 * Other contexts copy from ctx->mem.buf at ctx->offset; the copy is clamped to
 * the bytes remaining below ctx->mem.cap so it cannot run past the buffer.
 *
 * ctx->offset advances by the number of bytes actually read; it is left
 * untouched when nothing was read or the underlying read reported an error.
 *
 * Returns the number of bytes read (the underlying read's result for files).
 */
static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
  int nRead = 0;
  if (ctx->type == TFILE) {
#ifdef USE_MMAP
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
    nRead = taosReadFile(ctx->file.pFile, buf, len);
#endif
  } else {
    // Previously this branch copied `len` bytes but reported 0 and never moved
    // the offset; report what was copied and keep the copy inside the buffer.
    int avail = (int)(ctx->mem.cap - ctx->offset);
    nRead = len < avail ? len : avail;
    if (nRead > 0) {
      memcpy(buf, ctx->mem.buf + ctx->offset, nRead);
    } else {
      nRead = 0;
    }
  }

  // A negative result is an error code, not a byte count: do not rewind.
  if (nRead > 0) {
    ctx->offset += nRead;
  }

  return nRead;
}
dengyihao's avatar
dengyihao 已提交
66
/*
 * Read up to `len` bytes starting at file position `offset` into `buf`,
 * going through the context's LRU block cache.
 *
 * The request is walked block by block (kBlockSize bytes each):
 *   - cache hit: the bytes are copied out of the cached block;
 *   - cache miss with less than one block left in the file: the remaining
 *     bytes are read straight from the file and the function returns;
 *   - cache miss otherwise: the whole block is read, copied from, and
 *     inserted into the cache.
 *
 * Returns the number of bytes copied into `buf`, 0 when `offset` is at or
 * past ctx->file.size, or -1 when a block cannot be allocated or the cache
 * insert does not report TAOS_LRU_STATUS_OK.
 */
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
  int32_t total = 0, nread = 0;
  int32_t blkId = offset / kBlockSize;
  int32_t blkOffset = offset % kBlockSize;
  int32_t blkLeft = kBlockSize - blkOffset;

  if (offset >= ctx->file.size) return 0;

  do {
    char key[128] = {0};
    idxGenLRUKey(key, ctx->file.buf, blkId);
    LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));

    if (h) {
      SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
      nread = TMIN(blkLeft, len);
      memcpy(buf + total, blk->buf + blkOffset, nread);
      taosLRUCacheRelease(ctx->lru, h, false);
    } else {
      int32_t left = ctx->file.size - offset;
      if (left < kBlockSize) {
        // Less than a full block remains: read it directly, bypassing the cache.
        nread = TMIN(left, len);
        int32_t bytes = taosPReadFile(ctx->file.pFile, buf + total, nread, offset);
        assert(bytes == nread);

        total += bytes;
        return total;
      } else {
        int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;

        SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
        if (blk == NULL) {
          // Out of memory: report failure instead of dereferencing NULL.
          return -1;
        }
        blk->blockId = blkId;
        blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
        assert(blk->nread <= kBlockSize);

        if (blk->nread < kBlockSize && blk->nread < len) {
          // Short read that cannot satisfy the request. The block never
          // reaches the cache, so it must be released here to avoid a leak.
          taosMemoryFree(blk);
          break;
        }

        nread = TMIN(blkLeft, len);
        memcpy(buf + total, blk->buf + blkOffset, nread);

        LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
                                         TAOS_LRU_PRIORITY_LOW);
        if (s != TAOS_LRU_STATUS_OK) {
          // NOTE(review): ownership of blk after a failed insert depends on the
          // cache implementation -- confirm whether it invokes the deleter.
          return -1;
        }
      }
    }
    total += nread;
    len -= nread;
    offset += nread;

    // Recompute the block coordinates for the next chunk of the request.
    blkId = offset / kBlockSize;
    blkOffset = offset % kBlockSize;
    blkLeft = kBlockSize - blkOffset;

  } while (len > 0);
  return total;
}
dengyihao's avatar
dengyihao 已提交
126
/*
 * Report the size of the file behind a TFILE context by stat-ing the path
 * stored in ctx->file.buf; every other context type reports 0.
 * The 64-bit size is truncated to int on return.
 */
static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) {
  if (ctx->type != TFILE) {
    return 0;
  }

  int64_t bytes = 0;
  taosStatFile(ctx->file.buf, &bytes, NULL);
  return (int)bytes;
}
dengyihao's avatar
dengyihao 已提交
134
/*
 * Flush the context: TFILE contexts are fsync'ed, other types have nothing to
 * flush. Always returns 1.
 */
static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
  if (ctx->type != TFILE) {
    return 1;
  }

  taosFsyncFile(ctx->file.pFile);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
143 144
/*
 * Create a file or memory backed I/O context.
 *
 * type      TFILE opens `path`; TMEMORY allocates a zeroed buffer of
 *           `capacity` bytes.
 * path      file path, copied into ctx->file.buf (TFILE only).
 * readOnly  TFILE only: true opens the file for reading; false opens it with
 *           create/write/append flags and truncates it to zero length.
 * capacity  TMEMORY buffer size; also stored in ctx->limit for every type.
 *
 * Returns the new context, or NULL when allocation fails, the path does not
 * fit in ctx->file.buf, or the file cannot be opened.
 */
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
  if (ctx == NULL) {
    return NULL;
  }

  ctx->type = type;
  if (ctx->type == TFILE) {
    // ugly code, refactor later
    size_t pathLen = strlen(path);
    if (pathLen >= sizeof(ctx->file.buf)) {
      // Refuse paths that would overflow the embedded buffer (one byte is
      // kept for the terminating NUL left by calloc).
      indexError("failed to open file, path length %d is too long", (int)pathLen);
      goto END;
    }
    ctx->file.readOnly = readOnly;
    memcpy(ctx->file.buf, path, pathLen);

    if (readOnly == false) {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
    } else {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
    }
    // Check the handle before anything uses it, while errno still belongs to
    // the failed open.
    if (ctx->file.pFile == NULL) {
      indexError("failed to open file, error %d", errno);
      goto END;
    }

    if (readOnly == false) {
      taosFtruncateFile(ctx->file.pFile, 0);
      taosStatFile(path, &ctx->file.size, NULL);
    } else {
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
#ifdef USE_MMAP
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif
    }
  } else if (ctx->type == TMEMORY) {
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
    // A zero-sized request may legitimately yield NULL; only a failed
    // non-empty allocation is an error.
    if (ctx->mem.buf == NULL && capacity > 0) {
      goto END;
    }
    ctx->mem.cap = capacity;
  }

  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
  ctx->size = idxFileCtxGetSize;

  ctx->offset = 0;
  ctx->limit = capacity;

  return ctx;
END:
  if (ctx->type == TMEMORY) {
    taosMemoryFree(ctx->mem.buf);
  }
  taosMemoryFree(ctx);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
192
/*
 * Release a context created by idxFileCtxCreate.
 *
 * TMEMORY contexts free their buffer. Any other type is flushed and closed,
 * the read-only mapping is unmapped when USE_MMAP is enabled, and the file at
 * ctx->file.buf is unlinked when `remove` is true. The context itself is
 * freed last. A NULL ctx is ignored.
 */
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
  if (ctx == NULL) {
    return;
  }

  if (ctx->type == TMEMORY) {
    taosMemoryFree(ctx->mem.buf);
  } else {
    ctx->flush(ctx);
    taosCloseFile(&ctx->file.pFile);
    if (ctx->file.readOnly) {
#ifdef USE_MMAP
      munmap(ctx->file.ptr, ctx->file.size);
#endif
    }
    // The stat of writable files that used to sit here stored its result in
    // an unused local; it has been dropped.
    if (remove) {
      unlink(ctx->file.buf);
    }
  }
  taosMemoryFree(ctx);
}

dengyihao's avatar
dengyihao 已提交
214 215
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
216 217 218
  if (cw == NULL) {
    return NULL;
  }
219

dengyihao's avatar
dengyihao 已提交
220
  cw->wrt = wrt;
221
  return cw;
dengyihao's avatar
dengyihao 已提交
222
}
dengyihao's avatar
dengyihao 已提交
223 224
/*
 * Flush the wrapped context and free the wrapper.
 * The context stored in cw->wrt is not destroyed here.
 */
void idxFileDestroy(IdxFstFile* cw) {
  idxFileFlush(cw);
  taosMemoryFree(cw);
}

dengyihao's avatar
dengyihao 已提交
228
/*
 * Write `len` bytes from `buf` through the wrapped context, then fold them
 * into the running checksum (write->summer) and byte count (write->count).
 * Returns len, or 0 when `write` is NULL.
 */
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (!write) {
    return 0;
  }

  IFileCtx* fileCtx = write->wrt;
  int       written = fileCtx->write(fileCtx, buf, len);
  assert(written == len);

  // update checksum
  write->summer = taosCalcChecksum(write->summer, buf, len);
  write->count += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
241

dengyihao's avatar
dengyihao 已提交
242
/*
 * Read up to `len` bytes into `buf` via the wrapped context's read callback.
 * Returns that callback's result, or 0 when `write` is NULL.
 */
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (!write) {
    return 0;
  }

  IFileCtx* fileCtx = write->wrt;
  return fileCtx->read(fileCtx, buf, len);
}
249

dengyihao's avatar
dengyihao 已提交
250
/*
 * Return the checksum accumulated so far in write->summer.
 * No masking is applied here despite the name.
 */
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
  uint32_t checksum = write->summer;
  return checksum;
}
dengyihao's avatar
dengyihao 已提交
254

dengyihao's avatar
dengyihao 已提交
255 256
/*
 * Invoke the wrapped context's flush callback. Always returns 1.
 */
int idxFileFlush(IdxFstFile* write) {
  IFileCtx* fileCtx = write->wrt;
  fileCtx->flush(fileCtx);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
261
/*
 * Write the low `nBytes` bytes of `n` to `writer`, least significant byte
 * first.
 *
 * nBytes must be in [1, 8] (asserted).
 *
 * The scratch buffer lives on the stack: the former heap allocation was never
 * checked for NULL and is unnecessary for eight bytes.
 */
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
  assert(1 <= nBytes && nBytes <= 8);
  uint8_t buf[8] = {0};
  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n;
    n = n >> 8;
  }
  idxFileWrite(writer, buf, nBytes);
}
dengyihao's avatar
dengyihao 已提交
272

dengyihao's avatar
dengyihao 已提交
273
/*
 * Write `n` to `writer` using the byte width chosen by packSize(n).
 * Returns that width.
 */
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
  const uint8_t width = packSize(n);
  idxFilePackUintIn(writer, n, width);
  return width;
}