提交 26021775 编写于 作者: A Amos Bird

Better locking for StorageBuffer

上级 9fa0c33b
......@@ -277,7 +277,7 @@ public:
for (size_t try_no = 0; try_no < storage.num_shards; ++try_no)
{
std::unique_lock<std::mutex> lock(storage.buffers[shard_num].mutex, std::try_to_lock_t());
std::unique_lock<std::mutex> lock(storage.buffers[shard_num].mutex, std::try_to_lock);
if (lock.owns_lock())
{
......@@ -295,14 +295,16 @@ public:
/// If you still can not lock anything at once, then we'll wait on mutex.
if (!least_busy_buffer)
insertIntoBuffer(block, storage.buffers[start_shard_num], std::unique_lock<std::mutex>(storage.buffers[start_shard_num].mutex));
else
insertIntoBuffer(block, *least_busy_buffer, std::move(least_busy_lock));
{
least_busy_buffer = &storage.buffers[start_shard_num];
least_busy_lock = std::unique_lock<std::mutex>(least_busy_buffer->mutex);
}
insertIntoBuffer(block, *least_busy_buffer);
}
private:
StorageBuffer & storage;
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
{
time_t current_time = time(nullptr);
......@@ -320,9 +322,7 @@ private:
* an exception will be thrown, and new data will not be added to the buffer.
*/
lock.unlock();
storage.flushBuffer(buffer, true);
lock.lock();
storage.flushBuffer(buffer, true, true /* locked */);
}
if (!buffer.first_write_time)
......@@ -459,7 +459,7 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds)
}
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
{
Block block_to_write;
time_t current_time = time(nullptr);
......@@ -468,7 +468,9 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
size_t bytes = 0;
time_t time_passed = 0;
std::lock_guard<std::mutex> lock(buffer.mutex);
std::unique_lock<std::mutex> lock(buffer.mutex, std::defer_lock);
if (!locked)
lock.lock();
block_to_write = buffer.data.cloneEmpty();
......
......@@ -114,7 +114,7 @@ private:
void flushAllBuffers(bool check_thresholds = true);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.
void flushBuffer(Buffer & buffer, bool check_thresholds);
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册