index_fst_counting_writer.c 6.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_fst_util.h"
18
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
/*
 * Append `len` bytes from `buf` to the writer context and advance ctx->offset.
 *
 * TFile contexts write through taosWriteFile(); every other type copies into
 * ctx->mem.buf at the current offset.
 *
 * Returns `len` on success. On a short/failed file write the value reported
 * by taosWriteFile() is returned, and -1 is returned when a memory write
 * would not fit in ctx->mem.capa; ctx->offset is left untouched in both cases.
 */
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
  if (ctx->type == TFile) {
    // The write must live outside assert(): with NDEBUG the whole assert
    // expression is compiled out, which used to drop the write itself.
    int64_t nWritten = taosWriteFile(ctx->file.pFile, buf, len);
    assert(nWritten == len);
    if (nWritten != len) { return (int)nWritten; }
  } else {
    // Refuse writes that would run past the buffer allocated in writerCtxCreate().
    if (len < 0 || (int64_t)ctx->offset + len > (int64_t)ctx->mem.capa) { return -1; }
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
29
/*
 * Sequentially read up to `len` bytes into `buf` from the current position.
 *
 * For TFile contexts the bytes come from the mapped file (USE_MMAP) or from
 * taosReadFile(); ctx->offset is advanced by the number of bytes read.
 * For other context types `len` bytes are copied from ctx->mem.buf at the
 * current offset, but 0 is returned and the offset is not advanced.
 *
 * Returns the number of bytes read as described above.
 */
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
  int nRead = 0;
  if (ctx->type == TFile) {
#ifdef USE_MMAP
    // Read from the current position (not from the start of the mapping)
    // and never past the end of the mapped file.
    int remain = (int)(ctx->file.size - ctx->offset);
    if (remain < 0) { remain = 0; }
    nRead = len < remain ? len : remain;
    if (nRead > 0) {
      memcpy(buf, ctx->file.ptr + ctx->offset, nRead);
    } else {
      nRead = 0;
    }
#else
    nRead = taosReadFile(ctx->file.pFile, buf, len);
#endif
  } else {
    // NOTE(review): the memory path copies `len` bytes without a bounds check
    // and still reports 0 bytes read; kept as-is because callers may rely on
    // the offset not moving here. Confirm the intended contract.
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
  }
  // A failed read (presumably reported as a negative count, like read(2))
  // must not move the offset backwards.
  if (nRead > 0) { ctx->offset += nRead; }

  return nRead;
}
dengyihao's avatar
dengyihao 已提交
45 46 47
/*
 * Positional read: copy up to `len` bytes starting at absolute `offset`
 * into `buf`, without touching ctx->offset.
 *
 * Only TFile contexts are supported; any other type hits assert(0).
 * Returns the number of bytes read (0 when the mmap request lies outside the
 * file), or whatever taosPReadFile() reports in the non-mmap build.
 */
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
  int nRead = 0;
  if (ctx->type == TFile) {
#ifdef USE_MMAP
    // Reject requests outside the mapping; otherwise `size - offset` goes
    // negative and memcpy would receive a huge length.
    if (offset < 0 || len <= 0 || offset >= ctx->file.size) { return 0; }
    int32_t remain = ctx->file.size - offset;
    nRead = remain >= len ? len : remain;
    memcpy(buf, ctx->file.ptr + offset, nRead);
#else
    nRead = taosPReadFile(ctx->file.pFile, buf, len, offset);
#endif
  } else {
    // refactor later
    assert(0);
  }
  return nRead;
}
dengyihao's avatar
dengyihao 已提交
62
/*
 * Report the on-disk size of a file-backed context by stat()-ing the path
 * stored in ctx->file.buf.
 *
 * Returns the file size, or 0 for non-file contexts and when stat() fails.
 */
static int writeCtxGetSize(WriterCtx* ctx) {
  if (ctx->type != TFile) { return 0; }

  struct stat st;
  // On failure the stat buffer is left unspecified, so never read st_size then.
  if (stat(ctx->file.buf, &st) != 0) { return 0; }
  return (int)st.st_size;
}
dengyihao's avatar
dengyihao 已提交
70
/*
 * Flush a writer context. File-backed contexts are synced with
 * taosFsyncFile(); other contexts have nothing to flush.
 *
 * Always returns 1.
 */
static int writeCtxDoFlush(WriterCtx* ctx) {
  const int isFileBacked = (ctx->type == TFile);
  if (isFileBacked) {
    taosFsyncFile(ctx->file.pFile);
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
81 82
/*
 * Create a writer context.
 *
 *   type     - TFile for a file-backed context, TMemory for an in-memory buffer.
 *   path     - file path (TFile only); must fit in ctx->file.buf including the
 *              terminating NUL.
 *   readOnly - TFile only: open for reading when true; otherwise the file is
 *              created/opened for appending and truncated to length 0.
 *   capacity - size of the buffer for TMemory; also stored in ctx->limit.
 *
 * Returns the new context, or NULL on allocation failure, invalid path,
 * open/stat failure, or a negative/unsatisfiable memory capacity.
 * The result is released with writerCtxDestroy().
 */
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  WriterCtx* ctx = calloc(1, sizeof(WriterCtx));
  if (ctx == NULL) { return NULL; }

  ctx->type = type;
  if (ctx->type == TFile) {
    // ugly code, refactor later
    ctx->file.readOnly = readOnly;

    // Validate the path first: it is copied into the fixed-size ctx->file.buf
    // and later handed to stat()/unlink(), so it must fit with its NUL.
    size_t pathLen = (path != NULL) ? strlen(path) : 0;
    if (path == NULL || pathLen >= sizeof(ctx->file.buf)) {
      indexError("invalid index file path, length %d", (int)pathLen);
      goto END;
    }

    if (readOnly == false) {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
    } else {
      ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
    }
    // Check the handle before it is truncated, stat-ed or mapped.
    if (ctx->file.pFile == NULL) {
      indexError("failed to open file, error %d", errno);
      goto END;
    }
    // ctx was zero-filled by calloc, so the copy stays NUL-terminated.
    memcpy(ctx->file.buf, path, pathLen);

    if (readOnly == false) {
      taosFtruncateFile(ctx->file.pFile, 0);
      int64_t file_size = 0;
      taosStatFile(path, &file_size, NULL);
      ctx->file.size = (int)file_size;
    } else {
      struct stat st;
      if (stat(path, &st) != 0) {
        indexError("failed to stat file, error %d", errno);
        taosCloseFile(&ctx->file.pFile);
        goto END;
      }
      ctx->file.size = st.st_size;
#ifdef USE_MMAP
      ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
#endif
    }
  } else if (ctx->type == TMemory) {
    // A negative capacity would wrap to a huge size_t in the allocation.
    if (capacity < 0) { goto END; }
    ctx->mem.buf = calloc(1, sizeof(char) * (size_t)capacity);
    // calloc may legitimately return NULL for a zero-sized request.
    if (ctx->mem.buf == NULL && capacity > 0) { goto END; }
    ctx->mem.capa = capacity;
  }

  ctx->write = writeCtxDoWrite;
  ctx->read = writeCtxDoRead;
  ctx->flush = writeCtxDoFlush;
  ctx->readFrom = writeCtxDoReadFrom;
  ctx->size = writeCtxGetSize;

  ctx->offset = 0;
  ctx->limit = capacity;

  return ctx;

END:
  if (ctx->type == TMemory) { free(ctx->mem.buf); }
  free(ctx);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
131
/*
 * Release a writer context created by writerCtxCreate().
 *
 * TMemory contexts free their buffer. Any other context is flushed and
 * closed, its read-only mapping is unmapped (USE_MMAP builds), and the file
 * at ctx->file.buf is unlinked when `remove` is true.
 * Passing NULL is a no-op.
 */
void writerCtxDestroy(WriterCtx* ctx, bool remove) {
  if (ctx == NULL) { return; }

  if (ctx->type == TMemory) {
    free(ctx->mem.buf);
  } else {
    ctx->flush(ctx);
    taosCloseFile(&ctx->file.pFile);
#ifdef USE_MMAP
    if (ctx->file.readOnly) { munmap(ctx->file.ptr, ctx->file.size); }
#endif
    if (remove) { unlink(ctx->file.buf); }
  }
  free(ctx);
}

dengyihao's avatar
dengyihao 已提交
152 153
FstCountingWriter* fstCountingWriterCreate(void* wrt) {
  FstCountingWriter* cw = calloc(1, sizeof(FstCountingWriter));
dengyihao's avatar
dengyihao 已提交
154
  if (cw == NULL) { return NULL; }
155

dengyihao's avatar
dengyihao 已提交
156
  cw->wrt = wrt;
157 158
  //(void *)(writerCtxCreate(TFile, readOnly));
  return cw;
dengyihao's avatar
dengyihao 已提交
159
}
dengyihao's avatar
dengyihao 已提交
160
/*
 * Flush and free a counting writer. Passing NULL is a no-op.
 *
 * The wrapped writer context (cw->wrt) is NOT destroyed here; it has to be
 * released separately.
 */
void fstCountingWriterDestroy(FstCountingWriter* cw) {
  // Guard first: the flush below dereferences cw.
  if (cw == NULL) { return; }

  fstCountingWriterFlush(cw);
  free(cw);
}

dengyihao's avatar
dengyihao 已提交
167
/*
 * Write `len` bytes from `buf` through the wrapped writer context, then add
 * them to the running byte count and checksum.
 *
 * Returns `len` on success, 0 when `write` is NULL, or the value reported by
 * the underlying write when it did not accept all `len` bytes (in which case
 * the count and checksum are left unchanged).
 */
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
  if (write == NULL) { return 0; }

  // write data to file/socket or mem
  WriterCtx* ctx = write->wrt;
  int        nWrite = ctx->write(ctx, buf, len);
  assert(nWrite == len);
  // assert() vanishes under NDEBUG, so check explicitly: bytes that were not
  // written must not be counted or checksummed.
  if (nWrite < 0 || (uint32_t)nWrite != len) { return nWrite; }

  write->count += len;
  // update checksum
  write->summer = taosCalcChecksum(write->summer, buf, len);
  return len;
}
dengyihao's avatar
dengyihao 已提交
180
/*
 * Read up to `len` bytes into `buf` through the wrapped writer context.
 *
 * Returns whatever the context's read callback reports, or 0 when `write`
 * is NULL. A short read is not treated as an error.
 */
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
  if (write != NULL) {
    WriterCtx* source = write->wrt;
    return source->read(source, buf, len);
  }
  return 0;
}
187

dengyihao's avatar
dengyihao 已提交
188 189 190 191
/*
 * Return the checksum accumulated so far by fstCountingWriterWrite().
 * No masking is applied at present; the stored value is returned unchanged.
 */
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
  uint32_t checksum = write->summer;
  return checksum;
}
dengyihao's avatar
dengyihao 已提交
192 193

/*
 * Flush the wrapped writer context via its flush callback.
 *
 * Returns 1 after flushing, or 0 when there is nothing to flush because
 * `write` or its wrapped context is NULL (same NULL convention as the
 * Write/Read entry points).
 */
int fstCountingWriterFlush(FstCountingWriter* write) {
  if (write == NULL || write->wrt == NULL) { return 0; }

  WriterCtx* ctx = write->wrt;
  ctx->flush(ctx);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
200
/*
 * Write the low `nBytes` bytes of `n` to `writer`, least-significant byte
 * first.
 *
 * `nBytes` must be in [1, 8]; this is asserted in debug builds, and values
 * above 8 are ignored in release builds so the scratch buffer cannot overflow.
 */
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes) {
  assert(1 <= nBytes && nBytes <= 8);

  // A fixed 8-byte stack buffer replaces the heap allocation, removing the
  // unchecked calloc() and the per-call malloc/free round trip.
  uint8_t buf[8] = {0};
  if (nBytes > sizeof(buf)) { return; }

  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n;
    n >>= 8;
  }
  fstCountingWriterWrite(writer, buf, nBytes);
}
dengyihao's avatar
dengyihao 已提交
211

dengyihao's avatar
dengyihao 已提交
212
/*
 * Write `n` to `writer` using the byte width chosen by packSize(n).
 *
 * Returns the number of bytes that were packed.
 */
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n) {
  const uint8_t width = packSize(n);
  fstCountingWriterPackUintIn(writer, n, width);
  return width;
}