indexFstFile.c 7.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
14

dengyihao's avatar
dengyihao 已提交
15
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
20
#include "os.h"
21
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
// Size in bytes of one cached file block. Positional reads are split into
// blocks of this size and each block is cached separately.
static const int32_t kBlockSize = 4096;

// One block of file data as stored in the LRU cache.
// The structure is allocated with kBlockSize extra bytes for the payload.
typedef struct {
  int32_t blockId;  // block index within the file (byte offset / kBlockSize)
  int32_t nread;    // result of the positional read that filled `buf`
  char    buf[];    // block payload (C99 flexible array member)
} SDataBlock;

// Deleter callback registered with the LRU cache for cached data blocks.
// Frees the block memory; the key and its length are not needed for that.
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  taosMemoryFree(value);
}

// Build the cache key for block `blockId` of the file at `path` into `buf`.
// The key is the path, then a '_' separator, then the block id as written by
// idxInt2str. No bounds checking is done: the caller must supply a buffer
// large enough for the whole key.
static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
  char*  cursor = buf;
  size_t pathLen = strlen(path);

  // NOTE(review): the SERIALIZE_* macros are assumed to advance `cursor` past
  // the bytes they write - confirm against their definition.
  SERIALIZE_STR_VAR_TO_BUF(cursor, path, pathLen);
  SERIALIZE_VAR_TO_BUF(cursor, '_', char);
  idxInt2str(blockId, cursor, 0);
}
dengyihao's avatar
dengyihao 已提交
40
// Write `len` bytes from `buf` at the context's current offset.
//
// File-backed contexts write through taosWriteFile; other contexts copy into
// the in-memory buffer at `ctx->offset`. The offset is advanced by `len`.
//
// Returns `len`.
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
  if (ctx->type == TFILE) {
    // The write must not live inside assert(): with NDEBUG defined the assert
    // expression is not evaluated and no data would ever reach the file.
    int nWrite = (int)taosWriteFile(ctx->file.pFile, buf, len);
    assert(nWrite == len);
    (void)nWrite;  // silence the unused-variable warning in NDEBUG builds
  } else {
    // NOTE(review): no check against ctx->mem.cap here - the caller is
    // presumably responsible for staying within the buffer; confirm.
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
49
// Read up to `len` bytes into `buf`.
//
// File-backed contexts read through taosReadFile (or copy from the mapping
// when USE_MMAP is defined). Other contexts copy `len` bytes from the
// in-memory buffer at `ctx->offset`.
//
// Returns the byte count reported for file-backed contexts and 0 for
// in-memory contexts. The offset advances only by a positive count, so a
// failed read (negative result) no longer moves it backwards.
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
  int nRead = 0;
  if (ctx->type == TFILE) {
#ifdef USE_MMAP
    // NOTE(review): this always copies from the start of the mapping and
    // ignores ctx->offset - confirm that is intended.
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
    nRead = taosReadFile(ctx->file.pFile, buf, len);
#endif
  } else {
    // NOTE(review): the data is copied but nRead stays 0, so in-memory reads
    // report 0 bytes and do not advance the offset. Kept as-is because
    // callers may rely on it - confirm.
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
  }
  if (nRead > 0) {
    ctx->offset += nRead;
  }

  return nRead;
}
dengyihao's avatar
dengyihao 已提交
65
// Positional read: copy up to `len` bytes starting at file offset `offset`
// into `buf`, going through the context's LRU block cache.
//
// The range is served block by block (kBlockSize bytes each). A block found
// in the cache is copied from directly; otherwise it is read from the file
// with taosPReadFile, copied from, and inserted into the cache.
//
// Returns the number of bytes copied, or -1 if a block cannot be allocated or
// cannot be inserted into the cache. A short block read ends the loop early
// and returns what was copied so far.
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
  int32_t total = 0, nread = 0;
  int32_t blkId = offset / kBlockSize;
  int32_t blkOffset = offset % kBlockSize;
  int32_t blkLeft = kBlockSize - blkOffset;

  do {
    // Size the key from the stored path buffer so that path + '_' + block id
    // always fits; a fixed 128-byte key could overflow for long paths.
    char key[sizeof(ctx->file.buf) + 32] = {0};
    idxGenLRUKey(key, ctx->file.buf, blkId);
    LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));

    if (h) {
      SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
      // NOTE(review): the copy length ignores blk->nread, so a cached partial
      // block may yield bytes past what was read from the file - confirm.
      nread = TMIN(blkLeft, len);
      memcpy(buf + total, blk->buf + blkOffset, nread);
      taosLRUCacheRelease(ctx->lru, h, false);
    } else {
      int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;

      SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
      if (blk == NULL) {
        return -1;
      }
      blk->blockId = blkId;
      // Widen before multiplying so the file offset cannot overflow 32 bits.
      blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, (int64_t)blkId * kBlockSize);
      assert(blk->nread <= kBlockSize);
      nread = TMIN(blkLeft, len);

      if (blk->nread < kBlockSize && blk->nread < len) {
        // The block never reaches the cache on this path, so release it here.
        taosMemoryFree(blk);
        break;
      }
      memcpy(buf + total, blk->buf + blkOffset, nread);

      LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
                                       TAOS_LRU_PRIORITY_LOW);
      if (s != TAOS_LRU_STATUS_OK) {
        // NOTE(review): whether the cache still owns `blk` after a failed
        // insert is not visible here, so it is not freed - confirm.
        return -1;
      }
    }
    total += nread;
    len -= nread;
    offset += nread;

    blkId = offset / kBlockSize;
    blkOffset = offset % kBlockSize;
    blkLeft = kBlockSize - blkOffset;

  } while (len > 0);
  return total;
}
dengyihao's avatar
dengyihao 已提交
112
// Report the size of the file behind a file-backed context, as returned by
// taosStatFile on the stored path and narrowed to int.
// Contexts of any other type report 0.
static int idxFileCtxGetSize(IFileCtx* ctx) {
  if (ctx->type != TFILE) {
    return 0;
  }

  int64_t nBytes = 0;
  taosStatFile(ctx->file.buf, &nBytes, NULL);
  return (int)nBytes;
}
dengyihao's avatar
dengyihao 已提交
120
// Flush the context: file-backed contexts are synced with taosFsyncFile,
// every other context type has nothing to flush.
// Always returns 1; the result of the sync is not inspected.
static int idxFileCtxDoFlush(IFileCtx* ctx) {
  const bool isFile = (ctx->type == TFILE);
  if (isFile) {
    taosFsyncFile(ctx->file.pFile);
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
129 130
// Create a file context of the given type.
//
// TFILE:   `path` is stored in the context and opened. A writable context
//          (readOnly == false) is opened with create/write/append flags and
//          truncated to zero length; a read-only context is opened for
//          reading (and mapped when USE_MMAP is defined). The file size is
//          recorded in ctx->file.size.
// TMEMORY: a zeroed buffer of `capacity` bytes is allocated.
//
// The read/write/flush/readFrom/size callbacks are installed, the offset is
// reset to 0 and the limit set to `capacity`.
//
// Returns the new context, or NULL if allocation fails, the path is missing
// or does not fit the context's path buffer, or the file cannot be opened.
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
  if (ctx == NULL) {
    return NULL;
  }

  ctx->type = type;
  if (ctx->type == TFILE) {
    // Reject paths that would overflow the fixed-size path buffer.
    size_t pathLen = (path != NULL) ? strlen(path) : 0;
    if (path == NULL || pathLen >= sizeof(ctx->file.buf)) {
      indexError("failed to open file, invalid path, length limit %d", (int)sizeof(ctx->file.buf));
      goto END;
    }

    ctx->file.readOnly = readOnly;
    // The context was zero-allocated, so the copied path stays NUL-terminated.
    memcpy(ctx->file.buf, path, pathLen);

    if (readOnly == false) {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
    } else {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
    }
    // Check the handle before any call that uses it.
    if (ctx->file.pFile == NULL) {
      indexError("failed to open file, error %d", errno);
      goto END;
    }

    if (readOnly == false) {
      taosFtruncateFile(ctx->file.pFile, 0);
      taosStatFile(path, &ctx->file.size, NULL);
    } else {
      // Keep the size reported by the stat call; it used to be overwritten
      // with a zero-initialized local right afterwards.
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
#ifdef USE_MMAP
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif
    }
  } else if (ctx->type == TMEMORY) {
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
    // A NULL result for a zero-sized request is not treated as a failure.
    if (ctx->mem.buf == NULL && capacity > 0) {
      goto END;
    }
    ctx->mem.cap = capacity;
  }

  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
  ctx->size = idxFileCtxGetSize;

  ctx->offset = 0;
  ctx->limit = capacity;

  return ctx;
END:
  if (ctx->type == TMEMORY) {
    taosMemoryFree(ctx->mem.buf);
  }
  taosMemoryFree(ctx);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
180
// Destroy a context created by idxFileCtxCreate.
//
// In-memory contexts release their buffer. All other contexts are flushed and
// closed, unmapped when read-only and USE_MMAP is defined, and the file at the
// stored path is unlinked when `remove` is true. The context itself is freed
// last. Passing NULL is a no-op.
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
  if (ctx == NULL) {
    return;
  }

  if (ctx->type == TMEMORY) {
    taosMemoryFree(ctx->mem.buf);
  } else {
    ctx->flush(ctx);
    taosCloseFile(&ctx->file.pFile);
#ifdef USE_MMAP
    if (ctx->file.readOnly) {
      munmap(ctx->file.ptr, ctx->file.size);
    }
#endif
    if (remove) {
      unlink(ctx->file.buf);
    }
  }
  taosMemoryFree(ctx);
}

dengyihao's avatar
dengyihao 已提交
202 203
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
204 205 206
  if (cw == NULL) {
    return NULL;
  }
207

dengyihao's avatar
dengyihao 已提交
208
  cw->wrt = wrt;
209
  return cw;
dengyihao's avatar
dengyihao 已提交
210
}
dengyihao's avatar
dengyihao 已提交
211
// Flush and free an IdxFstFile. Passing NULL is a no-op.
//
// The underlying writer context (cw->wrt) is flushed but deliberately not
// destroyed here; whoever created it must release it with idxFileCtxDestroy.
void idxFileDestroy(IdxFstFile* cw) {
  if (cw == NULL) {
    return;
  }
  idxFileFlush(cw);
  taosMemoryFree(cw);
}

dengyihao's avatar
dengyihao 已提交
218
// Write `len` bytes from `buf` through the underlying writer context, then
// add `len` to the running byte count and fold the bytes into the running
// checksum.
// Returns `len`, or 0 when `write` is NULL.
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (write != NULL) {
    IFileCtx* out = write->wrt;

    int written = out->write(out, buf, len);
    assert(written == len);

    write->count += len;
    write->summer = taosCalcChecksum(write->summer, buf, len);
    return len;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
233

dengyihao's avatar
dengyihao 已提交
234
// Read up to `len` bytes into `buf` through the underlying context's read
// callback and return whatever that callback returns.
// Returns 0 when `write` is NULL.
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (write != NULL) {
    IFileCtx* in = write->wrt;
    return in->read(in, buf, len);
  }
  return 0;
}
241

dengyihao's avatar
dengyihao 已提交
242
// Return the running checksum accumulated so far.
// No masking is applied here: the stored value is returned as-is.
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
  uint32_t checksum = write->summer;
  return checksum;
}
dengyihao's avatar
dengyihao 已提交
246

dengyihao's avatar
dengyihao 已提交
247 248
// Invoke the flush callback of the underlying writer context.
// Always returns 1; the callback's result is not inspected.
int idxFileFlush(IdxFstFile* write) {
  IFileCtx* out = write->wrt;
  (void)out->flush(out);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
253
// Write the low `nBytes` bytes of `n`, least-significant byte first, through
// idxFileWrite. `nBytes` must be in the range 1..8.
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
  assert(1 <= nBytes && nBytes <= 8);

  // Eight bytes always fit on the stack; this avoids a heap allocation per
  // call and the NULL dereference that followed a failed allocation.
  uint8_t buf[8] = {0};
  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n;
    n = n >> 8;
  }
  idxFileWrite(writer, buf, nBytes);
}
dengyihao's avatar
dengyihao 已提交
264

dengyihao's avatar
dengyihao 已提交
265
// Write `n` using the byte width reported by packSize(n) and return that
// width.
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
  const uint8_t width = packSize(n);
  idxFilePackUintIn(writer, n, width);
  return width;
}