index_fst_counting_writer.c 4.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
dengyihao's avatar
dengyihao 已提交
15
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
16
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_fst_util.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
19

dengyihao's avatar
dengyihao 已提交
20 21 22 23 24 25
static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
  if (ctx->offset + len > ctx->limit) {
    return -1;
  }

  if (ctx->type == TFile) {
dengyihao's avatar
dengyihao 已提交
26
    assert(len == tfWrite(ctx->file.fd, buf, len));
dengyihao's avatar
dengyihao 已提交
27
  } else {
dengyihao's avatar
dengyihao 已提交
28
    memcpy(ctx->mem.buf+ ctx->offset, buf, len);
dengyihao's avatar
dengyihao 已提交
29 30 31 32 33
  } 
  ctx->offset += len;
  return len;
}
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
dengyihao's avatar
dengyihao 已提交
34
  int nRead = 0; 
dengyihao's avatar
dengyihao 已提交
35
  if (ctx->type == TFile) {
dengyihao's avatar
dengyihao 已提交
36
    nRead = tfRead(ctx->file.fd, buf, len);
dengyihao's avatar
dengyihao 已提交
37
  } else {
dengyihao's avatar
dengyihao 已提交
38
    memcpy(buf, ctx->mem.buf + ctx->offset, len);
dengyihao's avatar
dengyihao 已提交
39
  }
dengyihao's avatar
dengyihao 已提交
40
  ctx->offset += nRead;
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42
  return nRead;
dengyihao's avatar
dengyihao 已提交
43 44 45
} 
static int writeCtxDoFlush(WriterCtx *ctx) {
  if (ctx->type == TFile) {
dengyihao's avatar
dengyihao 已提交
46
    //tfFsync(ctx->fd);
dengyihao's avatar
dengyihao 已提交
47
    //tfFlush(ctx->file.fd);
dengyihao's avatar
dengyihao 已提交
48 49 50 51 52 53
  } else {
    // do nothing
  }
  return 1;
}

dengyihao's avatar
dengyihao 已提交
54
WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity) {     
dengyihao's avatar
dengyihao 已提交
55 56 57
  WriterCtx *ctx = calloc(1, sizeof(WriterCtx));
  if (ctx == NULL) { return NULL; }

dengyihao's avatar
dengyihao 已提交
58
  ctx->type = type;
dengyihao's avatar
dengyihao 已提交
59
  if (ctx->type == TFile) {
dengyihao's avatar
dengyihao 已提交
60
    // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
61
    ctx->file.readOnly = readOnly;
dengyihao's avatar
dengyihao 已提交
62
    if (readOnly == false) {
dengyihao's avatar
dengyihao 已提交
63
      ctx->file.fd = tfOpenCreateWriteAppend(tmpFile);  
dengyihao's avatar
dengyihao 已提交
64
    } else {
dengyihao's avatar
dengyihao 已提交
65
      ctx->file.fd = tfOpenReadWrite(tmpFile);
dengyihao's avatar
dengyihao 已提交
66
    } 
dengyihao's avatar
dengyihao 已提交
67
    if (ctx->file.fd < 0) {
dengyihao's avatar
dengyihao 已提交
68
      indexError("open file error %d", errno);       
dengyihao's avatar
dengyihao 已提交
69
    }
dengyihao's avatar
dengyihao 已提交
70
  } else if (ctx->type == TMemory) {
dengyihao's avatar
dengyihao 已提交
71 72
    ctx->mem.buf  = calloc(1, sizeof(char) * capacity); 
    ctx->mem.capa = capacity; 
dengyihao's avatar
dengyihao 已提交
73 74 75 76 77 78
  } 
  ctx->write = writeCtxDoWrite;
  ctx->read  = writeCtxDoRead;
  ctx->flush = writeCtxDoFlush;

  ctx->offset = 0;
dengyihao's avatar
dengyihao 已提交
79
  ctx->limit  = capacity;
dengyihao's avatar
dengyihao 已提交
80 81 82 83 84

  return ctx;
}
void writerCtxDestroy(WriterCtx *ctx) {
  if (ctx->type == TMemory) {
dengyihao's avatar
dengyihao 已提交
85
    free(ctx->mem.buf);
dengyihao's avatar
dengyihao 已提交
86
  } else {
dengyihao's avatar
dengyihao 已提交
87
    tfClose(ctx->file.fd);    
dengyihao's avatar
dengyihao 已提交
88 89 90 91 92
  }
  free(ctx);
}


dengyihao's avatar
dengyihao 已提交
93
FstCountingWriter *fstCountingWriterCreate(void *wrt) {
dengyihao's avatar
dengyihao 已提交
94 95
  FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); 
  if (cw == NULL) { return NULL; }
dengyihao's avatar
dengyihao 已提交
96
  
dengyihao's avatar
dengyihao 已提交
97 98
  cw->wrt = wrt;
  //(void *)(writerCtxCreate(TFile, readOnly)); 
dengyihao's avatar
dengyihao 已提交
99 100
  return cw; 
}
dengyihao's avatar
dengyihao 已提交
101
void fstCountingWriterDestroy(FstCountingWriter *cw) {
dengyihao's avatar
dengyihao 已提交
102
  // free wrt object: close fd or free mem 
dengyihao's avatar
dengyihao 已提交
103
  fstCountingWriterFlush(cw);
dengyihao's avatar
dengyihao 已提交
104
  //writerCtxDestroy((WriterCtx *)(cw->wrt));
dengyihao's avatar
dengyihao 已提交
105 106 107
  free(cw);
}

dengyihao's avatar
dengyihao 已提交
108
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len) {
dengyihao's avatar
dengyihao 已提交
109 110 111
  if (write == NULL) { return 0; } 
  // update checksum 
  // write data to file/socket or mem
dengyihao's avatar
dengyihao 已提交
112 113
  WriterCtx *ctx = write->wrt;

dengyihao's avatar
dengyihao 已提交
114 115 116 117 118 119 120 121 122 123 124
  int nWrite = ctx->write(ctx, buf, len); 
  assert(nWrite == len);
  write->count += len;
  return len; 
} 
int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) {
  if (write == NULL) { return 0; }
  WriterCtx *ctx = write->wrt;
  int nRead = ctx->read(ctx, buf, len);
  //assert(nRead == len);
  return nRead; 
dengyihao's avatar
dengyihao 已提交
125 126
} 

dengyihao's avatar
dengyihao 已提交
127
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
dengyihao's avatar
dengyihao 已提交
128
  
dengyihao's avatar
dengyihao 已提交
129 130
  return 0;
}
dengyihao's avatar
dengyihao 已提交
131
int fstCountingWriterFlush(FstCountingWriter *write) {
dengyihao's avatar
dengyihao 已提交
132 133
  WriterCtx *ctx = write->wrt;
  ctx->flush(ctx);
dengyihao's avatar
dengyihao 已提交
134 135 136 137
  //write->wtr->flush
  return 1;
}

dengyihao's avatar
dengyihao 已提交
138
void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n,  uint8_t nBytes) {
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144 145 146
  assert(1 <= nBytes && nBytes <= 8);
  uint8_t *buf = calloc(8, sizeof(uint8_t));  
  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n; 
    n = n >> 8;
  }
  fstCountingWriterWrite(writer, buf, nBytes);
  free(buf);
dengyihao's avatar
dengyihao 已提交
147 148
  return;
}
dengyihao's avatar
dengyihao 已提交
149

dengyihao's avatar
dengyihao 已提交
150 151 152 153 154 155
uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n) {
  uint8_t nBytes = packSize(n);
  fstCountingWriterPackUintIn(writer, n, nBytes);
  return nBytes; 
} 

dengyihao's avatar
dengyihao 已提交
156