SegmentWriter.cpp 8.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "segment/SegmentWriter.h"

#include <algorithm>
#include <memory>

#include "SegmentReader.h"
#include "Vectors.h"
#include "codecs/default/DefaultCodec.h"
Y
yudong.cai 已提交
26 27 28
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
29
#include "utils/Log.h"
30
#include "utils/TimeRecorder.h"
31 32 33 34 35

namespace milvus {
namespace segment {

SegmentWriter::SegmentWriter(const std::string& directory) {
    // Build a disk-backed file-system handler whose operations are rooted at `directory`.
    storage::IOReaderPtr disk_reader = std::make_shared<storage::DiskIOReader>();
    storage::IOWriterPtr disk_writer = std::make_shared<storage::DiskIOWriter>();
    storage::OperationPtr disk_operation = std::make_shared<storage::DiskOperation>(directory);
    fs_ptr_ = std::make_shared<storage::FSHandler>(disk_reader, disk_writer, disk_operation);

    // Start from an empty in-memory segment; AddVectors()/SetVectorIndex() fill it in.
    segment_ptr_ = std::make_shared<Segment>();
}

Status
SegmentWriter::AddVectors(const std::string& name, const std::vector<uint8_t>& data,
                          const std::vector<doc_id_t>& uids) {
    // Append the raw vector bytes and their ids to the in-memory segment,
    // then set the vectors' name.
    auto& vectors = segment_ptr_->vectors_ptr_;
    vectors->AddData(data);
    vectors->AddUids(uids);
    vectors->SetName(name);
    return Status::OK();
}

Status
SegmentWriter::SetVectorIndex(const milvus::knowhere::VecIndexPtr& index) {
    // Attach the given index to the segment; it is persisted later by WriteVectorIndex().
    auto& index_holder = segment_ptr_->vector_index_ptr_;
    index_holder->SetVectorIndex(index);
    return Status::OK();
}

/**
 * Persist the in-memory segment to disk.
 *
 * Steps, in order: bloom filter of the uids, raw vectors + uids, and an empty
 * deleted-docs file. Each step is timed via TimeRecorder. The first failing step
 * is logged and its status is returned immediately; later steps are skipped.
 *
 * @return Status::OK() on success, otherwise the status of the failing step.
 */
Status
SegmentWriter::Serialize() {
    TimeRecorder recorder("SegmentWriter::Serialize");

    auto status = WriteBloomFilter();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << status.message();
        return status;
    }
    recorder.RecordSection("Writing bloom filter done");

    status = WriteVectors();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Write vectors fail: " << status.message();
        return status;
    }
    recorder.RecordSection("Writing vectors and uids done");

    // Write an empty deleted doc (a freshly constructed DeletedDocs).
    status = WriteDeletedDocs();
    if (!status.ok()) {
        // Previously this failure was neither logged nor short-circuited, and the
        // "done" section was recorded anyway; handle it like the steps above.
        LOG_ENGINE_ERROR_ << "Write deleted docs fail: " << status.message();
        return status;
    }
    recorder.RecordSection("Writing deleted docs done");

    return Status::OK();
}

Status
SegmentWriter::WriteVectors() {
    codec::DefaultCodec segment_codec;
    try {
        // Ensure the target directory exists before the vectors format writes into it.
        fs_ptr_->operation_ptr_->CreateDirectory();
        auto vectors_format = segment_codec.GetVectorsFormat();
        vectors_format->write(fs_ptr_, segment_ptr_->vectors_ptr_);
    } catch (const std::exception& ex) {
        // Any std::exception from directory creation or writing is reported as a write error.
        const std::string err_msg = std::string("Failed to write vectors: ") + ex.what();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(SERVER_WRITE_ERROR, err_msg);
    }
    return Status::OK();
}

Status
SegmentWriter::WriteVectorIndex(const std::string& location) {
    codec::DefaultCodec segment_codec;
    try {
        // Ensure the target directory exists, then serialize the index to `location`.
        fs_ptr_->operation_ptr_->CreateDirectory();
        auto index_format = segment_codec.GetVectorIndexFormat();
        index_format->write(fs_ptr_, location, segment_ptr_->vector_index_ptr_);
    } catch (const std::exception& ex) {
        const std::string err_msg = std::string("Failed to write vector index: ") + ex.what();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(SERVER_WRITE_ERROR, err_msg);
    }
    return Status::OK();
}

/**
 * Build an id bloom filter from the segment's uids and persist it.
 *
 * The filter is first passed to the codec's create() (presumably allocating or
 * initializing `id_bloom_filter_ptr_`; confirm against IdBloomFilterFormat). Every
 * uid currently held in the segment's vectors is then added, and the filter is
 * written through the same format.
 *
 * @return Status::OK() on success; SERVER_WRITE_ERROR if any step throws a std::exception.
 */
Status
SegmentWriter::WriteBloomFilter() {
    codec::DefaultCodec default_codec;
    try {
        fs_ptr_->operation_ptr_->CreateDirectory();

        TimeRecorder recorder("SegmentWriter::WriteBloomFilter");

        default_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
        recorder.RecordSection("Initializing bloom filter");

        // Only read the uid list here; bind it const so it is not mutated by accident.
        const auto& uids = segment_ptr_->vectors_ptr_->GetUids();
        for (auto uid : uids) {
            segment_ptr_->id_bloom_filter_ptr_->Add(uid);
        }
        recorder.RecordSection("Adding " + std::to_string(uids.size()) + " ids to bloom filter");

        default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
        recorder.RecordSection("Writing bloom filter");
    } catch (std::exception& e) {
        // Fixed copy-paste error: this path used to report "Failed to write vectors".
        std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(SERVER_WRITE_ERROR, err_msg);
    }
    return Status::OK();
}

Status
SegmentWriter::WriteDeletedDocs() {
    codec::DefaultCodec segment_codec;
    try {
        fs_ptr_->operation_ptr_->CreateDirectory();
        // Persist a freshly constructed (empty) DeletedDocs object.
        DeletedDocsPtr empty_deleted_docs = std::make_shared<DeletedDocs>();
        auto deleted_docs_format = segment_codec.GetDeletedDocsFormat();
        deleted_docs_format->write(fs_ptr_, empty_deleted_docs);
    } catch (const std::exception& ex) {
        const std::string err_msg = std::string("Failed to write deleted docs: ") + ex.what();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(SERVER_WRITE_ERROR, err_msg);
    }
    return Status::OK();
}

Status
SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) {
    codec::DefaultCodec segment_codec;
    try {
        // Ensure the directory exists, then persist the caller-supplied deleted docs.
        fs_ptr_->operation_ptr_->CreateDirectory();
        auto deleted_docs_format = segment_codec.GetDeletedDocsFormat();
        deleted_docs_format->write(fs_ptr_, deleted_docs);
    } catch (const std::exception& ex) {
        const std::string err_msg = std::string("Failed to write deleted docs: ") + ex.what();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(SERVER_WRITE_ERROR, err_msg);
    }
    return Status::OK();
}

Status
SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) {
    codec::DefaultCodec segment_codec;
    try {
        // Ensure the directory exists, then persist the caller-supplied bloom filter as-is.
        fs_ptr_->operation_ptr_->CreateDirectory();
        auto bloom_filter_format = segment_codec.GetIdBloomFilterFormat();
        bloom_filter_format->write(fs_ptr_, id_bloom_filter_ptr);
    } catch (const std::exception& ex) {
        const std::string err_msg = std::string("Failed to write bloom filter: ") + ex.what();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(SERVER_WRITE_ERROR, err_msg);
    }
    return Status::OK();
}

Status
SegmentWriter::Cache() {
    // Not implemented yet: nothing is cached and success is always reported.
    // TODO(zhiru)
    return Status::OK();
}

Status
SegmentWriter::GetSegment(SegmentPtr& segment_ptr) {
    // Out-parameter: hand back the writer's own segment pointer (the pointer is
    // copied, so the caller observes the same segment the writer keeps filling).
    segment_ptr = this->segment_ptr_;
    return Status::OK();
}

/**
 * Merge the segment stored in `dir_to_merge` into this writer's in-memory segment.
 *
 * The source segment is taken from cache if available, otherwise loaded from disk.
 * Rows listed in its deleted docs are erased, and the remaining vectors and uids
 * are appended via AddVectors() under `name`. Nothing is written to disk here;
 * call Serialize() afterwards.
 *
 * @param dir_to_merge directory of the segment to merge; must differ from this writer's directory.
 * @param name         name assigned to the merged vectors.
 * @return Status::OK() on success; DB_ERROR when merging into itself or loading fails;
 *         otherwise the status returned by AddVectors().
 */
Status
SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
    if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) {
        return Status(DB_ERROR, "Cannot Merge Self");
    }

    LOG_ENGINE_DEBUG_ << "Merging from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory();

    TimeRecorder recorder("SegmentWriter::Merge");

    SegmentReader segment_reader_to_merge(dir_to_merge);
    // Must be initialized: if LoadCache() fails without assigning the flag, reading an
    // indeterminate bool would be undefined behavior. Defaulting to false falls back
    // to a disk load.
    bool in_cache = false;
    auto status = segment_reader_to_merge.LoadCache(in_cache);
    if (!in_cache) {
        status = segment_reader_to_merge.Load();
        if (!status.ok()) {
            std::string msg = "Failed to load segment from " + dir_to_merge;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    SegmentPtr segment_to_merge;
    segment_reader_to_merge.GetSegment(segment_to_merge);
    if (!segment_to_merge) {
        // Guard against dereferencing a null segment below.
        std::string msg = "Failed to get segment from " + dir_to_merge;
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    recorder.RecordSection("Loading segment");

    if (segment_to_merge->deleted_docs_ptr_ != nullptr) {
        auto offsets_to_delete = segment_to_merge->deleted_docs_ptr_->GetDeletedDocs();

        // Erase from raw data
        segment_to_merge->vectors_ptr_->Erase(offsets_to_delete);
    }

    recorder.RecordSection("erase");

    status = AddVectors(name, segment_to_merge->vectors_ptr_->GetData(), segment_to_merge->vectors_ptr_->GetUids());
    if (!status.ok()) {
        // Previously ignored; propagate so callers do not serialize a partial merge.
        LOG_ENGINE_ERROR_ << "Failed to add vectors from " << dir_to_merge << ": " << status.message();
        return status;
    }

    auto rows = segment_to_merge->vectors_ptr_->GetCount();
    recorder.RecordSection("Adding " + std::to_string(rows) + " vectors and uids");

    LOG_ENGINE_DEBUG_ << "Merging completed from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory();

    return Status::OK();
}

size_t
SegmentWriter::Size() {
    // TODO(zhiru): switch to actual directory size
249 250
    size_t vectors_size = segment_ptr_->vectors_ptr_->VectorsSize();
    size_t uids_size = segment_ptr_->vectors_ptr_->UidsSize();
Z
Zhiru Zhu 已提交
251
    /*
252 253 254
    if (segment_ptr_->id_bloom_filter_ptr_) {
        ret += segment_ptr_->id_bloom_filter_ptr_->Size();
    }
Z
Zhiru Zhu 已提交
255
     */
256
    return (vectors_size * sizeof(uint8_t) + uids_size * sizeof(doc_id_t));
257 258 259 260 261 262 263 264 265
}

size_t
SegmentWriter::VectorCount() {
    return segment_ptr_->vectors_ptr_->GetCount();
}

}  // namespace segment
}  // namespace milvus