DefaultDeletedDocsFormat.cpp 5.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "codecs/default/DefaultDeletedDocsFormat.h"

Z
Zhiru Zhu 已提交
20 21 22
#include <fcntl.h>
#include <unistd.h>

Z
Zhiru Zhu 已提交
23
#define BOOST_NO_CXX11_SCOPED_ENUMS
24
#include <boost/filesystem.hpp>
Z
Zhiru Zhu 已提交
25
#undef BOOST_NO_CXX11_SCOPED_ENUMS
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
#include <memory>
#include <string>
#include <vector>

#include "segment/Types.h"
#include "utils/Exception.h"
#include "utils/Log.h"

namespace milvus {
namespace codec {

void
DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) {
    const std::lock_guard<std::mutex> lock(mutex_);

    std::string dir_path = directory_ptr->GetDirPath();
    const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
Z
Zhiru Zhu 已提交
43 44 45

    int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664);
    if (del_fd == -1) {
46 47 48 49 50
        std::string err_msg = "Failed to open file: " + del_file_path;
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
    }

Z
Zhiru Zhu 已提交
51 52 53 54 55 56 57 58
    size_t num_bytes;
    if (::read(del_fd, &num_bytes, sizeof(size_t)) == -1) {
        std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno);
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    auto deleted_docs_size = num_bytes / sizeof(segment::offset_t);
59 60
    std::vector<segment::offset_t> deleted_docs_list;
    deleted_docs_list.resize(deleted_docs_size);
Z
Zhiru Zhu 已提交
61

Z
Zhiru Zhu 已提交
62
    if (::read(del_fd, deleted_docs_list.data(), num_bytes) == -1) {
Z
Zhiru Zhu 已提交
63 64 65 66 67
        std::string err_msg = "Failed to read from file: " + del_file_path + ", error: " + std::strerror(errno);
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

68
    deleted_docs = std::make_shared<segment::DeletedDocs>(deleted_docs_list);
Z
Zhiru Zhu 已提交
69 70 71 72 73 74

    if (::close(del_fd) == -1) {
        std::string err_msg = "Failed to close file: " + del_file_path + ", error: " + std::strerror(errno);
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }
75 76 77 78 79 80 81 82
}

void
DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) {
    const std::lock_guard<std::mutex> lock(mutex_);

    std::string dir_path = directory_ptr->GetDirPath();
    const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
Z
Zhiru Zhu 已提交
83

Z
Zhiru Zhu 已提交
84 85 86 87 88 89 90 91 92
    // Create a temporary file from the existing file
    const std::string temp_path = dir_path + "/" + "temp_del";
    bool exists = boost::filesystem::exists(del_file_path);
    if (exists) {
        boost::filesystem::copy_file(del_file_path, temp_path, boost::filesystem::copy_option::fail_if_exists);
    }

    // Write to the temp file, in order to avoid possible race condition with search (concurrent read and write)
    int del_fd = open(temp_path.c_str(), O_RDWR | O_CREAT, 00664);
Z
Zhiru Zhu 已提交
93
    if (del_fd == -1) {
Z
Zhiru Zhu 已提交
94
        std::string err_msg = "Failed to open file: " + temp_path;
95 96 97 98
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
    }

Z
Zhiru Zhu 已提交
99 100 101 102 103 104 105 106 107 108 109
    size_t old_num_bytes;
    if (exists) {
        if (::read(del_fd, &old_num_bytes, sizeof(size_t)) == -1) {
            std::string err_msg = "Failed to read from file: " + temp_path + ", error: " + std::strerror(errno);
            ENGINE_LOG_ERROR << err_msg;
            throw Exception(SERVER_WRITE_ERROR, err_msg);
        }
    } else {
        old_num_bytes = 0;
    }

110
    auto deleted_docs_list = deleted_docs->GetDeletedDocs();
Z
Zhiru Zhu 已提交
111 112 113 114 115 116 117 118 119 120 121 122 123 124
    size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetSize();

    // rewind and overwrite with the new_num_bytes
    int off = lseek(del_fd, 0, SEEK_SET);
    if (off == -1) {
        std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno);
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }
    if (::write(del_fd, &new_num_bytes, sizeof(size_t)) == -1) {
        std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno);
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }
Z
Zhiru Zhu 已提交
125

Z
Zhiru Zhu 已提交
126 127 128 129 130 131 132
    // Move to the end of file and append
    off = lseek(del_fd, 0, SEEK_END);
    if (off == -1) {
        std::string err_msg = "Failed to seek file: " + temp_path + ", error: " + std::strerror(errno);
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }
Z
update  
Zhiru Zhu 已提交
133
    if (::write(del_fd, deleted_docs_list.data(), sizeof(segment::offset_t) * deleted_docs->GetSize()) == -1) {
Z
Zhiru Zhu 已提交
134
        std::string err_msg = "Failed to write to file" + temp_path + ", error: " + std::strerror(errno);
Z
Zhiru Zhu 已提交
135 136 137 138 139
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }

    if (::close(del_fd) == -1) {
Z
Zhiru Zhu 已提交
140
        std::string err_msg = "Failed to close file: " + temp_path + ", error: " + std::strerror(errno);
Z
Zhiru Zhu 已提交
141 142 143
        ENGINE_LOG_ERROR << err_msg;
        throw Exception(SERVER_WRITE_ERROR, err_msg);
    }
Z
Zhiru Zhu 已提交
144 145 146

    // Move temp file to delete file
    boost::filesystem::rename(temp_path, del_file_path);
147 148 149 150
}

}  // namespace codec
}  // namespace milvus