index_fst_counting_writer.c 4.6 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_fst_util.h"
18
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20
/*
 * Write `len` bytes from `buf` through the writer context.
 *
 * File contexts hand the bytes to tfWrite on ctx->file.fd; every other
 * context type copies them into ctx->mem.buf at ctx->offset.
 *
 * Returns `len` on success and advances ctx->offset by `len`. Returns -1,
 * leaving ctx->offset untouched, when the write would move the offset past
 * ctx->limit or when the file backend did not accept all `len` bytes.
 */
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
  if (ctx->offset + len > ctx->limit) { return -1; }

  if (ctx->type == TFile) {
    // The call must live outside assert(): with NDEBUG defined the assert
    // expression is never evaluated, which would silently drop the write.
    int64_t nWritten = tfWrite(ctx->file.fd, buf, len);
    assert(nWritten == len);
    if (nWritten != len) { return -1; }
  } else {
    memcpy(ctx->mem.buf + ctx->offset, buf, len);
  }
  ctx->offset += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
31
/*
 * Read up to `len` bytes into `buf` from the writer context.
 *
 * File contexts read from ctx->file.fd via tfRead and return its result;
 * ctx->offset is advanced by the number of bytes actually read.
 * Every other context type copies `len` bytes from ctx->mem.buf at
 * ctx->offset and returns 0.
 */
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
  int nRead = 0;
  if (ctx->type == TFile) {
    nRead = tfRead(ctx->file.fd, buf, len);
  } else {
    // NOTE(review): this branch fills `buf` but leaves nRead at 0, so the
    // caller is told nothing was read and the offset does not move. The copy
    // is also not bounded by the buffer capacity. Kept as-is because callers
    // may rely on it - confirm the intended semantics before changing.
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
  }
  // Presumably tfRead reports failure with a negative value; never let that
  // drag the offset backwards.
  if (nRead > 0) { ctx->offset += nRead; }

  return nRead;
}
dengyihao's avatar
dengyihao 已提交
42 43 44 45 46 47 48 49 50 51 52
/*
 * Read up to `len` bytes into `buf` starting at absolute position `offset`.
 *
 * Only file contexts are supported: the descriptor is repositioned with
 * tfLseek (whence 0) and then read with tfRead. ctx->offset is not modified.
 *
 * Returns the tfRead result, -1 when the seek fails, and 0 for non-file
 * contexts (which additionally trip an assert in debug builds).
 */
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
  if (ctx->type != TFile) {
    // refactor later: reading at an offset is not implemented for this backend
    assert(0);
    return 0;
  }

  // Reading after a failed seek would return bytes from the wrong position.
  if (tfLseek(ctx->file.fd, offset, 0) < 0) { return -1; }

  return tfRead(ctx->file.fd, buf, len);
}
dengyihao's avatar
dengyihao 已提交
53
/*
 * Flush the writer context.
 *
 * File contexts are synced with tfFsync; other context types need no work.
 * Always returns 1.
 */
static int writeCtxDoFlush(WriterCtx* ctx) {
  const bool isFile = (ctx->type == TFile);
  if (isFile) { tfFsync(ctx->file.fd); }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
63 64
/*
 * Allocate and initialise a writer context.
 *
 * type     - TFile opens `path`; TMemory allocates a zeroed buffer of
 *            `capacity` bytes.
 * path     - file to open (TFile only); a copy is kept in ctx->file.buf.
 * readOnly - TFile only: false opens with tfOpenCreateWriteAppend, true opens
 *            with tfOpenReadWrite.
 * capacity - buffer size for TMemory; also stored as ctx->limit for every
 *            context type.
 *
 * Returns the new context, or NULL when allocation fails, the path is missing
 * or does not fit in ctx->file.buf, or the file cannot be opened. Release the
 * result with writerCtxDestroy.
 */
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
  WriterCtx* ctx = calloc(1, sizeof(WriterCtx));
  if (ctx == NULL) { return NULL; }

  ctx->type = type;
  if (ctx->type == TFile) {
    // ugly code, refactor later
    // The stored path is later handed to unlink() by writerCtxDestroy, so a
    // truncated or overflowing copy must never be kept. This assumes
    // ctx->file.buf is a fixed-size char array inside WriterCtx.
    if (path == NULL || strlen(path) >= sizeof(ctx->file.buf)) {
      indexError("invalid file path");
      goto END;
    }

    ctx->file.readOnly = readOnly;
    if (readOnly == false) {
      ctx->file.fd = tfOpenCreateWriteAppend(path);
    } else {
      ctx->file.fd = tfOpenReadWrite(path);
    }
    if (ctx->file.fd < 0) {
      indexError("open file error %d", errno);
      goto END;
    }
    // calloc zeroed the buffer, so the copy stays NUL-terminated.
    memcpy(ctx->file.buf, path, strlen(path));
  } else if (ctx->type == TMemory) {
    ctx->mem.buf = calloc(1, sizeof(char) * capacity);
    // calloc may legitimately return NULL for a zero-sized request; only a
    // failed request for a positive size is treated as an error.
    if (ctx->mem.buf == NULL && capacity > 0) { goto END; }
    ctx->mem.capa = capacity;
  }
  ctx->write = writeCtxDoWrite;
  ctx->read = writeCtxDoRead;
  ctx->flush = writeCtxDoFlush;
  ctx->readFrom = writeCtxDoReadFrom;

  ctx->offset = 0;
  ctx->limit = capacity;

  return ctx;
END:
  if (ctx->type == TMemory) { free(ctx->mem.buf); }
  free(ctx);
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
99
/*
 * Release a writer context created by writerCtxCreate.
 *
 * TMemory contexts free their buffer. Every other context closes
 * ctx->file.fd and, when `remove` is true, unlinks the path stored in
 * ctx->file.buf. Passing NULL is a no-op, so the result of a failed
 * writerCtxCreate can be handed over directly.
 */
void writerCtxDestroy(WriterCtx* ctx, bool remove) {
  if (ctx == NULL) { return; }

  if (ctx->type == TMemory) {
    free(ctx->mem.buf);
  } else {
    tfClose(ctx->file.fd);
    if (remove) {
      indexError("rm file %s", ctx->file.buf);
      unlink(ctx->file.buf);
    }
  }
  free(ctx);
}

dengyihao's avatar
dengyihao 已提交
112 113
FstCountingWriter* fstCountingWriterCreate(void* wrt) {
  FstCountingWriter* cw = calloc(1, sizeof(FstCountingWriter));
dengyihao's avatar
dengyihao 已提交
114
  if (cw == NULL) { return NULL; }
115

dengyihao's avatar
dengyihao 已提交
116
  cw->wrt = wrt;
117 118
  //(void *)(writerCtxCreate(TFile, readOnly));
  return cw;
dengyihao's avatar
dengyihao 已提交
119
}
dengyihao's avatar
dengyihao 已提交
120
/*
 * Flush and free a counting writer.
 *
 * Only the FstCountingWriter itself is freed; the wrapped context in cw->wrt
 * is flushed but neither closed nor freed here. Passing NULL is a no-op, so
 * the result of a failed fstCountingWriterCreate can be handed over directly.
 */
void fstCountingWriterDestroy(FstCountingWriter* cw) {
  if (cw == NULL) { return; }

  // Flushing dereferences the wrapped context, so skip it when none is set.
  if (cw->wrt != NULL) { fstCountingWriterFlush(cw); }
  free(cw);
}

dengyihao's avatar
dengyihao 已提交
127
/*
 * Write `len` bytes from `buf` through the wrapped context and add them to
 * the writer's running byte count.
 *
 * Returns `len` on success and 0 when `write` is NULL. When the wrapped
 * context does not accept exactly `len` bytes, the count is left unchanged
 * and the context's own return value is passed back (debug builds assert
 * first).
 */
int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
  if (write == NULL) { return 0; }
  // update checksum
  // write data to file/socket or mem
  WriterCtx* ctx = write->wrt;

  int nWrite = ctx->write(ctx, buf, len);
  // Test the sign first: comparing a negative int directly against a uint32_t
  // converts it to a large unsigned value.
  bool complete = (nWrite >= 0) && ((uint32_t)nWrite == len);
  assert(complete);
  // With NDEBUG the assert vanishes; a failed write must still not be counted.
  if (!complete) { return nWrite; }

  write->count += len;
  return len;
}
dengyihao's avatar
dengyihao 已提交
138
/*
 * Read up to `len` bytes into `buf` via the wrapped context's read callback.
 *
 * Returns whatever the callback returns, or 0 when `write` is NULL. The
 * result is deliberately not required to equal `len`.
 */
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
  if (write != NULL) {
    WriterCtx* ctx = write->wrt;
    return ctx->read(ctx, buf, len);
  }
  return 0;
}
145

dengyihao's avatar
dengyihao 已提交
146 147
/*
 * Placeholder: no checksum is computed, the result is always 0 and `write`
 * is not inspected.
 */
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
  (void)write;
  return 0;
}
/*
 * Flush the wrapped context through its flush callback.
 *
 * Returns 1 once the callback has been invoked (its own result is not
 * inspected), or 0 when `write` or its wrapped context is NULL - the same
 * NULL convention as fstCountingWriterWrite and fstCountingWriterRead.
 */
int fstCountingWriterFlush(FstCountingWriter* write) {
  if (write == NULL || write->wrt == NULL) { return 0; }

  WriterCtx* ctx = write->wrt;
  ctx->flush(ctx);
  return 1;
}

dengyihao's avatar
dengyihao 已提交
154
/*
 * Write the low `nBytes` bytes of `n` to `writer`, least-significant byte
 * first.
 *
 * `nBytes` must be in the range 1..8; debug builds assert this, and a value
 * above 8 is ignored in release builds instead of overrunning the buffer.
 */
void fstCountingWriterPackUintIn(FstCountingWriter* writer, uint64_t n, uint8_t nBytes) {
  // Eight bytes always suffice for a uint64_t, so a stack buffer replaces the
  // heap allocation (whose failure used to go unchecked).
  uint8_t buf[8] = {0};

  assert(1 <= nBytes && nBytes <= 8);
  if (nBytes > sizeof(buf)) { return; }

  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n;
    n >>= 8;
  }
  fstCountingWriterWrite(writer, buf, nBytes);
}
dengyihao's avatar
dengyihao 已提交
165

dengyihao's avatar
dengyihao 已提交
166
/*
 * Write `n` to `writer` using the byte width reported by packSize(n).
 *
 * Returns that byte width.
 */
uint8_t fstCountingWriterPackUint(FstCountingWriter* writer, uint64_t n) {
  const uint8_t width = packSize(n);
  fstCountingWriterPackUintIn(writer, n, width);
  return width;
}