indexFstFile.c 8.8 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
dengyihao's avatar
dengyihao 已提交
7
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
dengyihao's avatar
dengyihao 已提交
8 9 10 11 12 13
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
14

dengyihao's avatar
dengyihao 已提交
15
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
20
#include "os.h"
21
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32
// Size in bytes of one cached file block. Offsets passed to idxFileCtxDoReadFrom are split
// into (block id, offset within block) using this value, and the write buffer of a writable
// file context is sized as four of these blocks.
static int32_t kBlockSize = 4096;

/*
 * One block of file data as stored in the LRU cache.
 *
 * blockId - index of the block within the file (file offset / kBlockSize)
 * nread   - byte count returned by the read that filled `buf`
 * buf     - block payload; allocated in the same chunk, directly after the header
 */
typedef struct {
  int32_t blockId;
  int32_t nread;
  char    buf[];  // C99 flexible array member (standard replacement for the GNU `buf[0]` extension)
} SDataBlock;

// Callback handed to taosLRUCacheInsert together with each cached block: frees the block.
// The key and its length are not needed to release the value.
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

dengyihao's avatar
dengyihao 已提交
33
/*
 * Build the LRU cache key for one block of a file into `buf`.
 *
 * The key is laid out as: the bytes of `path`, a '_' separator, then `blockId` as written
 * by idxInt2str. The caller provides a buffer large enough for all three parts.
 */
static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
  char* cursor = buf;
  SERIALIZE_STR_VAR_TO_BUF(cursor, path, strlen(path));
  SERIALIZE_VAR_TO_BUF(cursor, '_', char);
  idxInt2str(blockId, cursor, 0);
}
dengyihao's avatar
dengyihao 已提交
40
/*
 * Append `len` bytes from `buf` to the context.
 *
 * TFILE: bytes are staged in ctx->file.wBuf. Once the staged data would reach the buffer
 *        capacity, the buffer is topped up and written out, any further whole multiples of
 *        the capacity are written straight from `buf`, and the remaining tail is staged.
 * other: bytes are copied into ctx->mem.buf at ctx->offset.
 *
 * Returns `len` on success and advances ctx->offset by `len`; returns 0 for len <= 0.
 * Returns -1, leaving ctx->offset untouched, when:
 *   - the file context has no write buffer (e.g. it was opened read-only),
 *   - a taosWriteFile call does not write the full amount requested,
 *   - the data would not fit in the in-memory buffer.
 * After a failed file write the file may contain part of the data, so the caller should
 * treat the output as invalid rather than retry.
 */
static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
  int tlen = len;
  if (len <= 0) {
    return 0;
  }

  if (ctx->type == TFILE) {
    int32_t cap = ctx->file.wBufCap;
    if (ctx->file.wBuf == NULL || cap <= 0) {
      // No staging buffer: would otherwise dereference NULL and divide by zero below.
      return -1;
    }

    if (len + ctx->file.wBufOffset >= cap) {
      // Top the staging buffer up to exactly `cap` bytes and write it out.
      int32_t nw = cap - ctx->file.wBufOffset;
      memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, nw);
      if (taosWriteFile(ctx->file.pFile, ctx->file.wBuf, cap) != cap) {
        return -1;
      }

      memset(ctx->file.wBuf, 0, cap);
      ctx->file.wBufOffset = 0;

      len -= nw;
      buf += nw;

      // Whole multiples of the capacity bypass the staging buffer.
      nw = (len / cap) * cap;
      if (nw != 0) {
        if (taosWriteFile(ctx->file.pFile, buf, nw) != nw) {
          return -1;
        }
      }

      len -= nw;
      buf += nw;

      // Whatever is left (strictly less than `cap`) waits for the next write or flush.
      if (len != 0) {
        memcpy(ctx->file.wBuf, buf, len);
      }
      ctx->file.wBufOffset += len;
    } else {
      memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, len);
      ctx->file.wBufOffset += len;
    }
  } else {
    // Refuse writes that would run past the end of the in-memory buffer.
    if (ctx->mem.buf == NULL || len > ctx->mem.cap - ctx->offset) {
      return -1;
    }
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }

  ctx->offset += tlen;
  return tlen;
}
dengyihao's avatar
dengyihao 已提交
77
/*
 * Read up to `len` bytes into `buf`.
 *
 * TFILE: reads through taosReadFile (or, when built with USE_MMAP, copies from the mapping).
 * other: copies from ctx->mem.buf starting at ctx->offset, clamped so the copy never runs
 *        past ctx->mem.cap.
 *
 * Returns the number of bytes read; in file mode this is taosReadFile's result, so a
 * negative value is passed through to the caller. ctx->offset is advanced only by a
 * positive byte count.
 */
static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
  int nRead = 0;
  if (ctx->type == TFILE) {
#ifdef USE_MMAP
    // NOTE(review): this copies from the start of the mapping and ignores ctx->offset — confirm intent.
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
    nRead = taosReadFile(ctx->file.pFile, buf, len);
#endif
  } else {
    // Report what was actually copied. This branch used to copy `len` bytes unconditionally
    // while returning 0 and leaving the offset where it was.
    int avail = ctx->mem.cap - ctx->offset;
    if (ctx->mem.buf != NULL && len > 0 && avail > 0) {
      nRead = len < avail ? len : avail;
      memcpy(buf, ctx->mem.buf + ctx->offset, nRead);
    }
  }

  // A failed read must not move the offset backwards.
  if (nRead > 0) {
    ctx->offset += nRead;
  }

  return nRead;
}
dengyihao's avatar
dengyihao 已提交
93
/*
 * Read up to `len` bytes starting at file position `offset` into `buf`, going through the
 * context's LRU cache of kBlockSize-sized blocks.
 *
 * For every block the request touches:
 *   - cache hit: the requested part is copied from the cached block;
 *   - miss with fewer than kBlockSize bytes between `offset` and the end of the file: the
 *     bytes are read directly with taosPReadFile, nothing is cached, and the function returns;
 *   - any other miss: the whole block is read, the requested part is copied out, and the
 *     block is inserted into the cache.
 *
 * Returns the number of bytes copied into `buf` (short when a read comes back incomplete),
 * 0 when `offset` is at or beyond ctx->file.size, and -1 when a cache block cannot be
 * allocated or inserted.
 */
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
  int32_t total = 0, nread = 0;
  int32_t blkId = offset / kBlockSize;
  int32_t blkOffset = offset % kBlockSize;
  int32_t blkLeft = kBlockSize - blkOffset;

  if (offset >= ctx->file.size) return 0;

  do {
    // Key = path + '_' + block id; the assert keeps the 64 bytes of headroom for the id part.
    char key[1024] = {0};
    ASSERT(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
    idxGenLRUKey(key, ctx->file.buf, blkId);
    LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));

    if (h) {
      SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
      nread = TMIN(blkLeft, len);
      memcpy(buf + total, blk->buf + blkOffset, nread);
      taosLRUCacheRelease(ctx->lru, h, false);
    } else {
      int32_t left = ctx->file.size - offset;
      if (left < kBlockSize) {
        // Less than one block remains in the file: read it directly and stop.
        nread = TMIN(left, len);
        int32_t bytes = taosPReadFile(ctx->file.pFile, buf + total, nread, offset);
        ASSERTS(bytes == nread, "index read incomplete data");
        if (bytes != nread) break;

        total += bytes;
        return total;
      } else {
        int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;

        SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
        if (blk == NULL) {
          // Out of memory: report failure the same way a failed cache insert does.
          return -1;
        }
        blk->blockId = blkId;
        blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
        ASSERTS(blk->nread <= kBlockSize, "index read incomplete data");
        if (blk->nread > kBlockSize) {
          taosMemoryFree(blk);  // not handed to the cache yet, so it must be released here
          break;
        }

        if (blk->nread < kBlockSize && blk->nread < len) {
          taosMemoryFree(blk);
          break;
        }

        nread = TMIN(blkLeft, len);
        memcpy(buf + total, blk->buf + blkOffset, nread);

        // NOTE(review): on a non-OK status it is not visible here whether the cache has
        // already released `blk` through the deleter — confirm before adding a free.
        LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
                                         TAOS_LRU_PRIORITY_LOW);
        if (s != TAOS_LRU_STATUS_OK) {
          return -1;
        }
      }
    }
    total += nread;
    len -= nread;
    offset += nread;

    // Re-derive the block coordinates for the next iteration.
    blkId = offset / kBlockSize;
    blkOffset = offset % kBlockSize;
    blkLeft = kBlockSize - blkOffset;

  } while (len > 0);
  return total;
}
dengyihao's avatar
dengyihao 已提交
157
/*
 * Report the size of a file-backed context.
 *
 *   writable file  -> ctx->offset
 *   read-only file -> size obtained from taosStatFile on the stored path (0 if it is never filled in)
 *   anything else  -> 0
 */
static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) {
  if (ctx->type != TFILE) {
    return 0;
  }
  if (!ctx->file.readOnly) {
    return ctx->offset;
  }

  int64_t statSize = 0;
  taosStatFile(ctx->file.buf, &statSize, NULL);
  return (int)statSize;
}
dengyihao's avatar
dengyihao 已提交
169
/*
 * Flush a file-backed context: write out whatever is staged in the write buffer, mark the
 * buffer empty, then fsync the file. Other context types have nothing to flush.
 *
 * Always returns 1; the results of the write and fsync calls are not inspected.
 */
static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) {
  if (ctx->type != TFILE) {
    return 1;  // nothing is buffered outside of file mode
  }

  const int32_t pending = ctx->file.wBufOffset;
  if (pending > 0) {
    (void)taosWriteFile(ctx->file.pFile, ctx->file.wBuf, pending);
    ctx->file.wBufOffset = 0;
  }
  taosFsyncFile(ctx->file.pFile);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
182 183
/*
 * Create a read/write context of the given `type`.
 *
 * TFILE:
 *   - readOnly == false: opens `path` with create/write/append flags, truncates it to zero
 *     length and allocates a write buffer of 4 * kBlockSize bytes;
 *   - readOnly == true:  opens `path` for reading and records its size (and maps it when
 *     built with USE_MMAP).
 * TMEMORY: allocates a zero-filled buffer of `capacity` bytes.
 *
 * `capacity` is also stored as ctx->limit for every type.
 *
 * Returns the new context, or NULL if the context, the file handle, or a required buffer
 * cannot be obtained. Nothing is leaked on failure. Release with idxFileCtxDestroy.
 */
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
  if (ctx == NULL) {
    return NULL;
  }

  ctx->type = type;
  if (ctx->type == TFILE) {
    ctx->file.readOnly = readOnly;
    // ctx is zero-filled, so copying without the terminator still leaves a NUL-terminated path.
    // NOTE(review): there is no length check against the size of ctx->file.buf — confirm callers bound `path`.
    memcpy(ctx->file.buf, path, strlen(path));

    if (readOnly == false) {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
    } else {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
    }
    // Check the handle before any other call touches it, while errno still belongs to the open.
    if (ctx->file.pFile == NULL) {
      indexError("failed to open file, error %d", errno);
      goto END;
    }

    ctx->file.wBufOffset = 0;
    if (readOnly == false) {
      taosFtruncateFile(ctx->file.pFile, 0);
      taosStatFile(path, &ctx->file.size, NULL);

      ctx->file.wBufCap = kBlockSize * 4;
      ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap);
      if (ctx->file.wBuf == NULL) {
        goto END;
      }
    } else {
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);

#ifdef USE_MMAP
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif
    }
  } else if (ctx->type == TMEMORY) {
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
    // A zero-byte request may legitimately yield NULL; only a real allocation failure is fatal.
    if (ctx->mem.buf == NULL && capacity > 0) {
      goto END;
    }
    ctx->mem.cap = capacity;
  }

  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
  ctx->size = idxFileCtxGetSize;

  ctx->offset = 0;
  ctx->limit = capacity;

  return ctx;

END:
  // Undo whatever was set up for this type before the failure.
  if (ctx->type == TMEMORY) {
    taosMemoryFree(ctx->mem.buf);
  } else if (ctx->type == TFILE) {
    taosMemoryFree(ctx->file.wBuf);
    if (ctx->file.pFile != NULL) {
      taosCloseFile(&ctx->file.pFile);
    }
  }
  taosMemoryFree(ctx);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
236
/*
 * Release a context created by idxFileCtxCreate.
 *
 * TMEMORY: frees the in-memory buffer.
 * other:   writes out any bytes still staged in the write buffer, calls ctx->flush, frees
 *          the write buffer and closes the file. A read-only context built with USE_MMAP
 *          is also unmapped. When `remove` is true the file at the stored path is unlinked.
 *
 * Passing NULL is a no-op, so callers can clean up unconditionally after a failed create.
 */
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
  if (ctx == NULL) {
    return;
  }

  if (ctx->type == TMEMORY) {
    taosMemoryFree(ctx->mem.buf);
  } else {
    // Drain the staging buffer first; the write result is not inspected here.
    if (ctx->file.wBufOffset > 0) {
      (void)taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset);
      ctx->file.wBufOffset = 0;
    }
    ctx->flush(ctx);
    taosMemoryFreeClear(ctx->file.wBuf);
    taosCloseFile(&ctx->file.pFile);
    if (ctx->file.readOnly) {
#ifdef USE_MMAP
      munmap(ctx->file.ptr, ctx->file.size);
#endif
    }
    if (remove) {
      unlink(ctx->file.buf);
    }
  }
  taosMemoryFree(ctx);
}

dengyihao's avatar
dengyihao 已提交
259 260
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
261 262 263
  if (cw == NULL) {
    return NULL;
  }
264

dengyihao's avatar
dengyihao 已提交
265
  cw->wrt = wrt;
266
  return cw;
dengyihao's avatar
dengyihao 已提交
267
}
dengyihao's avatar
dengyihao 已提交
268 269
/*
 * Flush and free an IdxFstFile.
 *
 * Only the wrapper itself is freed; the context it was created with is left alone.
 * Passing NULL is a no-op, matching the NULL tolerance of idxFileWrite and idxFileRead.
 */
void idxFileDestroy(IdxFstFile* cw) {
  if (cw == NULL) {
    return;
  }
  idxFileFlush(cw);
  taosMemoryFree(cw);
}

dengyihao's avatar
dengyihao 已提交
273
/*
 * Write `len` bytes from `buf` through the attached context, then add them to the byte
 * count and fold them into the running checksum.
 *
 * Returns `len` on success, 0 when `write` is NULL, and -1 when the context reports a
 * different number of bytes written than requested.
 */
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (write == NULL) {
    return 0;
  }

  IFileCtx* fileCtx = write->wrt;
  int       written = fileCtx->write(fileCtx, buf, len);
  ASSERTS(written == len, "index write incomplete data");
  if (written != len) {
    return -1;
  }

  // Bookkeeping happens only after the bytes were accepted in full.
  write->summer = taosCalcChecksum(write->summer, buf, len);
  write->count += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
289

dengyihao's avatar
dengyihao 已提交
290
/*
 * Read up to `len` bytes into `buf` through the attached context's read callback.
 * Returns that callback's result, or 0 when `write` is NULL.
 */
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (write != NULL) {
    IFileCtx* fileCtx = write->wrt;
    return fileCtx->read(fileCtx, buf, len);
  }
  return 0;
}
297

dengyihao's avatar
dengyihao 已提交
298
/*
 * Return the running checksum that idxFileWrite maintains in `summer`.
 * NOTE(review): despite the name, no masking is applied here — the stored value is
 * returned unchanged; confirm whether a mask step is still planned.
 */
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
  const uint32_t checksum = write->summer;
  return checksum;
}
dengyihao's avatar
dengyihao 已提交
302

dengyihao's avatar
dengyihao 已提交
303 304
/*
 * Flush the context attached to `write` by invoking its flush callback.
 *
 * Returns 1 after the callback has run, or 0 when there is nothing to flush because
 * `write` or its context is NULL (same NULL tolerance as idxFileWrite and idxFileRead).
 */
int idxFileFlush(IdxFstFile* write) {
  if (write == NULL || write->wrt == NULL) {
    return 0;
  }
  IFileCtx* ctx = write->wrt;
  ctx->flush(ctx);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
309
/*
 * Write the low `nBytes` bytes of `n` to `writer`, least-significant byte first.
 *
 * A uint64_t has at most 8 bytes, so `nBytes` is clamped to 8; larger values used to run
 * past the end of the scratch buffer. The scratch buffer lives on the stack, so there is
 * no allocation that can fail.
 */
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
  uint8_t buf[sizeof(uint64_t)] = {0};
  if (nBytes > sizeof(buf)) {
    nBytes = (uint8_t)sizeof(buf);
  }

  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n;
    n = n >> 8;
  }
  idxFileWrite(writer, buf, nBytes);
}
dengyihao's avatar
dengyihao 已提交
319

dengyihao's avatar
dengyihao 已提交
320
/*
 * Write `n` to `writer` using the byte width chosen by packSize(n).
 * Returns that width so the caller knows how many bytes were emitted.
 */
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
  const uint8_t width = packSize(n);
  idxFilePackUintIn(writer, n, width);
  return width;
}