// s3downloader.h
#ifndef __S3DOWNLOADER_H__
#define __S3DOWNLOADER_H__

#include <fcntl.h>
#include <pthread.h>
6 7 8
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
9

10 11
#include <cstddef>
#include <cstdlib>
12 13 14 15 16 17 18 19 20
#include <iostream>
#include <map>
#include <string>
#include <vector>
// #include <cstdint>
#include <cstring>

#include <curl/curl.h>

21 22 23
#include "s3common.h"

using std::vector;
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48

// A span described by a start position and a length.
// OffsetMgr::NextOffset() returns one of these; len == 0 signals EOF.
// NOTE(review): both fields are presumably byte counts -- confirm in the .cpp.
struct Range {
    uint64_t offset;  // start position of the span
    uint64_t len;     // length of the span; 0 means "nothing left"
};

// Hands out Range values over a total of `maxsize`, using `chunksize` as the
// step. The constructor, NextOffset() and Reset() are defined outside this
// header.
//
// The object owns a pthread mutex that is destroyed in the destructor.
// NOTE(review): the mutex presumably serialises access to `curpos` so that
// several threads can pull ranges from one manager -- confirm in the .cpp.
class OffsetMgr {
   public:
    OffsetMgr(uint64_t maxsize, uint64_t chunksize);
    ~OffsetMgr() { pthread_mutex_destroy(&this->offset_lock); }

    Range NextOffset();  // ret.len == 0 means EOF
    void Reset(uint64_t n);

    // Read-only accessors; const so they can be used through a const
    // reference or pointer.
    uint64_t Chunksize() const { return this->chunksize; }
    uint64_t Size() const { return this->maxsize; }

   private:
    // Not copyable: a copied pthread_mutex_t must not be used for locking,
    // and two owners would both destroy it. Declared but intentionally never
    // defined (C++03-compatible way of disabling copies).
    OffsetMgr(const OffsetMgr&);
    OffsetMgr& operator=(const OffsetMgr&);

    pthread_mutex_t offset_lock;
    uint64_t maxsize;
    uint64_t chunksize;
    uint64_t curpos;
};

class BlockingBuffer {
   public:
A
Adam Lee 已提交
49
    static BlockingBuffer* CreateBuffer(string url, string region, OffsetMgr* o,
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
                                        S3Credential* pcred);
    BlockingBuffer(string url, OffsetMgr* o);
    virtual ~BlockingBuffer();
    bool Init();
    bool EndOfFile() { return this->eof; };
    bool Error() { return this->error; };

    uint64_t Read(char* buf, uint64_t len);
    uint64_t Fill();

    static const int STATUS_EMPTY = 0;
    static const int STATUS_READY = 1;

    /* data */
   protected:
    string sourceurl;
    uint64_t bufcap;
    virtual uint64_t fetchdata(uint64_t offset, char* data, uint64_t len) = 0;

   private:
    int status;
    bool eof;
    bool error;
    pthread_mutex_t stat_mutex;
    pthread_cond_t stat_cond;
    uint64_t readpos;
    uint64_t realsize;
    char* bufferdata;
    OffsetMgr* mgr;
    Range nextpos;
};

struct Downloader {
    Downloader(uint8_t part_num);
    ~Downloader();
A
Adam Lee 已提交
85
    bool init(string url, string region, uint64_t size, uint64_t chunksize,
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
              S3Credential* pcred);
    bool get(char* buf, uint64_t& len);
    void destroy();
    // reset
    // init(url)
   private:
    const uint8_t num;
    pthread_t* threads;
    BlockingBuffer** buffers;
    OffsetMgr* o;
    uint64_t chunkcount;
    uint64_t readlen;
};

// Plain record describing a character buffer. Nothing in this header uses it.
// NOTE(review): presumably `maxsize` is the capacity of `buf` and `len` the
// number of bytes currently stored -- confirm against the users in the .cpp.
struct Bufinfo {
    /* data */
    char* buf;
    uint64_t maxsize;
    uint64_t len;
};

// BlockingBuffer specialisation that implements fetchdata() and keeps a CURL
// handle plus the request method, headers and a parsed URL.
// Subclasses can customise the request through processheader().
class HTTPFetcher : public BlockingBuffer {
   public:
    HTTPFetcher(string url, OffsetMgr* o);
    // Virtual through BlockingBuffer's virtual destructor; spelled out here.
    virtual ~HTTPFetcher();

    bool SetMethod(Method m);
    bool AddHeaderField(HeaderField f, string v);

   protected:
    // Implements the pure virtual hook declared in BlockingBuffer.
    virtual uint64_t fetchdata(uint64_t offset, char* data, uint64_t len);

    // Hook for subclasses; the default does nothing and reports success.
    virtual bool processheader() { return true; }

    CURL* curl;
    Method method;
    HeaderContent headers;
    UrlParser urlparser;
};

class S3Fetcher : public HTTPFetcher {
   public:
A
Adam Lee 已提交
125 126
    S3Fetcher(string url, string region, OffsetMgr* o,
              const S3Credential& cred);
127 128 129 130 131 132
    ~S3Fetcher(){};

   protected:
    virtual bool processheader();

   private:
A
Adam Lee 已提交
133
    string region;
134 135 136 137 138 139 140 141 142 143
    S3Credential cred;
};

// Forward declaration: ListBucketResult stores pointers to BucketContent,
// which is defined further down.
struct BucketContent;

// Result of a bucket listing: the listing's name, prefix and max-keys value
// plus one BucketContent pointer per entry.
// NOTE(review): the out-of-line destructor presumably deletes every element
// of `contents`; if so, copying this struct would lead to a double delete --
// confirm in the .cpp.
struct ListBucketResult {
    string Name;
    string Prefix;
    unsigned int MaxKeys;
    vector<BucketContent*> contents;

    ~ListBucketResult();
};

// Factory for BucketContent. It is declared a friend of BucketContent, which
// gives it access to the private `key` and `size` fields.
// NOTE(review): the result is presumably heap-allocated and owned by whoever
// stores it (e.g. ListBucketResult::contents) -- confirm in the .cpp.
BucketContent* CreateBucketContentItem(string key, uint64_t size);

// One entry of a bucket listing: a key and its size. The fields are private;
// they are read through Key()/Size() and the friend factory
// CreateBucketContentItem() has direct access to them.
struct BucketContent {
    friend BucketContent* CreateBucketContentItem(string key, uint64_t size);

    BucketContent();
    ~BucketContent();

    // Returns a copy of the key.
    string Key() const { return key; }
    // Returns the stored size.
    uint64_t Size() const { return size; }

   private:
    // Left commented out in the original source (kept for reference):
    // BucketContent(const BucketContent& b) = delete;
    // BucketContent operator=(const BucketContent& b) = delete;

    string key;
    // const char* etags;
    uint64_t size;
};

// need free
A
Adam Lee 已提交
167 168
ListBucketResult* ListBucket(string schema, string region, string bucket,
                             string prefix, const S3Credential& cred);
169 170 171 172

// Variant of ListBucket that takes a host and bucket and no credential.
// NOTE(review): presumably a test helper talking to a fake HTTP endpoint, with
// the same "caller frees the result" rule as ListBucket -- confirm in the .cpp.
ListBucketResult* ListBucket_FakeHTTP(string host, string bucket);

#endif