indexFstFile.c 8.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
dengyihao's avatar
dengyihao 已提交
7
 * * This program is distributed in the hope that it will be useful, but WITHOUT
dengyihao's avatar
dengyihao 已提交
8 9 10 11 12 13
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
14

dengyihao's avatar
dengyihao 已提交
15
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
20
#include "os.h"
21
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32
static int32_t kBlockSize = 4096;

typedef struct {
  int32_t blockId;
  int32_t nread;
  char    buf[0];
} SDataBlock;

static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }

dengyihao's avatar
dengyihao 已提交
33
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
dengyihao's avatar
dengyihao 已提交
34 35 36 37 38 39
  char* p = buf;
  SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path));
  SERIALIZE_VAR_TO_BUF(p, '_', char);
  idxInt2str(blockId, p, 0);
  return;
}
dengyihao's avatar
dengyihao 已提交
40
static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
dengyihao's avatar
dengyihao 已提交
41
  int tlen = len;
dengyihao's avatar
dengyihao 已提交
42
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
43
    int32_t cap = ctx->file.wBufCap;
dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
    if (len + ctx->file.wBufOffset >= cap) {
      int32_t nw = cap - ctx->file.wBufOffset;
      memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, nw);
      taosWriteFile(ctx->file.pFile, ctx->file.wBuf, cap);

      memset(ctx->file.wBuf, 0, cap);
      ctx->file.wBufOffset = 0;

      len -= nw;
      buf += nw;

      nw = (len / cap) * cap;
      if (nw != 0) {
        taosWriteFile(ctx->file.pFile, buf, nw);
      }

      len -= nw;
      buf += nw;
      if (len != 0) {
        memcpy(ctx->file.wBuf, buf, len);
      }
      ctx->file.wBufOffset += len;
    } else {
      memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, len);
      ctx->file.wBufOffset += len;
    }

dengyihao's avatar
dengyihao 已提交
71
  } else {
72 73
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
dengyihao's avatar
dengyihao 已提交
74 75
  ctx->offset += tlen;
  return tlen;
dengyihao's avatar
dengyihao 已提交
76
}
dengyihao's avatar
dengyihao 已提交
77
static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
78
  int nRead = 0;
dengyihao's avatar
dengyihao 已提交
79
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
80 81 82 83
#ifdef USE_MMAP
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
84
    nRead = taosReadFile(ctx->file.pFile, buf, len);
dengyihao's avatar
dengyihao 已提交
85
#endif
dengyihao's avatar
dengyihao 已提交
86
  } else {
dengyihao's avatar
dengyihao 已提交
87
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
dengyihao's avatar
dengyihao 已提交
88
  }
dengyihao's avatar
dengyihao 已提交
89
  ctx->offset += nRead;
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91
  return nRead;
92
}
dengyihao's avatar
dengyihao 已提交
93
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
dengyihao's avatar
dengyihao 已提交
94 95 96 97 98
  int32_t total = 0, nread = 0;
  int32_t blkId = offset / kBlockSize;
  int32_t blkOffset = offset % kBlockSize;
  int32_t blkLeft = kBlockSize - blkOffset;

dengyihao's avatar
dengyihao 已提交
99 100
  if (offset >= ctx->file.size) return 0;

dengyihao's avatar
dengyihao 已提交
101
  do {
dengyihao's avatar
dengyihao 已提交
102 103
    char key[1024] = {0};
    assert(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
dengyihao's avatar
dengyihao 已提交
104 105 106 107 108
    idxGenLRUKey(key, ctx->file.buf, blkId);
    LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));

    if (h) {
      SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
dengyihao's avatar
dengyihao 已提交
109
      nread = TMIN(blkLeft, len);
dengyihao's avatar
dengyihao 已提交
110 111 112
      memcpy(buf + total, blk->buf + blkOffset, nread);
      taosLRUCacheRelease(ctx->lru, h, false);
    } else {
dengyihao's avatar
dengyihao 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
      int32_t left = ctx->file.size - offset;
      if (left < kBlockSize) {
        nread = TMIN(left, len);
        int32_t bytes = taosPReadFile(ctx->file.pFile, buf + total, nread, offset);
        assert(bytes == nread);

        total += bytes;
        return total;
      } else {
        int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;

        SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
        blk->blockId = blkId;
        blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
        assert(blk->nread <= kBlockSize);

        if (blk->nread < kBlockSize && blk->nread < len) {
dengyihao's avatar
dengyihao 已提交
130
          taosMemoryFree(blk);
dengyihao's avatar
dengyihao 已提交
131 132 133 134 135 136 137 138 139 140 141
          break;
        }

        nread = TMIN(blkLeft, len);
        memcpy(buf + total, blk->buf + blkOffset, nread);

        LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
                                         TAOS_LRU_PRIORITY_LOW);
        if (s != TAOS_LRU_STATUS_OK) {
          return -1;
        }
dengyihao's avatar
dengyihao 已提交
142 143 144 145 146 147 148 149 150 151 152 153
      }
    }
    total += nread;
    len -= nread;
    offset += nread;

    blkId = offset / kBlockSize;
    blkOffset = offset % kBlockSize;
    blkLeft = kBlockSize - blkOffset;

  } while (len > 0);
  return total;
dengyihao's avatar
dengyihao 已提交
154
}
dengyihao's avatar
dengyihao 已提交
155
static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
156
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
157 158 159 160 161 162 163
    if (ctx->file.readOnly == false) {
      return ctx->offset;
    } else {
      int64_t file_size = 0;
      taosStatFile(ctx->file.buf, &file_size, NULL);
      return (int)file_size;
    }
dengyihao's avatar
dengyihao 已提交
164 165 166
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
167
static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
168
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
169 170 171 172
    if (ctx->file.wBufOffset > 0) {
      int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset);
      ctx->file.wBufOffset = 0;
    }
173
    taosFsyncFile(ctx->file.pFile);
dengyihao's avatar
dengyihao 已提交
174 175 176 177 178 179
  } else {
    // do nothing
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
180 181
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
dengyihao's avatar
dengyihao 已提交
182 183 184
  if (ctx == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
185

dengyihao's avatar
dengyihao 已提交
186
  ctx->type = type;
dengyihao's avatar
dengyihao 已提交
187
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
188
    // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
189
    ctx->file.readOnly = readOnly;
dengyihao's avatar
dengyihao 已提交
190
    memcpy(ctx->file.buf, path, strlen(path));
dengyihao's avatar
dengyihao 已提交
191
    if (readOnly == false) {
192
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
193
      taosFtruncateFile(ctx->file.pFile, 0);
dengyihao's avatar
dengyihao 已提交
194
      taosStatFile(path, &ctx->file.size, NULL);
dengyihao's avatar
dengyihao 已提交
195 196

      ctx->file.wBufOffset = 0;
dengyihao's avatar
dengyihao 已提交
197 198
      ctx->file.wBufCap = kBlockSize * 4;
      ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap);
dengyihao's avatar
dengyihao 已提交
199
    } else {
200
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
dengyihao's avatar
dengyihao 已提交
201
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
dengyihao's avatar
dengyihao 已提交
202 203
      ctx->file.wBufOffset = 0;

dengyihao's avatar
dengyihao 已提交
204
#ifdef USE_MMAP
205
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
dengyihao's avatar
dengyihao 已提交
206
#endif
207
    }
208
    if (ctx->file.pFile == NULL) {
dengyihao's avatar
dengyihao 已提交
209
      indexError("failed to open file, error %d", errno);
dengyihao's avatar
dengyihao 已提交
210
      goto END;
dengyihao's avatar
dengyihao 已提交
211
    }
dengyihao's avatar
dengyihao 已提交
212
  } else if (ctx->type == TMEMORY) {
wafwerar's avatar
wafwerar 已提交
213
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
dengyihao's avatar
dengyihao 已提交
214
    ctx->mem.cap = capacity;
215
  }
dengyihao's avatar
dengyihao 已提交
216

dengyihao's avatar
dengyihao 已提交
217 218 219 220
  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
dengyihao's avatar
dengyihao 已提交
221
  ctx->size = idxFileCtxGetSize;
dengyihao's avatar
dengyihao 已提交
222 223

  ctx->offset = 0;
224
  ctx->limit = capacity;
dengyihao's avatar
dengyihao 已提交
225 226

  return ctx;
dengyihao's avatar
dengyihao 已提交
227
END:
dengyihao's avatar
dengyihao 已提交
228
  if (ctx->type == TMEMORY) {
dengyihao's avatar
dengyihao 已提交
229 230
    taosMemoryFree(ctx->mem.buf);
  }
wafwerar's avatar
wafwerar 已提交
231
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
232
  return NULL;
dengyihao's avatar
dengyihao 已提交
233
}
dengyihao's avatar
dengyihao 已提交
234
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
dengyihao's avatar
dengyihao 已提交
235
  if (ctx->type == TMEMORY) {
wafwerar's avatar
wafwerar 已提交
236
    taosMemoryFree(ctx->mem.buf);
dengyihao's avatar
dengyihao 已提交
237
  } else {
dengyihao's avatar
dengyihao 已提交
238 239 240 241
    if (ctx->file.wBufOffset > 0) {
      int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset);
      ctx->file.wBufOffset = 0;
    }
dengyihao's avatar
dengyihao 已提交
242
    ctx->flush(ctx);
dengyihao's avatar
dengyihao 已提交
243
    taosMemoryFreeClear(ctx->file.wBuf);
244
    taosCloseFile(&ctx->file.pFile);
dengyihao's avatar
dengyihao 已提交
245 246 247 248 249
    if (ctx->file.readOnly) {
#ifdef USE_MMAP
      munmap(ctx->file.ptr, ctx->file.size);
#endif
    }
dengyihao's avatar
dengyihao 已提交
250 251 252
    if (remove) {
      unlink(ctx->file.buf);
    }
dengyihao's avatar
dengyihao 已提交
253
  }
wafwerar's avatar
wafwerar 已提交
254
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
255 256
}

dengyihao's avatar
dengyihao 已提交
257 258
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
259 260 261
  if (cw == NULL) {
    return NULL;
  }
262

dengyihao's avatar
dengyihao 已提交
263
  cw->wrt = wrt;
264
  return cw;
dengyihao's avatar
dengyihao 已提交
265
}
dengyihao's avatar
dengyihao 已提交
266 267
void idxFileDestroy(IdxFstFile* cw) {
  idxFileFlush(cw);
wafwerar's avatar
wafwerar 已提交
268
  taosMemoryFree(cw);
dengyihao's avatar
dengyihao 已提交
269 270
}

dengyihao's avatar
dengyihao 已提交
271
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
272 273 274
  if (write == NULL) {
    return 0;
  }
275
  // update checksum
dengyihao's avatar
dengyihao 已提交
276
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
277
  int       nWrite = ctx->write(ctx, buf, len);
dengyihao's avatar
dengyihao 已提交
278 279
  assert(nWrite == len);
  write->count += len;
dengyihao's avatar
dengyihao 已提交
280 281

  write->summer = taosCalcChecksum(write->summer, buf, len);
282 283
  return len;
}
dengyihao's avatar
dengyihao 已提交
284

dengyihao's avatar
dengyihao 已提交
285
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
286 287 288
  if (write == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
289
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
290
  return ctx->read(ctx, buf, len);
dengyihao's avatar
dengyihao 已提交
291
}
292

dengyihao's avatar
dengyihao 已提交
293
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
dengyihao's avatar
dengyihao 已提交
294
  //////
dengyihao's avatar
dengyihao 已提交
295 296
  return write->summer;
}
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298 299
int idxFileFlush(IdxFstFile* write) {
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
300
  ctx->flush(ctx);
dengyihao's avatar
dengyihao 已提交
301 302 303
  return 1;
}

dengyihao's avatar
dengyihao 已提交
304
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
dengyihao's avatar
dengyihao 已提交
305
  assert(1 <= nBytes && nBytes <= 8);
wafwerar's avatar
wafwerar 已提交
306
  uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
dengyihao's avatar
dengyihao 已提交
307
  for (uint8_t i = 0; i < nBytes; i++) {
308
    buf[i] = (uint8_t)n;
dengyihao's avatar
dengyihao 已提交
309 310
    n = n >> 8;
  }
dengyihao's avatar
dengyihao 已提交
311
  idxFileWrite(writer, buf, nBytes);
wafwerar's avatar
wafwerar 已提交
312
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
313 314
  return;
}
dengyihao's avatar
dengyihao 已提交
315

dengyihao's avatar
dengyihao 已提交
316
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
dengyihao's avatar
dengyihao 已提交
317
  uint8_t nBytes = packSize(n);
dengyihao's avatar
dengyihao 已提交
318
  idxFilePackUintIn(writer, n, nBytes);
319 320
  return nBytes;
}