// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
C
Cai Yudong 已提交
19
#include <fiu-local.h>
C
Cai Yudong 已提交
20 21 22 23 24
#include <fstream>
#include <iostream>
#include <memory>
#include <utility>

25
#include "config/Config.h"
C
Cai Yudong 已提交
26
#include "storage/s3/S3ClientMock.h"
C
Cai Yudong 已提交
27 28 29 30 31 32 33 34 35 36
#include "storage/s3/S3ClientWrapper.h"
#include "utils/Error.h"
#include "utils/Log.h"

namespace milvus {
namespace storage {

// Starts the S3 storage service.
//
// Reads the S3 enable flag from server::Config. When S3 is not enabled (or the "s3_disable" fault point
// fires) this logs the fact and returns Status::OK() without touching the AWS SDK. Otherwise it loads the
// address, port, access key, secret key and bucket name, initialises the AWS SDK, builds an S3 client that
// talks to "<address>:<port>" over plain HTTP with SSL verification switched off, and finally calls
// CreateBucket() as a connection check, printing the verdict to stdout.
//
// Returns the status produced by CreateBucket(). CONFIG_CHECK is defined elsewhere; it presumably returns
// early with the failing status when a config getter fails -- confirm against its definition.
Status
S3ClientWrapper::StartService() {
    auto& server_config = server::Config::GetInstance();

    bool enabled = false;
    CONFIG_CHECK(server_config.GetStorageConfigS3Enable(enabled));
    fiu_do_on("S3ClientWrapper.StartService.s3_disable", enabled = false);
    if (!enabled) {
        LOG_STORAGE_INFO_ << "S3 not enabled!";
        return Status::OK();
    }

    CONFIG_CHECK(server_config.GetStorageConfigS3Address(s3_address_));
    CONFIG_CHECK(server_config.GetStorageConfigS3Port(s3_port_));
    CONFIG_CHECK(server_config.GetStorageConfigS3AccessKey(s3_access_key_));
    CONFIG_CHECK(server_config.GetStorageConfigS3SecretKey(s3_secret_key_));
    CONFIG_CHECK(server_config.GetStorageConfigS3Bucket(s3_bucket_));

    Aws::InitAPI(options_);

    // Endpoint is overridden so the client talks to the configured host instead of the default AWS endpoint.
    Aws::Client::ClientConfiguration client_config;
    client_config.endpointOverride = s3_address_ + ":" + s3_port_;
    client_config.scheme = Aws::Http::Scheme::HTTP;
    client_config.verifySSL = false;

    const Aws::Auth::AWSCredentials credentials(s3_access_key_, s3_secret_key_);
    client_ptr_ = std::make_shared<Aws::S3::S3Client>(
        credentials, client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always, false);

    // The "mock_enable" fault point swaps the real client for the in-process mock.
    bool use_mock = false;
    fiu_do_on("S3ClientWrapper.StartService.mock_enable", use_mock = true);
    if (use_mock) {
        client_ptr_ = std::make_shared<S3ClientMock>();
    }

    std::cout << "S3 service connection check ...... " << std::flush;
    Status check_status = CreateBucket();
    if (check_status.ok()) {
        std::cout << "OK" << std::endl;
    } else {
        std::cout << "FAIL" << std::endl;
    }
    return check_status;
}

C
Cai Yudong 已提交
73
void
C
Cai Yudong 已提交
74 75 76 77 78 79 80 81 82 83
S3ClientWrapper::StopService() {
    if (client_ptr_ != nullptr) {
        client_ptr_ = nullptr;
    }
    Aws::ShutdownAPI(options_);
}

// Creates the configured bucket (s3_bucket_).
//
// A BUCKET_ALREADY_OWNED_BY_YOU error is treated the same as success. Any other failure is logged and
// returned as SERVER_UNEXPECTED_ERROR carrying the S3 error message.
Status
S3ClientWrapper::CreateBucket() {
    Aws::S3::Model::CreateBucketRequest create_request;
    create_request.WithBucket(s3_bucket_);

    auto create_outcome = client_ptr_->CreateBucket(create_request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.CreateBucket.outcome.fail", create_outcome = Aws::S3::Model::CreateBucketOutcome());

    const bool bucket_usable =
        create_outcome.IsSuccess() ||
        create_outcome.GetError().GetErrorType() == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU;
    if (bucket_usable) {
        LOG_STORAGE_DEBUG_ << "CreateBucket '" << s3_bucket_ << "' successfully!";
        return Status::OK();
    }

    const auto& failure = create_outcome.GetError();
    LOG_STORAGE_ERROR_ << "ERROR: CreateBucket: " << failure.GetExceptionName() << ": " << failure.GetMessage();
    return Status(SERVER_UNEXPECTED_ERROR, failure.GetMessage());
}

// Deletes the configured bucket (s3_bucket_).
//
// On failure the S3 error is logged and returned as SERVER_UNEXPECTED_ERROR.
Status
S3ClientWrapper::DeleteBucket() {
    Aws::S3::Model::DeleteBucketRequest delete_request;
    delete_request.WithBucket(s3_bucket_);

    auto delete_outcome = client_ptr_->DeleteBucket(delete_request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.DeleteBucket.outcome.fail", delete_outcome = Aws::S3::Model::DeleteBucketOutcome());

    if (delete_outcome.IsSuccess()) {
        LOG_STORAGE_DEBUG_ << "DeleteBucket '" << s3_bucket_ << "' successfully!";
        return Status::OK();
    }

    const auto& failure = delete_outcome.GetError();
    LOG_STORAGE_ERROR_ << "ERROR: DeleteBucket: " << failure.GetExceptionName() << ": " << failure.GetMessage();
    return Status(SERVER_UNEXPECTED_ERROR, failure.GetMessage());
}

// Uploads the local file `file_path` to the configured bucket under the key `object_name`.
//
// Returns SERVER_UNEXPECTED_ERROR when the file does not exist, when it exists but cannot be opened for
// reading, or when the PutObject request fails; Status::OK() otherwise.
Status
S3ClientWrapper::PutObjectFile(const std::string& object_name, const std::string& file_path) {
    struct stat buffer;
    if (stat(file_path.c_str(), &buffer) != 0) {
        std::string str = "File '" + file_path + "' not exist!";
        LOG_STORAGE_ERROR_ << "ERROR: " << str;
        return Status(SERVER_UNEXPECTED_ERROR, str);
    }

    auto input_data =
        Aws::MakeShared<Aws::FStream>("PutObjectFile", file_path.c_str(), std::ios_base::in | std::ios_base::binary);
    // stat() only proves the path exists; the open can still fail (e.g. no read permission). Bail out
    // here instead of handing a failed stream to the SDK as the request body.
    if (!input_data->is_open()) {
        std::string str = "File '" + file_path + "' cannot be opened for reading!";
        LOG_STORAGE_ERROR_ << "ERROR: " << str;
        return Status(SERVER_UNEXPECTED_ERROR, str);
    }

    Aws::S3::Model::PutObjectRequest request;
    request.WithBucket(s3_bucket_).WithKey(object_name);
    request.SetBody(input_data);

    auto outcome = client_ptr_->PutObject(request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.PutObjectFile.outcome.fail", outcome = Aws::S3::Model::PutObjectOutcome());
    if (!outcome.IsSuccess()) {
        const auto& err = outcome.GetError();
        LOG_STORAGE_ERROR_ << "ERROR: PutObject: " << err.GetExceptionName() << ": " << err.GetMessage();
        return Status(SERVER_UNEXPECTED_ERROR, err.GetMessage());
    }

    LOG_STORAGE_DEBUG_ << "PutObjectFile '" << file_path << "' successfully!";
    return Status::OK();
}

// Uploads the bytes of `content` to the configured bucket under the key `object_name`.
//
// On failure the S3 error is logged and returned as SERVER_UNEXPECTED_ERROR.
Status
S3ClientWrapper::PutObjectStr(const std::string& object_name, const std::string& content) {
    // Stage the payload in an in-memory stream that becomes the request body.
    const std::shared_ptr<Aws::IOStream> payload = Aws::MakeShared<Aws::StringStream>("");
    payload->write(content.data(), content.length());

    Aws::S3::Model::PutObjectRequest put_request;
    put_request.WithBucket(s3_bucket_).WithKey(object_name);
    put_request.SetBody(payload);

    auto put_outcome = client_ptr_->PutObject(put_request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.PutObjectStr.outcome.fail", put_outcome = Aws::S3::Model::PutObjectOutcome());

    if (put_outcome.IsSuccess()) {
        LOG_STORAGE_DEBUG_ << "PutObjectStr successfully!";
        return Status::OK();
    }

    const auto& failure = put_outcome.GetError();
    LOG_STORAGE_ERROR_ << "ERROR: PutObject: " << failure.GetExceptionName() << ": " << failure.GetMessage();
    return Status(SERVER_UNEXPECTED_ERROR, failure.GetMessage());
}

// Downloads the object `object_name` from the configured bucket into the local file `file_path`.
//
// The destination file is opened only after the GetObject request succeeded. Returns
// SERVER_UNEXPECTED_ERROR when the request fails or when the destination file cannot be opened for
// writing; Status::OK() otherwise.
Status
S3ClientWrapper::GetObjectFile(const std::string& object_name, const std::string& file_path) {
    Aws::S3::Model::GetObjectRequest request;
    request.WithBucket(s3_bucket_).WithKey(object_name);

    auto outcome = client_ptr_->GetObject(request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.GetObjectFile.outcome.fail", outcome = Aws::S3::Model::GetObjectOutcome());
    if (!outcome.IsSuccess()) {
        const auto& err = outcome.GetError();
        LOG_STORAGE_ERROR_ << "ERROR: GetObject: " << err.GetExceptionName() << ": " << err.GetMessage();
        return Status(SERVER_UNEXPECTED_ERROR, err.GetMessage());
    }

    auto& retrieved_file = outcome.GetResultWithOwnership().GetBody();
    std::ofstream output_file(file_path, std::ios::binary);
    // Without this check an unwritable path (missing directory, no permission) was silently ignored
    // and the call still reported success.
    if (!output_file.is_open()) {
        std::string str = "File '" + file_path + "' cannot be opened for writing!";
        LOG_STORAGE_ERROR_ << "ERROR: " << str;
        return Status(SERVER_UNEXPECTED_ERROR, str);
    }
    // Note: the stream state is deliberately not inspected after this insertion, because inserting an
    // empty stream buffer sets failbit even though a zero-byte object is a valid download.
    output_file << retrieved_file.rdbuf();
    output_file.close();

    LOG_STORAGE_DEBUG_ << "GetObjectFile '" << file_path << "' successfully!";
    return Status::OK();
}

// Downloads the object `object_name` from the configured bucket and stores its bytes in `content`.
//
// `content` is only assigned when the request succeeds. On failure the S3 error is logged and returned
// as SERVER_UNEXPECTED_ERROR.
Status
S3ClientWrapper::GetObjectStr(const std::string& object_name, std::string& content) {
    Aws::S3::Model::GetObjectRequest request;
    request.WithBucket(s3_bucket_).WithKey(object_name);

    auto outcome = client_ptr_->GetObject(request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.GetObjectStr.outcome.fail", outcome = Aws::S3::Model::GetObjectOutcome());
    if (!outcome.IsSuccess()) {
        const auto& err = outcome.GetError();
        LOG_STORAGE_ERROR_ << "ERROR: GetObject: " << err.GetExceptionName() << ": " << err.GetMessage();
        return Status(SERVER_UNEXPECTED_ERROR, err.GetMessage());
    }

    auto& retrieved_file = outcome.GetResultWithOwnership().GetBody();
    std::stringstream ss;
    ss << retrieved_file.rdbuf();
    // ss.str() already yields a temporary, so it is move-assigned without an explicit std::move.
    content = ss.str();

    LOG_STORAGE_DEBUG_ << "GetObjectStr successfully!";
    return Status::OK();
}

// Appends the keys of the objects in the configured bucket to `object_list`.
//
// When `marker` is non-empty it is passed to S3 as the listing marker. S3 returns listings in pages; this
// keeps requesting further pages while the response is flagged as truncated, so the caller receives the
// complete listing rather than only the first page. `object_list` is modified only when every page was
// fetched successfully; on failure the S3 error is logged and returned as SERVER_UNEXPECTED_ERROR.
Status
S3ClientWrapper::ListObjects(std::vector<std::string>& object_list, const std::string& marker) {
    Aws::S3::Model::ListObjectsRequest request;
    request.WithBucket(s3_bucket_);

    if (!marker.empty()) {
        request.WithMarker(marker);
    }

    // Keys are collected locally first so a failure on a later page leaves object_list untouched.
    std::vector<std::string> keys;
    bool more_pages = true;
    while (more_pages) {
        auto outcome = client_ptr_->ListObjects(request);

        // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
        fiu_do_on("S3ClientWrapper.ListObjects.outcome.fail", outcome = Aws::S3::Model::ListObjectsOutcome());
        if (!outcome.IsSuccess()) {
            const auto& err = outcome.GetError();
            LOG_STORAGE_ERROR_ << "ERROR: ListObjects: " << err.GetExceptionName() << ": " << err.GetMessage();
            return Status(SERVER_UNEXPECTED_ERROR, err.GetMessage());
        }

        const auto& result = outcome.GetResult();
        const auto& page = result.GetContents();  // bound by reference: no copy of the page
        for (const auto& s3_object : page) {
            keys.push_back(s3_object.GetKey());
        }

        // No delimiter is set on the request, so the last key of the page is used as the marker for the
        // next one. An empty page ends the loop even if flagged truncated, to rule out spinning forever.
        more_pages = result.GetIsTruncated() && !page.empty();
        if (more_pages) {
            request.SetMarker(page.back().GetKey());
        }
    }

    object_list.insert(object_list.end(), keys.begin(), keys.end());

    if (marker.empty()) {
        LOG_STORAGE_DEBUG_ << "ListObjects '" << s3_bucket_ << "' successfully!";
    } else {
        LOG_STORAGE_DEBUG_ << "ListObjects '" << s3_bucket_ << ":" << marker << "' successfully!";
    }
    return Status::OK();
}

// Deletes the object stored under `object_name` in the configured bucket.
//
// On failure the S3 error is logged and returned as SERVER_UNEXPECTED_ERROR.
Status
S3ClientWrapper::DeleteObject(const std::string& object_name) {
    Aws::S3::Model::DeleteObjectRequest delete_request;
    delete_request.WithBucket(s3_bucket_).WithKey(object_name);

    auto delete_outcome = client_ptr_->DeleteObject(delete_request);

    // Fault point: replace the outcome with a default-constructed (unsuccessful) one.
    fiu_do_on("S3ClientWrapper.DeleteObject.outcome.fail", delete_outcome = Aws::S3::Model::DeleteObjectOutcome());

    if (delete_outcome.IsSuccess()) {
        LOG_STORAGE_DEBUG_ << "DeleteObject '" << object_name << "' successfully!";
        return Status::OK();
    }

    const auto& failure = delete_outcome.GetError();
    LOG_STORAGE_ERROR_ << "ERROR: DeleteObject: " << failure.GetExceptionName() << ": " << failure.GetMessage();
    return Status(SERVER_UNEXPECTED_ERROR, failure.GetMessage());
}

// Lists objects via ListObjects(marker) and deletes each returned key with DeleteObject().
//
// Stops at the first failure and returns that status; objects deleted before the failure stay deleted.
// Returns Status::OK() when the listing and every deletion succeeded.
Status
S3ClientWrapper::DeleteObjects(const std::string& marker) {
    std::vector<std::string> keys;

    Status list_status = ListObjects(keys, marker);
    if (!list_status.ok()) {
        return list_status;
    }

    for (const auto& key : keys) {
        Status delete_status = DeleteObject(key);
        if (!delete_status.ok()) {
            return delete_status;
        }
    }

    return Status::OK();
}

}  // namespace storage
}  // namespace milvus