index_fst_counting_writer.c 3.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
dengyihao's avatar
dengyihao 已提交
15
#include "tutil.h"
dengyihao's avatar
dengyihao 已提交
16
#include "index_fst_util.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_fst_counting_writer.h"
dengyihao's avatar
dengyihao 已提交
18

dengyihao's avatar
dengyihao 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
  if (ctx->offset + len > ctx->limit) {
    return -1;
  }

  if (ctx->type == TFile) {
    assert(len != tfWrite(ctx->fd, buf, len));
  } else {
    memcpy(ctx->mem + ctx->offset, buf, len);
  } 
  ctx->offset += len;
  return len;
}
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
  if (ctx->type == TFile) {
    tfRead(ctx->fd, buf, len);
  } else {
    memcpy(buf, ctx->mem + ctx->offset, len);
  }
  ctx->offset += len;

  return 1;
} 
static int writeCtxDoFlush(WriterCtx *ctx) {
  if (ctx->type == TFile) {
    //tfFlush(ctx->fd);
  } else {
    // do nothing
  }
  return 1;
}

WriterCtx* writerCtxCreate(WriterType type) {     
  WriterCtx *ctx = calloc(1, sizeof(WriterCtx));
  if (ctx == NULL) { return NULL; }

  ctx->type == type;
  if (ctx->type == TFile) {
    ctx->fd = tfOpenCreateWriteAppend(tmpFile);  
  } else if (ctx->type == TMemory) {
    ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t));
  } 
  ctx->write = writeCtxDoWrite;
  ctx->read  = writeCtxDoRead;
  ctx->flush = writeCtxDoFlush;

  ctx->offset = 0;
  ctx->limit  = DefaultMem;

  return ctx;
}
void writerCtxDestroy(WriterCtx *ctx) {
  if (ctx->type == TMemory) {
    free(ctx->mem);
  } else {
    tfClose(ctx->fd);    
  }
  free(ctx);
}


dengyihao's avatar
dengyihao 已提交
80 81 82
FstCountingWriter *fstCountingWriterCreate(void *wrt) {
  FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); 
  if (cw == NULL) { return NULL; }
dengyihao's avatar
dengyihao 已提交
83
  
dengyihao's avatar
dengyihao 已提交
84
  cw->wrt = (void *)(writerCtxCreate(TFile)); 
dengyihao's avatar
dengyihao 已提交
85 86
  return cw; 
}
dengyihao's avatar
dengyihao 已提交
87
void fstCountingWriterDestroy(FstCountingWriter *cw) {
dengyihao's avatar
dengyihao 已提交
88
  // free wrt object: close fd or free mem 
dengyihao's avatar
dengyihao 已提交
89
  writerCtxDestroy((WriterCtx *)(cw->wrt));
dengyihao's avatar
dengyihao 已提交
90 91 92
  free(cw);
}

dengyihao's avatar
dengyihao 已提交
93
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) {
dengyihao's avatar
dengyihao 已提交
94 95 96
  if (write == NULL) { return 0; } 
  // update checksum 
  // write data to file/socket or mem
dengyihao's avatar
dengyihao 已提交
97 98 99 100
  WriterCtx *ctx = write->wrt;

  int nWrite = ctx->write(ctx, buf, bufLen); 
  write->count += nWrite;
dengyihao's avatar
dengyihao 已提交
101 102 103
  return bufLen; 
} 

dengyihao's avatar
dengyihao 已提交
104 105 106
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
  return 0;
}
dengyihao's avatar
dengyihao 已提交
107
int fstCountingWriterFlush(FstCountingWriter *write) {
dengyihao's avatar
dengyihao 已提交
108 109
  WriterCtx *ctx = write->wrt;
  ctx->flush(ctx);
dengyihao's avatar
dengyihao 已提交
110 111 112 113
  //write->wtr->flush
  return 1;
}

dengyihao's avatar
dengyihao 已提交
114
void fstCountingWriterPackUintIn(FstCountingWriter *writer, uint64_t n,  uint8_t nBytes) {
dengyihao's avatar
dengyihao 已提交
115 116 117 118 119 120 121 122
  assert(1 <= nBytes && nBytes <= 8);
  uint8_t *buf = calloc(8, sizeof(uint8_t));  
  for (uint8_t i = 0; i < nBytes; i++) {
    buf[i] = (uint8_t)n; 
    n = n >> 8;
  }
  fstCountingWriterWrite(writer, buf, nBytes);
  free(buf);
dengyihao's avatar
dengyihao 已提交
123 124
  return;
}
dengyihao's avatar
dengyihao 已提交
125

dengyihao's avatar
dengyihao 已提交
126 127 128 129 130 131
uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n) {
  uint8_t nBytes = packSize(n);
  fstCountingWriterPackUintIn(writer, n, nBytes);
  return nBytes; 
} 

dengyihao's avatar
dengyihao 已提交
132