You need to sign in or sign up before continuing.
indexFstFile.c 5.8 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
S
Shengliang Guan 已提交
15

dengyihao's avatar
dengyihao 已提交
16
#include "indexFstFile.h"
dengyihao's avatar
dengyihao 已提交
17
#include "indexFstUtil.h"
dengyihao's avatar
dengyihao 已提交
18
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "os.h"
20
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
21

dengyihao's avatar
dengyihao 已提交
22
/*
 * Write len bytes from buf at the context's current position.
 *
 * File contexts hand the bytes to taosWriteFile; every other context type
 * copies them into ctx->mem.buf at ctx->offset. The offset is then advanced
 * by len.
 *
 * Returns len.
 */
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
  if (ctx->type == TFile) {
    // The write must not live inside assert(): when NDEBUG is defined the
    // assert argument is never evaluated and the data would silently not be
    // written while the offset below still advanced.
    int64_t nWrite = taosWriteFile(ctx->file.pFile, buf, len);
    assert(nWrite == len);
    (void)nWrite;  // silence "unused variable" in NDEBUG builds
  } else {
    // NOTE(review): no check against ctx->mem.cap here; the caller is
    // presumably responsible for staying inside the buffer - confirm.
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
31
/*
 * Read up to len bytes into buf and advance ctx->offset by the amount read.
 *
 * File contexts read through taosReadFile (or copy from the mapping when
 * USE_MMAP is defined); every other context type copies len bytes out of
 * ctx->mem.buf starting at ctx->offset.
 *
 * Returns the number of bytes read; a non-positive result from the file read
 * is passed through unchanged and leaves the offset untouched.
 */
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
  int nRead = 0;
  if (ctx->type == TFile) {
#ifdef USE_MMAP
    // NOTE(review): this always copies from the start of the mapping and
    // ignores ctx->offset, unlike the non-mmap path - confirm this is intended.
    nRead = len < ctx->file.size ? len : ctx->file.size;
    memcpy(buf, ctx->file.ptr, nRead);
#else
    nRead = taosReadFile(ctx->file.pFile, buf, len);
#endif
  } else {
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
    // Report what was actually copied; previously nRead stayed 0 here, so the
    // offset never advanced and the caller was told nothing had been read.
    nRead = len;
  }

  // Never move the offset backwards when the read reported an error.
  if (nRead > 0) {
    ctx->offset += nRead;
  }

  return nRead;
}
dengyihao's avatar
dengyihao 已提交
47
/*
 * Read up to len bytes into buf starting at the absolute position offset.
 * ctx->offset is not modified.
 *
 * Only file contexts are supported; any other context type hits assert(0).
 *
 * Returns the number of bytes read (0 when, in the mmap build, offset lies
 * outside the mapped file).
 */
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
  int nRead = 0;
  if (ctx->type == TFile) {
#ifdef USE_MMAP
    // An offset outside the file would make the remaining-byte count
    // negative, which memcpy would interpret as a huge size_t.
    if (offset < 0 || offset >= ctx->file.size) {
      return 0;
    }
    int32_t last = (int32_t)(ctx->file.size - offset);
    nRead = last >= len ? len : last;
    memcpy(buf, ctx->file.ptr + offset, nRead);
#else
    nRead = taosPReadFile(ctx->file.pFile, buf, len, offset);
#endif
  } else {
    // refactor later
    assert(0);
  }
  return nRead;
}
dengyihao's avatar
dengyihao 已提交
64
/*
 * Report the size of the file behind a file context by stat-ing the path
 * stored in ctx->file.buf. Non-file contexts always report 0.
 */
static int idxFileCtxGetSize(IFileCtx* ctx) {
  if (ctx->type != TFile) {
    return 0;
  }

  // Starts at 0 so that value is returned if the stat call does not fill it in.
  int64_t nBytes = 0;
  taosStatFile(ctx->file.buf, &nBytes, NULL);
  return (int)nBytes;
}
dengyihao's avatar
dengyihao 已提交
72
/*
 * Sync a file context through taosFsyncFile; other context types need no
 * work. Always returns 1.
 */
static int idxFileCtxDoFlush(IFileCtx* ctx) {
  switch (ctx->type) {
    case TFile:
      taosFsyncFile(ctx->file.pFile);
      break;
    default:
      // do nothing
      break;
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
83 84
/*
 * Allocate and initialise an IFileCtx.
 *
 * type      TFile opens `path`; TMemory allocates a zeroed buffer of
 *           `capacity` bytes.
 * path      file to open (used for TFile only).
 * readOnly  for TFile: true opens the file for reading, false opens it with
 *           create/write/append flags and truncates it to length 0.
 * capacity  buffer size for TMemory; also stored in ctx->limit.
 *
 * Returns the new context, or NULL if an allocation fails, the path is
 * missing, or the file cannot be opened. Release it with idxFileCtxDestroy.
 */
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
  if (ctx == NULL) {
    return NULL;
  }

  ctx->type = type;
  if (ctx->type == TFile) {
    if (path == NULL) {
      indexError("failed to open file, path is NULL");
      goto END;
    }

    // ugly code, refactor later
    ctx->file.readOnly = readOnly;
    // NOTE(review): the copy is not bounded by the size of ctx->file.buf,
    // which is declared outside this file - confirm callers keep paths short
    // enough. ctx was zero-allocated, so the copied path stays NUL-terminated.
    memcpy(ctx->file.buf, path, strlen(path));

    if (readOnly == false) {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
    } else {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
    }

    // Validate the handle before anything uses it; previously the handle was
    // truncated / stat'ed / mapped first and only checked afterwards.
    if (ctx->file.pFile == NULL) {
      indexError("failed to open file, error %d", errno);
      goto END;
    }

    if (readOnly == false) {
      taosFtruncateFile(ctx->file.pFile, 0);
      taosStatFile(path, &ctx->file.size, NULL);
    } else {
      // Keep the size reported by the stat call. The earlier code overwrote
      // it with a local that was always 0, so read-only contexts (and the
      // mmap length below) saw an empty file.
      taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
#ifdef USE_MMAP
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif
    }
  } else if (ctx->type == TMemory) {
    ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
    // A zero-byte request may legitimately yield NULL; only treat NULL as a
    // failure when bytes were actually requested.
    if (ctx->mem.buf == NULL && capacity > 0) {
      goto END;
    }
    ctx->mem.cap = capacity;
  }

  ctx->write = idxFileCtxDoWrite;
  ctx->read = idxFileCtxDoRead;
  ctx->flush = idxFileCtxDoFlush;
  ctx->readFrom = idxFileCtxDoReadFrom;
  ctx->size = idxFileCtxGetSize;

  ctx->offset = 0;
  ctx->limit = capacity;

  return ctx;

END:
  if (ctx->type == TMemory) {
    taosMemoryFree(ctx->mem.buf);
  }
  taosMemoryFree(ctx);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
135
/*
 * Release a context created by idxFileCtxCreate.
 *
 * TMemory contexts free their buffer. Every other context is flushed and
 * closed; when `remove` is true the path stored in ctx->file.buf is unlinked
 * as well. A NULL ctx is ignored.
 */
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
  if (ctx == NULL) {
    return;
  }

  if (ctx->type == TMemory) {
    taosMemoryFree(ctx->mem.buf);
  } else {
    ctx->flush(ctx);
    taosCloseFile(&ctx->file.pFile);
#ifdef USE_MMAP
    // Only read-only contexts are mapped at creation time.
    if (ctx->file.readOnly) {
      munmap(ctx->file.ptr, ctx->file.size);
    }
#endif
    // The stat call that used to run here for writable files stored its
    // result in an unused local, so it has been removed.
    if (remove) {
      unlink(ctx->file.buf);
    }
  }
  taosMemoryFree(ctx);
}

dengyihao's avatar
dengyihao 已提交
157 158
IdxFstFile* idxFileCreate(void* wrt) {
  IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
dengyihao's avatar
dengyihao 已提交
159 160 161
  if (cw == NULL) {
    return NULL;
  }
162

dengyihao's avatar
dengyihao 已提交
163
  cw->wrt = wrt;
164
  return cw;
dengyihao's avatar
dengyihao 已提交
165
}
dengyihao's avatar
dengyihao 已提交
166
/*
 * Flush and free an IdxFstFile.
 *
 * The wrapped context (cw->wrt) is NOT destroyed here; only the wrapper is
 * released. A NULL cw is ignored.
 */
void idxFileDestroy(IdxFstFile* cw) {
  // idxFileFlush dereferences its argument, so bail out before calling it.
  if (cw == NULL) {
    return;
  }
  idxFileFlush(cw);
  // idxFileCtxDestroy((IFileCtx *)(cw->wrt));
  taosMemoryFree(cw);
}

dengyihao's avatar
dengyihao 已提交
173
/*
 * Write len bytes from buf through the wrapped context, then add len to the
 * running byte count and fold the bytes into the running checksum.
 *
 * Returns len, or 0 when `write` is NULL.
 */
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (!write) {
    return 0;
  }

  // hand the bytes to the underlying file/memory context
  IFileCtx* sink = write->wrt;
  int       written = sink->write(sink, buf, len);
  assert(written == len);

  // bookkeeping: total bytes and checksum
  write->count += len;
  write->summer = taosCalcChecksum(write->summer, buf, len);

  return len;
}
dengyihao's avatar
dengyihao 已提交
188
/*
 * Read up to len bytes into buf through the wrapped context.
 *
 * Returns whatever the context's read callback returns, or 0 when `write`
 * is NULL. A short read is not treated as an error here.
 */
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
  if (!write) {
    return 0;
  }

  IFileCtx* src = write->wrt;
  return src->read(src, buf, len);
}
197

dengyihao's avatar
dengyihao 已提交
198
/*
 * Return the checksum accumulated so far by idxFileWrite.
 * No masking is applied; the stored value is returned as-is.
 */
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
  uint32_t sum = write->summer;
  return sum;
}
dengyihao's avatar
dengyihao 已提交
202

dengyihao's avatar
dengyihao 已提交
203 204
/*
 * Invoke the flush callback of the wrapped context.
 * The callback's result is ignored; this function always returns 1.
 */
int idxFileFlush(IdxFstFile* write) {
  IFileCtx* sink = write->wrt;
  (void)sink->flush(sink);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
209
/*
 * Write the low nBytes bytes of n through idxFileWrite, least significant
 * byte first.
 *
 * nBytes must be in the range 1..8 (checked with assert).
 */
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
  assert(1 <= nBytes && nBytes <= 8);

  // A fixed 8-byte scratch buffer lives on the stack: no per-call heap
  // allocation and no unchecked allocation result to dereference.
  uint8_t buf[8] = {0};
  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n;
    n = n >> 8;
  }
  idxFileWrite(writer, buf, nBytes);
}
dengyihao's avatar
dengyihao 已提交
220

dengyihao's avatar
dengyihao 已提交
221
/*
 * Write n using the byte width chosen by packSize(n).
 * Returns that byte width.
 */
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
  const uint8_t width = packSize(n);
  idxFilePackUintIn(writer, n, width);
  return width;
}