indexFstFile.c 7.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
dengyihao's avatar
dengyihao 已提交
7
 * * This program is distributed in the hope that it will be useful, but WITHOUT
dengyihao's avatar
dengyihao 已提交
8 9 10 11 12 13
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
14

dengyihao's avatar
dengyihao 已提交
15
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
20
#include "os.h"
21
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
static int32_t kBlockSize = 4096;

typedef struct {
  int32_t blockId;
  int32_t nread;
  char    buf[0];
} SDataBlock;

static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }

static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
  char* p = buf;
  SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path));
  SERIALIZE_VAR_TO_BUF(p, '_', char);
  idxInt2str(blockId, p, 0);
  return;
}
dengyihao's avatar
dengyihao 已提交
40
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
dengyihao's avatar
dengyihao 已提交
41
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
42 43
    int nwr = taosWriteFile(ctx->file.pFile, buf, len);
    assert(nwr == len);
dengyihao's avatar
dengyihao 已提交
44
  } else {
45 46
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
dengyihao's avatar
dengyihao 已提交
47 48 49
  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
50
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
51
  int nRead = 0;
dengyihao's avatar
dengyihao 已提交
52
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
53 54 55 56
#ifdef USE_MMAP
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
57
    nRead = taosReadFile(ctx->file.pFile, buf, len);
dengyihao's avatar
dengyihao 已提交
58
#endif
dengyihao's avatar
dengyihao 已提交
59
  } else {
dengyihao's avatar
dengyihao 已提交
60
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
dengyihao's avatar
dengyihao 已提交
61
  }
dengyihao's avatar
dengyihao 已提交
62
  ctx->offset += nRead;
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64
  return nRead;
65
}
dengyihao's avatar
dengyihao 已提交
66
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
dengyihao's avatar
dengyihao 已提交
67 68 69 70 71 72 73 74 75 76 77 78
  int32_t total = 0, nread = 0;
  int32_t blkId = offset / kBlockSize;
  int32_t blkOffset = offset % kBlockSize;
  int32_t blkLeft = kBlockSize - blkOffset;

  do {
    char key[128] = {0};
    idxGenLRUKey(key, ctx->file.buf, blkId);
    LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));

    if (h) {
      SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
dengyihao's avatar
dengyihao 已提交
79
      nread = TMIN(blkLeft, len);
dengyihao's avatar
dengyihao 已提交
80 81 82
      memcpy(buf + total, blk->buf + blkOffset, nread);
      taosLRUCacheRelease(ctx->lru, h, false);
    } else {
dengyihao's avatar
dengyihao 已提交
83
      int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;
dengyihao's avatar
dengyihao 已提交
84

dengyihao's avatar
dengyihao 已提交
85
      SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
dengyihao's avatar
dengyihao 已提交
86 87 88
      blk->blockId = blkId;
      blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
      assert(blk->nread <= kBlockSize);
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90 91 92
      if (blk->nread < kBlockSize && blk->nread < len) {
        break;
      }
dengyihao's avatar
dengyihao 已提交
93 94

      nread = TMIN(blkLeft, len);
dengyihao's avatar
dengyihao 已提交
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
      memcpy(buf + total, blk->buf + blkOffset, nread);

      LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
                                       TAOS_LRU_PRIORITY_LOW);
      if (s != TAOS_LRU_STATUS_OK) {
        return -1;
      }
    }
    total += nread;
    len -= nread;
    offset += nread;

    blkId = offset / kBlockSize;
    blkOffset = offset % kBlockSize;
    blkLeft = kBlockSize - blkOffset;

  } while (len > 0);
  return total;
dengyihao's avatar
dengyihao 已提交
113
}
dengyihao's avatar
dengyihao 已提交
114
static int idxFileCtxGetSize(IFileCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
115
  if (ctx->type == TFILE) {
116 117 118
    int64_t file_size = 0;
    taosStatFile(ctx->file.buf, &file_size, NULL);
    return (int)file_size;
dengyihao's avatar
dengyihao 已提交
119 120 121
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
122
static int idxFileCtxDoFlush(IFileCtx* ctx) {
dengyihao's avatar
dengyihao 已提交
123
  if (ctx->type == TFILE) {
124
    taosFsyncFile(ctx->file.pFile);
dengyihao's avatar
dengyihao 已提交
125 126 127 128 129 130
  } else {
    // do nothing
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
131 132
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
dengyihao's avatar
dengyihao 已提交
133 134 135
  if (ctx == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
136

dengyihao's avatar
dengyihao 已提交
137
  ctx->type = type;
dengyihao's avatar
dengyihao 已提交
138
  if (ctx->type == TFILE) {
dengyihao's avatar
dengyihao 已提交
139
    // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
140
    ctx->file.readOnly = readOnly;
dengyihao's avatar
dengyihao 已提交
141
    memcpy(ctx->file.buf, path, strlen(path));
dengyihao's avatar
dengyihao 已提交
142
    if (readOnly == false) {
143
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
144
      taosFtruncateFile(ctx->file.pFile, 0);
dengyihao's avatar
dengyihao 已提交
145
      taosStatFile(path, &ctx->file.size, NULL);
dengyihao's avatar
dengyihao 已提交
146
    } else {
147
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
dengyihao's avatar
dengyihao 已提交
148

dengyihao's avatar
dengyihao 已提交
149 150 151
      int64_t size = 0;
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
      ctx->file.size = (int)size;
dengyihao's avatar
dengyihao 已提交
152
#ifdef USE_MMAP
153
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
dengyihao's avatar
dengyihao 已提交
154
#endif
155
    }
156
    if (ctx->file.pFile == NULL) {
dengyihao's avatar
dengyihao 已提交
157
      indexError("failed to open file, error %d", errno);
dengyihao's avatar
dengyihao 已提交
158
      goto END;
dengyihao's avatar
dengyihao 已提交
159
    }
dengyihao's avatar
dengyihao 已提交
160
  } else if (ctx->type == TMEMORY) {
wafwerar's avatar
wafwerar 已提交
161
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
dengyihao's avatar
dengyihao 已提交
162
    ctx->mem.cap = capacity;
163
  }
dengyihao's avatar
dengyihao 已提交
164

dengyihao's avatar
dengyihao 已提交
165 166 167 168
  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
dengyihao's avatar
dengyihao 已提交
169
  ctx->size = idxFileCtxGetSize;
dengyihao's avatar
dengyihao 已提交
170 171

  ctx->offset = 0;
172
  ctx->limit = capacity;
dengyihao's avatar
dengyihao 已提交
173 174

  return ctx;
dengyihao's avatar
dengyihao 已提交
175
END:
dengyihao's avatar
dengyihao 已提交
176
  if (ctx->type == TMEMORY) {
dengyihao's avatar
dengyihao 已提交
177 178
    taosMemoryFree(ctx->mem.buf);
  }
wafwerar's avatar
wafwerar 已提交
179
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
180
  return NULL;
dengyihao's avatar
dengyihao 已提交
181
}
dengyihao's avatar
dengyihao 已提交
182
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
dengyihao's avatar
dengyihao 已提交
183
  if (ctx->type == TMEMORY) {
wafwerar's avatar
wafwerar 已提交
184
    taosMemoryFree(ctx->mem.buf);
dengyihao's avatar
dengyihao 已提交
185
  } else {
dengyihao's avatar
dengyihao 已提交
186
    ctx->flush(ctx);
187
    taosCloseFile(&ctx->file.pFile);
dengyihao's avatar
dengyihao 已提交
188 189 190 191 192
    if (ctx->file.readOnly) {
#ifdef USE_MMAP
      munmap(ctx->file.ptr, ctx->file.size);
#endif
    }
dengyihao's avatar
dengyihao 已提交
193
    if (ctx->file.readOnly == false) {
194 195
      int64_t file_size = 0;
      taosStatFile(ctx->file.buf, &file_size, NULL);
dengyihao's avatar
dengyihao 已提交
196
    }
dengyihao's avatar
dengyihao 已提交
197 198 199
    if (remove) {
      unlink(ctx->file.buf);
    }
dengyihao's avatar
dengyihao 已提交
200
  }
wafwerar's avatar
wafwerar 已提交
201
  taosMemoryFree(ctx);
dengyihao's avatar
dengyihao 已提交
202 203
}

dengyihao's avatar
dengyihao 已提交
204 205
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
206 207 208
  if (cw == NULL) {
    return NULL;
  }
209

dengyihao's avatar
dengyihao 已提交
210
  cw->wrt = wrt;
211
  return cw;
dengyihao's avatar
dengyihao 已提交
212
}
dengyihao's avatar
dengyihao 已提交
213 214
void idxFileDestroy(IdxFstFile* cw) {
  idxFileFlush(cw);
wafwerar's avatar
wafwerar 已提交
215
  taosMemoryFree(cw);
dengyihao's avatar
dengyihao 已提交
216 217
}

dengyihao's avatar
dengyihao 已提交
218
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
219 220 221
  if (write == NULL) {
    return 0;
  }
222
  // update checksum
dengyihao's avatar
dengyihao 已提交
223
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
224
  int       nWrite = ctx->write(ctx, buf, len);
dengyihao's avatar
dengyihao 已提交
225 226
  assert(nWrite == len);
  write->count += len;
dengyihao's avatar
dengyihao 已提交
227 228

  write->summer = taosCalcChecksum(write->summer, buf, len);
229 230
  return len;
}
dengyihao's avatar
dengyihao 已提交
231

dengyihao's avatar
dengyihao 已提交
232
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
233 234 235
  if (write == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
236
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
237
  return ctx->read(ctx, buf, len);
dengyihao's avatar
dengyihao 已提交
238
}
239

dengyihao's avatar
dengyihao 已提交
240
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
dengyihao's avatar
dengyihao 已提交
241
  //////
dengyihao's avatar
dengyihao 已提交
242 243
  return write->summer;
}
dengyihao's avatar
dengyihao 已提交
244

dengyihao's avatar
dengyihao 已提交
245 246
int idxFileFlush(IdxFstFile* write) {
  IFileCtx* ctx = write->wrt;
dengyihao's avatar
dengyihao 已提交
247
  ctx->flush(ctx);
dengyihao's avatar
dengyihao 已提交
248 249 250
  return 1;
}

dengyihao's avatar
dengyihao 已提交
251
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
dengyihao's avatar
dengyihao 已提交
252
  assert(1 <= nBytes && nBytes <= 8);
wafwerar's avatar
wafwerar 已提交
253
  uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
dengyihao's avatar
dengyihao 已提交
254
  for (uint8_t i = 0; i < nBytes; i++) {
255
    buf[i] = (uint8_t)n;
dengyihao's avatar
dengyihao 已提交
256 257
    n = n >> 8;
  }
dengyihao's avatar
dengyihao 已提交
258
  idxFileWrite(writer, buf, nBytes);
wafwerar's avatar
wafwerar 已提交
259
  taosMemoryFree(buf);
dengyihao's avatar
dengyihao 已提交
260 261
  return;
}
dengyihao's avatar
dengyihao 已提交
262

dengyihao's avatar
dengyihao 已提交
263
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
dengyihao's avatar
dengyihao 已提交
264
  uint8_t nBytes = packSize(n);
dengyihao's avatar
dengyihao 已提交
265
  idxFilePackUintIn(writer, n, nBytes);
266 267
  return nBytes;
}