// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once namespace milvus { namespace server { template void BlockingQueue::Put(const T &task) { std::unique_lock lock(mtx); full_.wait(lock, [this] { return (queue_.size() < capacity_); }); queue_.push(task); empty_.notify_all(); } template T BlockingQueue::Take() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); T front(queue_.front()); queue_.pop(); full_.notify_all(); return front; } template size_t BlockingQueue::Size() { std::lock_guard lock(mtx); return queue_.size(); } template T BlockingQueue::Front() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); T front(queue_.front()); return front; } template T BlockingQueue::Back() { std::unique_lock lock(mtx); empty_.wait(lock, [this] { return !queue_.empty(); }); T back(queue_.back()); return back; } template bool BlockingQueue::Empty() { std::unique_lock lock(mtx); return queue_.empty(); } template void BlockingQueue::SetCapacity(const size_t capacity) { capacity_ = (capacity > 0 ? capacity : capacity_); } } // namespace server } // namespace milvus