提交 efb09d7f 编写于 作者: A Alexander Kuzmenkov

Cleanup for #12999. NFC.

上级 1566c8a9
......@@ -51,6 +51,7 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/MemorySanitizer.h>
#include <Common/SymbolIndex.h>
#if !defined(ARCADIA_BUILD)
......@@ -76,6 +77,15 @@ static void call_default_signal_handler(int sig)
raise(sig);
}
const char * msan_strsignal(int sig)
{
// Apparently strsignal is not instrumented by MemorySanitizer, so we
// have to unpoison it to avoid msan reports inside fmt library when we
// print it.
const char * signal_name = strsignal(sig);
__msan_unpoison_string(signal_name);
return signal_name;
}
static constexpr size_t max_query_id_size = 127;
......@@ -275,12 +285,14 @@ private:
if (query_id.empty())
{
LOG_FATAL(log, "(version {}{}, {}) (from thread {}) (no query) Received signal {} ({})",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, thread_num, strsignal(sig), sig);
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info,
thread_num, msan_strsignal(sig), sig);
}
else
{
LOG_FATAL(log, "(version {}{}, {}) (from thread {}) (query_id: {}) Received signal {} ({})",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, thread_num, query_id, strsignal(sig), sig);
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info,
thread_num, query_id, msan_strsignal(sig), sig);
}
String error_message;
......@@ -831,13 +843,13 @@ void BaseDaemon::handleSignal(int signal_id)
onInterruptSignals(signal_id);
}
else
throw DB::Exception(std::string("Unsupported signal: ") + strsignal(signal_id), 0);
throw DB::Exception(std::string("Unsupported signal: ") + msan_strsignal(signal_id), 0);
}
void BaseDaemon::onInterruptSignals(int signal_id)
{
is_cancelled = true;
LOG_INFO(&logger(), "Received termination signal ({})", strsignal(signal_id));
LOG_INFO(&logger(), "Received termination signal ({})", msan_strsignal(signal_id));
if (sigint_signals_counter >= 2)
{
......
......@@ -8,11 +8,13 @@
#define __msan_unpoison(X, Y)
#define __msan_test_shadow(X, Y) (false)
#define __msan_print_shadow(X, Y)
#define __msan_unpoison_string(X)
#if defined(__has_feature)
# if __has_feature(memory_sanitizer)
# undef __msan_unpoison
# undef __msan_test_shadow
# undef __msan_print_shadow
# undef __msan_unpoison_string
# include <sanitizer/msan_interface.h>
# endif
#endif
......
......@@ -33,7 +33,8 @@ private:
working_buffer = in.buffer();
pos = in.position();
calculateHash(working_buffer.begin(), working_buffer.size());
// `pos` may be different from working_buffer.begin() when using AIO.
calculateHash(pos, working_buffer.end() - pos);
return res;
}
......
......@@ -22,6 +22,8 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_
void PeekableReadBuffer::reset()
{
checkStateCorrect();
peeked_size = 0;
checkpoint = nullptr;
checkpoint_in_own_memory = false;
......@@ -31,6 +33,8 @@ void PeekableReadBuffer::reset()
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
checkStateCorrect();
}
bool PeekableReadBuffer::peekNext()
......@@ -150,7 +154,7 @@ bool PeekableReadBuffer::nextImpl()
/// Switch to reading from sub_buf (or just update it if already switched)
Buffer & sub_working = sub_buf.buffer();
BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset());
working_buffer_offset = sub_buf.offset();
nextimpl_working_buffer_offset = sub_buf.offset();
checkStateCorrect();
return res;
......@@ -159,7 +163,6 @@ bool PeekableReadBuffer::nextImpl()
void PeekableReadBuffer::checkStateCorrect() const
{
#ifndef NDEBUG
if (checkpoint)
{
if (checkpointInOwnMemory())
......@@ -190,7 +193,6 @@ void PeekableReadBuffer::checkStateCorrect() const
throw DB::Exception("Pos in empty own buffer", ErrorCodes::LOGICAL_ERROR);
if (unread_limit < memory.size())
throw DB::Exception("Size limit exceed", ErrorCodes::LOGICAL_ERROR);
#endif
}
void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
......@@ -245,11 +247,10 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
{
#ifndef NDEBUG
if (!checkpoint)
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
checkStateCorrect();
#endif
if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
return; /// is't already continuous
......
#pragma once
#include <cassert>
#include <cstring>
#include <algorithm>
#include <memory>
......@@ -41,6 +42,11 @@ public:
*/
ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {}
// Copying the read buffers can be dangerous because they can hold a lot of
// memory or open files, so better to disable the copy constructor to prevent
// accidental copying.
ReadBuffer(const ReadBuffer &) = delete;
// FIXME: behavior differs greately from `BufferBase::set()` and it's very confusing.
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); }
......@@ -54,8 +60,8 @@ public:
if (!res)
working_buffer.resize(0);
pos = working_buffer.begin() + working_buffer_offset;
working_buffer_offset = 0;
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;
return res;
}
......@@ -169,8 +175,10 @@ public:
}
protected:
/// The number of bytes to ignore from the initial position of `working_buffer` buffer.
size_t working_buffer_offset = 0;
/// The number of bytes to ignore from the initial position of `working_buffer`
/// buffer. Apparently this is an additional out-parameter for nextImpl(),
/// not a real field.
size_t nextimpl_working_buffer_offset = 0;
private:
/** Read the next data and fill a buffer with it.
......
......@@ -298,7 +298,7 @@ void ReadBufferAIO::finalize()
first_unread_pos_in_file += bytes_read;
total_bytes_read += bytes_read;
working_buffer_offset = region_left_padding;
nextimpl_working_buffer_offset = region_left_padding;
if (total_bytes_read == max_bytes_read)
is_eof = true;
......
......@@ -32,8 +32,6 @@ public:
ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, size_t alignment = 0);
ReadBufferFromFile(ReadBufferFromFile &&) = default;
~ReadBufferFromFile() override;
/// Close file before destruction of object.
......
......@@ -17,7 +17,6 @@ class ReadBufferFromFileBase : public BufferWithOwnMemory<SeekableReadBuffer>
public:
ReadBufferFromFileBase();
ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment);
ReadBufferFromFileBase(ReadBufferFromFileBase &&) = default;
~ReadBufferFromFileBase() override;
virtual std::string getFileName() const = 0;
......
......@@ -85,7 +85,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
}
}
pos_in_file += bytes_read;
file_offset_of_buffer_end += bytes_read;
if (bytes_read)
{
......@@ -102,22 +102,35 @@ bool ReadBufferFromFileDescriptor::nextImpl()
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
off_t new_pos;
size_t new_pos;
if (whence == SEEK_SET)
{
assert(offset >= 0);
new_pos = offset;
}
else if (whence == SEEK_CUR)
new_pos = pos_in_file - (working_buffer.end() - pos) + offset;
{
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
}
else
{
throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == pos_in_file)
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos;
if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos,
// so the second inequality is strict.
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos < file_offset_of_buffer_end)
{
/// Position is still inside buffer.
pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end());
return new_pos;
}
else
......@@ -130,7 +143,7 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
if (-1 == res)
throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
pos_in_file = new_pos;
file_offset_of_buffer_end = new_pos;
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
......
......@@ -14,7 +14,7 @@ class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
int fd;
off_t pos_in_file; /// What offset in file corresponds to working_buffer.end().
size_t file_offset_of_buffer_end; /// What offset in file corresponds to working_buffer.end().
bool nextImpl() override;
......@@ -23,9 +23,7 @@ protected:
public:
ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {}
ReadBufferFromFileDescriptor(ReadBufferFromFileDescriptor &&) = default;
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), file_offset_of_buffer_end(0) {}
int getFD() const
{
......@@ -34,7 +32,7 @@ public:
off_t getPosition() override
{
return pos_in_file - (working_buffer.end() - pos);
return file_offset_of_buffer_end - (working_buffer.end() - pos);
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
......
......@@ -19,7 +19,6 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
~ReadBufferFromHDFS() override;
bool nextImpl() override;
......
......@@ -1100,9 +1100,14 @@ bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current)
return true;
saveUpToPosition(in, memory, current);
bool loaded_more = !in.eof();
assert(in.position() == in.buffer().begin());
// A sanity check. Buffer position may be in the beginning of the buffer
// (normal case), or have some offset from it (AIO).
assert(in.position() >= in.buffer().begin());
assert(in.position() <= in.buffer().end());
current = in.position();
return loaded_more;
}
......
......@@ -697,7 +697,7 @@ void executeQuery(
const char * end;
/// If 'istr' is empty now, fetch next data into buffer.
if (istr.buffer().size() == 0)
if (!istr.hasPendingData())
istr.next();
size_t max_query_size = context.getSettingsRef().max_query_size;
......
......@@ -135,7 +135,13 @@ void writeCommonErrorMessage(
out << ": failed at position " << (last_token.begin - begin + 1);
if (last_token.type == TokenType::EndOfStream || last_token.type == TokenType::Semicolon)
{
out << " (end of query)";
}
else
{
out << " ('" << std::string(last_token.begin, last_token.end - last_token.begin) << "')";
}
/// If query is multiline.
const char * nl = find_first_symbols<'\n'>(begin, end);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册