提交 77b7158c 编写于 作者: G Guillaume Tassery

Progress rename for total_rows and write_[rows|bytes]

上级 9c295fa5
......@@ -325,8 +325,8 @@ private:
double seconds = watch.elapsedSeconds();
std::lock_guard lock(mutex);
info_per_interval.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
info_total.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
info_per_interval.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
info_total.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
}
......
......@@ -13,7 +13,7 @@ void checkFulfilledConditionsAndUpdate(
TestStats & statistics, TestStopConditions & stop_conditions,
InterruptListener & interrupt_listener)
{
statistics.add(progress.rows, progress.bytes);
statistics.add(progress.read_rows, progress.read_bytes);
stop_conditions.reportRowsRead(statistics.total_rows_read);
stop_conditions.reportBytesReadUncompressed(statistics.total_bytes_read);
......
......@@ -19,8 +19,8 @@ void CountingBlockOutputStream::write(const Block & block)
Progress local_progress(block.rows(), block.bytes(), 0);
progress.incrementPiecewiseAtomically(local_progress);
ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.rows);
ProfileEvents::increment(ProfileEvents::InsertedBytes, local_progress.bytes);
ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.read_rows);
ProfileEvents::increment(ProfileEvents::InsertedBytes, local_progress.read_bytes);
if (process_elem)
process_elem->updateProgressOut(local_progress);
......
......@@ -281,7 +281,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.rows, progress.total_rows);
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
......@@ -289,7 +289,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
if (limits.mode == LIMITS_TOTAL
&& ((limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes)))
|| (limits.size_limits.max_bytes && progress.read_bytes > limits.size_limits.max_bytes)))
{
switch (limits.size_limits.overflow_mode)
{
......@@ -300,7 +300,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
+ " rows read (or to read), maximum: " + toString(limits.size_limits.max_rows),
ErrorCodes::TOO_MANY_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.bytes)
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.read_bytes)
+ " bytes read, maximum: " + toString(limits.size_limits.max_bytes),
ErrorCodes::TOO_MANY_BYTES);
}
......@@ -308,8 +308,8 @@ void IBlockInputStream::progressImpl(const Progress & value)
case OverflowMode::BREAK:
{
/// For `break`, we will stop only if so many rows were actually read, and not just supposed to be read.
if ((limits.size_limits.max_rows && progress.rows > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes))
if ((limits.size_limits.max_rows && progress.read_rows > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.read_bytes > limits.size_limits.max_bytes))
{
cancel(false);
}
......@@ -322,7 +322,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
}
}
size_t total_rows = progress.total_rows;
size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
......@@ -344,20 +344,20 @@ void IBlockInputStream::progressImpl(const Progress & value)
if (elapsed_seconds > 0)
{
if (limits.min_execution_speed && progress.rows / elapsed_seconds < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / elapsed_seconds)
if (limits.min_execution_speed && progress.read_rows / elapsed_seconds < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.read_rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
if (limits.min_execution_speed_bytes && progress.bytes / elapsed_seconds < limits.min_execution_speed_bytes)
throw Exception("Query is executing too slow: " + toString(progress.bytes / elapsed_seconds)
if (limits.min_execution_speed_bytes && progress.read_bytes / elapsed_seconds < limits.min_execution_speed_bytes)
throw Exception("Query is executing too slow: " + toString(progress.read_bytes / elapsed_seconds)
+ " bytes/sec., minimum: " + toString(limits.min_execution_speed_bytes),
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
{
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.rows);
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.read_rows);
if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
......@@ -366,17 +366,17 @@ void IBlockInputStream::progressImpl(const Progress & value)
ErrorCodes::TOO_SLOW);
}
if (limits.max_execution_speed && progress.rows / elapsed_seconds >= limits.max_execution_speed)
limitProgressingSpeed(progress.rows, limits.max_execution_speed, total_elapsed_microseconds);
if (limits.max_execution_speed && progress.read_rows / elapsed_seconds >= limits.max_execution_speed)
limitProgressingSpeed(progress.read_rows, limits.max_execution_speed, total_elapsed_microseconds);
if (limits.max_execution_speed_bytes && progress.bytes / elapsed_seconds >= limits.max_execution_speed_bytes)
limitProgressingSpeed(progress.bytes, limits.max_execution_speed_bytes, total_elapsed_microseconds);
if (limits.max_execution_speed_bytes && progress.read_bytes / elapsed_seconds >= limits.max_execution_speed_bytes)
limitProgressingSpeed(progress.read_bytes, limits.max_execution_speed_bytes, total_elapsed_microseconds);
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(nullptr), value.rows, value.bytes);
quota->checkAndAddReadRowsBytes(time(nullptr), value.read_rows, value.read_bytes);
}
}
}
......
......@@ -220,10 +220,10 @@ void JSONRowOutputStream::writeStatistics()
writeText(watch.elapsedSeconds(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"rows_read\": ", *ostr);
writeText(progress.rows.load(), *ostr);
writeText(progress.read_rows.load(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"bytes_read\": ", *ostr);
writeText(progress.bytes.load(), *ostr);
writeText(progress.read_bytes.load(), *ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);
......
......@@ -215,10 +215,10 @@ void XMLRowOutputStream::writeStatistics()
writeText(watch.elapsedSeconds(), *ostr);
writeCString("</elapsed>\n", *ostr);
writeCString("\t\t<rows_read>", *ostr);
writeText(progress.rows.load(), *ostr);
writeText(progress.read_rows.load(), *ostr);
writeCString("</rows_read>\n", *ostr);
writeCString("\t\t<bytes_read>", *ostr);
writeText(progress.bytes.load(), *ostr);
writeText(progress.read_bytes.load(), *ostr);
writeCString("</bytes_read>\n", *ostr);
writeCString("\t</statistics>\n", *ostr);
}
......
......@@ -10,38 +10,38 @@ namespace DB
{
void ProgressValues::read(ReadBuffer & in, UInt64 server_revision)
{
size_t new_rows = 0;
size_t new_bytes = 0;
size_t new_total_rows = 0;
size_t new_write_rows = 0;
size_t new_write_bytes = 0;
size_t new_read_rows = 0;
size_t new_read_bytes = 0;
size_t new_total_rows_to_read = 0;
size_t new_written_rows = 0;
size_t new_written_bytes = 0;
readVarUInt(new_rows, in);
readVarUInt(new_bytes, in);
readVarUInt(new_total_rows, in);
readVarUInt(new_read_rows, in);
readVarUInt(new_read_bytes, in);
readVarUInt(new_total_rows_to_read, in);
if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
{
readVarUInt(new_write_rows, in);
readVarUInt(new_write_bytes, in);
readVarUInt(new_written_rows, in);
readVarUInt(new_written_bytes, in);
}
this->rows = new_rows;
this->bytes = new_bytes;
this->total_rows = new_total_rows;
this->write_rows = new_write_rows;
this->write_bytes = new_write_bytes;
this->read_rows = new_read_rows;
this->read_bytes = new_read_bytes;
this->total_rows_to_read = new_total_rows_to_read;
this->written_rows = new_written_rows;
this->written_bytes = new_written_bytes;
}
void ProgressValues::write(WriteBuffer & out, UInt64 client_revision) const
{
writeVarUInt(this->rows, out);
writeVarUInt(this->bytes, out);
writeVarUInt(this->total_rows, out);
writeVarUInt(this->read_rows, out);
writeVarUInt(this->read_bytes, out);
writeVarUInt(this->total_rows_to_read, out);
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
{
writeVarUInt(this->write_rows, out);
writeVarUInt(this->write_bytes, out);
writeVarUInt(this->written_rows, out);
writeVarUInt(this->written_bytes, out);
}
}
......@@ -51,15 +51,15 @@ void ProgressValues::writeJSON(WriteBuffer & out) const
/// of 64-bit integers after interpretation by JavaScript.
writeCString("{\"read_rows\":\"", out);
writeText(this->rows, out);
writeText(this->read_rows, out);
writeCString("\",\"read_bytes\":\"", out);
writeText(this->bytes, out);
writeText(this->read_bytes, out);
writeCString("\",\"written_rows\":\"", out);
writeText(this->write_rows, out);
writeText(this->written_rows, out);
writeCString("\",\"written_bytes\":\"", out);
writeText(this->write_bytes, out);
writeCString("\",\"rows_in_set\":\"", out);
writeText(this->total_rows, out);
writeText(this->written_bytes, out);
writeCString("\",\"total_rows_to_read\":\"", out);
writeText(this->total_rows_to_read, out);
writeCString("\"}", out);
}
......@@ -68,11 +68,11 @@ void Progress::read(ReadBuffer & in, UInt64 server_revision)
ProgressValues values;
values.read(in, server_revision);
rows.store(values.rows, std::memory_order_relaxed);
bytes.store(values.bytes, std::memory_order_relaxed);
total_rows.store(values.total_rows, std::memory_order_relaxed);
write_rows.store(values.write_rows, std::memory_order_relaxed);
write_bytes.store(values.write_bytes, std::memory_order_relaxed);
read_rows.store(values.read_rows, std::memory_order_relaxed);
read_bytes.store(values.read_bytes, std::memory_order_relaxed);
total_rows_to_read.store(values.total_rows_to_read, std::memory_order_relaxed);
written_rows.store(values.written_rows, std::memory_order_relaxed);
written_bytes.store(values.written_bytes, std::memory_order_relaxed);
}
void Progress::write(WriteBuffer & out, UInt64 client_revision) const
......
......@@ -15,11 +15,11 @@ class WriteBuffer;
/// See Progress.
struct ProgressValues
{
size_t rows;
size_t bytes;
size_t total_rows;
size_t write_rows;
size_t write_bytes;
size_t read_rows;
size_t read_bytes;
size_t total_rows_to_read;
size_t written_rows;
size_t written_bytes;
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
......@@ -28,21 +28,21 @@ struct ProgressValues
struct ReadProgress
{
size_t rows;
size_t bytes;
size_t total_rows;
size_t read_rows;
size_t read_bytes;
size_t total_rows_to_read;
ReadProgress(size_t rows_, size_t bytes_, size_t total_rows_ = 0)
: rows(rows_), bytes(bytes_), total_rows(total_rows_) {}
ReadProgress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
: read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
};
struct WriteProgress
{
size_t write_rows;
size_t write_bytes;
size_t written_rows;
size_t written_bytes;
WriteProgress(size_t write_rows_, size_t write_bytes_)
: write_rows(write_rows_), write_bytes(write_bytes_) {}
WriteProgress(size_t written_rows_, size_t written_bytes_)
: written_rows(written_rows_), written_bytes(written_bytes_) {}
};
/** Progress of query execution.
......@@ -51,26 +51,26 @@ struct WriteProgress
*/
struct Progress
{
std::atomic<size_t> rows {0}; /// Rows (source) processed.
std::atomic<size_t> bytes {0}; /// Bytes (uncompressed, source) processed.
std::atomic<size_t> read_rows {0}; /// Rows (source) processed.
std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed.
/** How much rows must be processed, in total, approximately. Non-zero value is sent when there is information about some new part of job.
* Received values must be summed to get estimate of total rows to process.
* Used for rendering progress bar on client.
*/
std::atomic<size_t> total_rows {0};
std::atomic<size_t> total_rows_to_read {0};
std::atomic<size_t> write_rows {0};
std::atomic<size_t> write_bytes {0};
std::atomic<size_t> written_rows {0};
std::atomic<size_t> written_bytes {0};
Progress() {}
Progress(size_t rows_, size_t bytes_, size_t total_rows_ = 0)
: rows(rows_), bytes(bytes_), total_rows(total_rows_) {}
Progress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
: read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
Progress(ReadProgress read_progress)
: rows(read_progress.rows), bytes(read_progress.bytes), total_rows(read_progress.total_rows) {}
: read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {}
Progress(WriteProgress write_progress)
: write_rows(write_progress.write_rows), write_bytes(write_progress.write_bytes) {}
: written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {}
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
......@@ -80,33 +80,33 @@ struct Progress
/// Each value separately is changed atomically (but not whole object).
bool incrementPiecewiseAtomically(const Progress & rhs)
{
rows += rhs.rows;
bytes += rhs.bytes;
total_rows += rhs.total_rows;
write_rows += rhs.write_rows;
write_bytes += rhs.write_bytes;
read_rows += rhs.read_rows;
read_bytes += rhs.read_bytes;
total_rows_to_read += rhs.total_rows_to_read;
written_rows += rhs.written_rows;
written_bytes += rhs.written_bytes;
return rhs.rows || rhs.write_rows ? true : false;
return rhs.read_rows || rhs.written_rows ? true : false;
}
void reset()
{
rows = 0;
bytes = 0;
total_rows = 0;
write_rows = 0;
write_bytes = 0;
read_rows = 0;
read_bytes = 0;
total_rows_to_read = 0;
written_rows = 0;
written_bytes = 0;
}
ProgressValues getValues() const
{
ProgressValues res;
res.rows = rows.load(std::memory_order_relaxed);
res.bytes = bytes.load(std::memory_order_relaxed);
res.total_rows = total_rows.load(std::memory_order_relaxed);
res.write_rows = write_rows.load(std::memory_order_relaxed);
res.write_bytes = write_bytes.load(std::memory_order_relaxed);
res.read_rows = read_rows.load(std::memory_order_relaxed);
res.read_bytes = read_bytes.load(std::memory_order_relaxed);
res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed);
res.written_rows = written_rows.load(std::memory_order_relaxed);
res.written_bytes = written_bytes.load(std::memory_order_relaxed);
return res;
}
......@@ -115,22 +115,22 @@ struct Progress
{
ProgressValues res;
res.rows = rows.fetch_and(0);
res.bytes = bytes.fetch_and(0);
res.total_rows = total_rows.fetch_and(0);
res.write_rows = write_rows.fetch_and(0);
res.write_bytes = write_bytes.fetch_and(0);
res.read_rows = read_rows.fetch_and(0);
res.read_bytes = read_bytes.fetch_and(0);
res.total_rows_to_read = total_rows_to_read.fetch_and(0);
res.written_rows = written_rows.fetch_and(0);
res.written_bytes = written_bytes.fetch_and(0);
return res;
}
Progress & operator=(Progress && other)
{
rows = other.rows.load(std::memory_order_relaxed);
bytes = other.bytes.load(std::memory_order_relaxed);
total_rows = other.total_rows.load(std::memory_order_relaxed);
write_rows = other.write_rows.load(std::memory_order_relaxed);
write_bytes = other.write_bytes.load(std::memory_order_relaxed);
read_rows = other.read_rows.load(std::memory_order_relaxed);
read_bytes = other.read_bytes.load(std::memory_order_relaxed);
total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
written_rows = other.written_rows.load(std::memory_order_relaxed);
written_bytes = other.written_bytes.load(std::memory_order_relaxed);
return *this;
}
......
......@@ -400,11 +400,13 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
res.client_info = client_info;
res.elapsed_seconds = watch.elapsedSeconds();
res.is_cancelled = is_killed.load(std::memory_order_relaxed);
res.read_rows = progress_in.rows;
res.read_bytes = progress_in.bytes;
res.total_rows = progress_in.total_rows;
res.written_rows = progress_out.rows;
res.written_bytes = progress_out.bytes;
res.read_rows = progress_in.read_rows;
res.read_bytes = progress_in.read_bytes;
res.total_rows = progress_in.total_rows_to_read;
/// TODO: Use written_rows and written_bytes when real time progress is implemented
res.written_rows = progress_out.read_rows;
res.written_bytes = progress_out.read_bytes;
if (thread_group)
{
......
......@@ -154,10 +154,12 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
elem.query_start_time = query_start_time;
elem.query_duration_ms = (getCurrentTimeNanoseconds() - query_start_time_nanoseconds) / 1000000U;
elem.read_rows = progress_in.rows.load(std::memory_order_relaxed);
elem.read_bytes = progress_in.bytes.load(std::memory_order_relaxed);
elem.written_rows = progress_out.rows.load(std::memory_order_relaxed);
elem.written_bytes = progress_out.bytes.load(std::memory_order_relaxed);
elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
/// TODO: Use written_rows and written_bytes when run time progress is implemented
elem.written_rows = progress_out.read_rows.load(std::memory_order_relaxed);
elem.written_bytes = progress_out.read_bytes.load(std::memory_order_relaxed);
elem.memory_usage = memory_tracker.get();
elem.peak_memory_usage = memory_tracker.getPeak();
......
......@@ -336,8 +336,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
{
/// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
elem.result_rows = counting_stream->getProgress().rows;
elem.result_bytes = counting_stream->getProgress().bytes;
elem.result_rows = counting_stream->getProgress().read_rows;
elem.result_bytes = counting_stream->getProgress().read_bytes;
}
}
......
......@@ -494,17 +494,17 @@ public:
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
updateWatch();
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->bytes_read_uncompressed += value.read_bytes;
if (stage.is_first)
merge_entry->rows_read += value.rows;
merge_entry->rows_read += value.read_rows;
stage.total_rows += value.total_rows;
stage.rows_read += value.rows;
stage.total_rows += value.total_rows_to_read;
stage.rows_read += value.read_rows;
if (stage.total_rows > 0)
{
merge_entry->progress.store(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册