提交 2a43daec 编写于 作者: A alesapin

Buildable code

......@@ -105,6 +105,11 @@ public:
return data->getFloat64(0);
}
Float32 getFloat32(size_t) const override
{
return data->getFloat32(0);
}
bool isNullAt(size_t) const override
{
return data->isNullAt(0);
......
......@@ -59,6 +59,7 @@ public:
UInt64 getUInt(size_t n) const override { return getDictionary().getUInt(getIndexes().getUInt(n)); }
Int64 getInt(size_t n) const override { return getDictionary().getInt(getIndexes().getUInt(n)); }
Float64 getFloat64(size_t n) const override { return getDictionary().getInt(getIndexes().getFloat64(n)); }
Float32 getFloat32(size_t n) const override { return getDictionary().getInt(getIndexes().getFloat32(n)); }
bool getBool(size_t n) const override { return getDictionary().getInt(getIndexes().getBool(n)); }
bool isNullAt(size_t n) const override { return getDictionary().isNullAt(getIndexes().getUInt(n)); }
ColumnPtr cut(size_t start, size_t length) const override
......
......@@ -66,6 +66,7 @@ public:
UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
Float64 getFloat64(size_t n) const override { return getNestedColumn()->getFloat64(n); }
Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
......
......@@ -222,6 +222,12 @@ Float64 ColumnVector<T>::getFloat64(size_t n) const
return static_cast<Float64>(data[n]);
}
template <typename T>
Float32 ColumnVector<T>::getFloat32(size_t n) const
{
return static_cast<Float32>(data[n]);
}
template <typename T>
void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
......
......@@ -205,6 +205,7 @@ public:
UInt64 get64(size_t n) const override;
Float64 getFloat64(size_t n) const override;
Float32 getFloat32(size_t n) const override;
UInt64 getUInt(size_t n) const override
{
......
......@@ -100,6 +100,11 @@ public:
throw Exception("Method getFloat64 is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual Float32 getFloat32(size_t /*n*/) const
{
throw Exception("Method getFloat32 is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** If column is numeric, return value of n-th element, casted to UInt64.
* For NULL values of Nullable column it is allowed to return arbitrary value.
* Otherwise throw an exception.
......
......@@ -464,7 +464,8 @@ namespace ErrorCodes
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY = 487;
extern const int UNKNOWN_DICTIONARY = 488;
extern const int INCORRECT_DICTIONARY_DEFINITION = 489;
extern const int UNACCEPTABLE_URL = 490;
extern const int CANNOT_FORMAT_DATETIME = 490;
extern const int UNACCEPTABLE_URL = 491;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -91,19 +91,7 @@ private:
template <typename T>
static inline void writeNumber2(char * p, T v)
{
static const char digits[201] =
"00010203040506070809"
"10111213141516171819"
"20212223242526272829"
"30313233343536373839"
"40414243444546474849"
"50515253545556575859"
"60616263646566676869"
"70717273747576777879"
"80818283848586878889"
"90919293949596979899";
memcpy(p, &digits[v * 2], 2);
memcpy(p, &digits100[v * 2], 2);
}
template <typename T>
......
......@@ -7,7 +7,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <ext/range.h>
#include <math.h>
#include <cmath>
#include <array>
......@@ -21,19 +21,32 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/** https://en.wikipedia.org/wiki/Great-circle_distance
*
* The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
* The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance .
* Throws exception when one or several input values are not within reasonable bounds.
* Latitude must be in [-90, 90], longitude must be [-180, 180].
* Original code of this implementation of this function is here https://github.com/sphinxsearch/sphinx/blob/409f2c2b5b2ff70b04e38f92b6b1a890326bad65/src/sphinxexpr.cpp#L3825.
* Andrey Aksenov, the author of original code, permitted to use this code in ClickHouse under the Apache 2.0 license.
* Presentation about this code from Highload++ Siberia 2019 is here https://github.com/ClickHouse/ClickHouse/files/3324740/1_._._GEODIST_._.pdf
* The main idea of this implementation is optimisations based on Taylor series, trigonometric identity and calculated constants once for cosine, arcsine(sqrt) and look up table.
*/
namespace
{
const double PI = 3.14159265358979323846;
const float TO_RADF = static_cast<float>(PI / 180.0);
const float TO_RADF2 = static_cast<float>(PI / 360.0);
const int GEODIST_TABLE_COS = 1024; // maxerr 0.00063%
const int GEODIST_TABLE_ASIN = 512;
const int GEODIST_TABLE_K = 1024;
constexpr double PI = 3.14159265358979323846;
constexpr float TO_RADF = static_cast<float>(PI / 180.0);
constexpr float TO_RADF2 = static_cast<float>(PI / 360.0);
constexpr size_t GEODIST_TABLE_COS = 1024; // maxerr 0.00063%
constexpr size_t GEODIST_TABLE_ASIN = 512;
constexpr size_t GEODIST_TABLE_K = 1024;
float g_GeoCos[GEODIST_TABLE_COS + 1]; /// cos(x) table
float g_GeoAsin[GEODIST_TABLE_ASIN + 1]; /// asin(sqrt(x)) table
float g_GeoFlatK[GEODIST_TABLE_K + 1][2]; /// geodistAdaptive() flat ellipsoid method k1,k2 coeffs table
float g_GeoFlatK[GEODIST_TABLE_K + 1][2]; /// geodistAdaptive() flat ellipsoid method k1, k2 coeffs table
inline double sqr(double v)
{
......@@ -48,7 +61,7 @@ inline float fsqr(float v)
void geodistInit()
{
for (size_t i = 0; i <= GEODIST_TABLE_COS; ++i)
g_GeoCos[i] = static_cast<float>(cos(2 * PI * i / GEODIST_TABLE_COS)); // [0, 2pi] -> [0, COSTABLE]
g_GeoCos[i] = static_cast<float>(cos(2 * PI * i / GEODIST_TABLE_COS)); // [0, 2 * pi] -> [0, COSTABLE]
for (size_t i = 0; i <= GEODIST_TABLE_ASIN; ++i)
g_GeoAsin[i] = static_cast<float>(asin(
......@@ -56,7 +69,7 @@ void geodistInit()
for (size_t i = 0; i <= GEODIST_TABLE_K; ++i)
{
double x = PI * i / GEODIST_TABLE_K - PI * 0.5; // [-pi/2, pi/2] -> [0, KTABLE]
double x = PI * i / GEODIST_TABLE_K - PI * 0.5; // [-pi / 2, pi / 2] -> [0, KTABLE]
g_GeoFlatK[i][0] = static_cast<float>(sqr(111132.09 - 566.05 * cos(2 * x) + 1.20 * cos(4 * x)));
g_GeoFlatK[i][1] = static_cast<float>(sqr(111415.13 * cos(x) - 94.55 * cos(3 * x) + 0.12 * cos(5 * x)));
}
......@@ -86,11 +99,10 @@ inline float geodistFastSin(float x)
float y = static_cast<float>(fabs(x) * GEODIST_TABLE_COS / PI / 2);
int i = static_cast<int>(y);
y -= i;
i = (i - GEODIST_TABLE_COS / 4) & (GEODIST_TABLE_COS - 1); // cos(x-pi/2)=sin(x), costable/4=pi/2
i = (i - GEODIST_TABLE_COS / 4) & (GEODIST_TABLE_COS - 1); // cos(x - pi / 2) = sin(x), costable / 4 = pi / 2
return g_GeoCos[i] + (g_GeoCos[i + 1] - g_GeoCos[i]) * y;
}
/// fast implementation of asin(sqrt(x))
/// max error in floats 0.00369%, in doubles 0.00072%
inline float geodistFastAsinSqrt(float x)
......@@ -110,17 +122,10 @@ inline float geodistFastAsinSqrt(float x)
}
return static_cast<float>(asin(sqrt(x))); // distance over 17083km, just compute honestly
}
}
/**
* The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
* The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance .
* Throws exception when one or several input values are not within reasonable bounds.
* Latitude must be in [-90, 90], longitude must be [-180, 180].
* Original code of this implementation of this function is here https://github.com/sphinxsearch/sphinx/blob/409f2c2b5b2ff70b04e38f92b6b1a890326bad65/src/sphinxexpr.cpp#L3825.
* Andrey Aksenov, the author of original code, permitted to use this code in ClickHouse under the Apache 2.0 license.
* Presentation about this code from Highload++ Siberia 2019 is here https://github.com/ClickHouse/ClickHouse/files/3324740/1_._._GEODIST_._.pdf
* The main idea of this implementation is optimisations based on Taylor series, trigonometric identity and calculated constants once for cosine, arcsine(sqrt) and look up table.
*/
class FunctionGreatCircleDistance : public IFunction
{
public:
......@@ -128,133 +133,77 @@ public:
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGreatCircleDistance>(); }
private:
enum class instr_type : uint8_t
{
get_float_64,
get_const_float_64
};
using instr_t = std::pair<instr_type, const IColumn *>;
using instrs_t = std::array<instr_t, 4>;
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 4; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto arg = arguments[arg_idx].get();
if (!WhichDataType(arg).isFloat64())
if (!WhichDataType(arg).isFloat())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function " + getName() + ". Must be Float64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return std::make_shared<DataTypeFloat64>();
return std::make_shared<DataTypeFloat32>();
}
instrs_t getInstructions(const Block & block, const ColumnNumbers & arguments, bool & out_const)
Float32 greatCircleDistance(Float32 lon1deg, Float32 lat1deg, Float32 lon2deg, Float32 lat2deg)
{
instrs_t result;
out_const = true;
for (const auto arg_idx : ext::range(0, arguments.size()))
{
const auto column = block.getByPosition(arguments[arg_idx]).column.get();
if (const auto col = checkAndGetColumn<ColumnVector<Float64>>(column))
{
out_const = false;
result[arg_idx] = instr_t{instr_type::get_float_64, col};
}
else if (const auto col_const = checkAndGetColumnConst<ColumnVector<Float64>>(column))
{
result[arg_idx] = instr_t{instr_type::get_const_float_64, col_const};
}
else
throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
return result;
}
/// https://en.wikipedia.org/wiki/Great-circle_distance
Float64 greatCircleDistance(Float64 lon1Deg, Float64 lat1Deg, Float64 lon2Deg, Float64 lat2Deg)
{
if (lon1Deg < -180 || lon1Deg > 180 ||
lon2Deg < -180 || lon2Deg > 180 ||
lat1Deg < -90 || lat1Deg > 90 ||
lat2Deg < -90 || lat2Deg > 90)
if (lon1deg < -180 || lon1deg > 180 ||
lon2deg < -180 || lon2deg > 180 ||
lat1deg < -90 || lat1deg > 90 ||
lat2deg < -90 || lat2deg > 90)
{
throw Exception("Arguments values out of bounds for function " + getName(),
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
float dlat = geodistDegDiff(lat1Deg - lat2Deg);
float dlon = geodistDegDiff(lon1Deg - lon2Deg);
float lat_diff = geodistDegDiff(lat1deg - lat2deg);
float lon_diff = geodistDegDiff(lon1deg - lon2deg);
if (dlon < 13)
if (lon_diff < 13)
{
// points are close enough; use flat ellipsoid model
// interpolate sqr(k1), sqr(k2) coefficients using latitudes midpoint
float m = (lat1Deg + lat2Deg + 180) * GEODIST_TABLE_K / 360; // [-90, 90] degrees -> [0, KTABLE] indexes
int i = static_cast<int>(m);
i &= (GEODIST_TABLE_K - 1);
float m = (lat1deg + lat2deg + 180) * GEODIST_TABLE_K / 360; // [-90, 90] degrees -> [0, KTABLE] indexes
size_t i = static_cast<size_t>(m) & (GEODIST_TABLE_K - 1);
float kk1 = g_GeoFlatK[i][0] + (g_GeoFlatK[i + 1][0] - g_GeoFlatK[i][0]) * (m - i);
float kk2 = g_GeoFlatK[i][1] + (g_GeoFlatK[i + 1][1] - g_GeoFlatK[i][1]) * (m - i);
return static_cast<float>(sqrt(kk1 * dlat * dlat + kk2 * dlon * dlon));
return static_cast<float>(sqrt(kk1 * lat_diff * lat_diff + kk2 * lon_diff * lon_diff));
}
else
{
// points too far away; use haversine
static const float d = 2 * 6371000;
float a = fsqr(geodistFastSin(lat_diff * TO_RADF2)) +
geodistFastCos(lat1deg * TO_RADF) * geodistFastCos(lat2deg * TO_RADF) *
fsqr(geodistFastSin(lon_diff * TO_RADF2));
return static_cast<float>(d * geodistFastAsinSqrt(a));
}
// points too far away; use haversine
static const float D = 2 * 6371000;
float a = fsqr(geodistFastSin(dlat * TO_RADF2)) +
geodistFastCos(lat1Deg * TO_RADF) * geodistFastCos(lat2Deg * TO_RADF) *
fsqr(geodistFastSin(dlon * TO_RADF2));
return static_cast<float>(D * geodistFastAsinSqrt(a));
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
const auto size = input_rows_count;
auto dst = ColumnVector<Float32>::create();
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
bool result_is_const{};
auto instrs = getInstructions(block, arguments, result_is_const);
const IColumn & col_lon1 = *block.getByPosition(arguments[0]).column;
const IColumn & col_lat1 = *block.getByPosition(arguments[1]).column;
const IColumn & col_lon2 = *block.getByPosition(arguments[2]).column;
const IColumn & col_lat2 = *block.getByPosition(arguments[3]).column;
if (result_is_const)
{
const auto & colLon1 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[0]).column.get())->getValue<Float64>();
const auto & colLat1 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[1]).column.get())->getValue<Float64>();
const auto & colLon2 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[2]).column.get())->getValue<Float64>();
const auto & colLat2 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[3]).column.get())->getValue<Float64>();
for (size_t row_num = 0; row_num < input_rows_count; ++row_num)
dst_data[row_num] = greatCircleDistance(
col_lon1.getFloat32(row_num), col_lat1.getFloat32(row_num),
col_lon2.getFloat32(row_num), col_lat2.getFloat32(row_num));
Float64 res = greatCircleDistance(colLon1, colLat1, colLon2, colLat2);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(size, res);
}
else
{
auto dst = ColumnVector<Float64>::create();
auto & dst_data = dst->getData();
dst_data.resize(size);
Float64 vals[instrs.size()];
for (const auto row : ext::range(0, size))
{
for (const auto idx : ext::range(0, instrs.size()))
{
if (instr_type::get_float_64 == instrs[idx].first)
vals[idx] = assert_cast<const ColumnVector<Float64> *>(instrs[idx].second)->getData()[row];
else if (instr_type::get_const_float_64 == instrs[idx].first)
vals[idx] = assert_cast<const ColumnConst *>(instrs[idx].second)->getValue<Float64>();
else
throw Exception{"Unknown instruction type in implementation of greatCircleDistance function", ErrorCodes::LOGICAL_ERROR};
}
dst_data[row] = greatCircleDistance(vals[0], vals[1], vals[2], vals[3]);
}
block.getByPosition(result).column = std::move(dst);
}
block.getByPosition(result).column = std::move(dst);
}
};
......
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/S3Common.h>
#include <common/logger_useful.h>
......@@ -10,14 +11,13 @@ namespace DB
const int DEFAULT_S3_MAX_FOLLOW_GET_REDIRECT = 2;
ReadBufferFromS3::ReadBufferFromS3(Poco::URI uri_,
ReadBufferFromS3::ReadBufferFromS3(const Poco::URI & uri_,
const String & access_key_id_,
const String & secret_access_key_,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials,
size_t buffer_size_,
const RemoteHostFilter & remote_host_filter_)
: ReadBuffer(nullptr, 0)
, uri {uri_}
, method {Poco::Net::HTTPRequest::HTTP_GET}
, session {makeHTTPSession(uri_, timeouts)}
, remote_host_filter {remote_host_filter_}
{
......@@ -30,11 +30,13 @@ ReadBufferFromS3::ReadBufferFromS3(Poco::URI uri_,
if (uri.getPath().empty())
uri.setPath("/");
request = std::make_unique<Poco::Net::HTTPRequest>(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request = std::make_unique<Poco::Net::HTTPRequest>(
Poco::Net::HTTPRequest::HTTP_GET,
uri.getPathAndQuery(),
Poco::Net::HTTPRequest::HTTP_1_1);
request->setHost(uri.getHost()); // use original, not resolved host name in header
if (!credentials.getUsername().empty())
credentials.authenticate(*request);
S3Helper::authenticateRequest(*request, access_key_id_, secret_access_key_);
LOG_TRACE((&Logger::get("ReadBufferFromS3")), "Sending request to " << uri.toString());
......@@ -57,7 +59,7 @@ ReadBufferFromS3::ReadBufferFromS3(Poco::URI uri_,
}
assertResponseIsOk(*request, response, *istr);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
impl = std::make_unique<ReadBufferFromIStream>(*istr, DBMS_DEFAULT_BUFFER_SIZE);
}
......
......@@ -17,8 +17,6 @@ class ReadBufferFromS3 : public ReadBuffer
{
protected:
Poco::URI uri;
std::string method;
HTTPSessionPtr session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
......@@ -26,11 +24,11 @@ protected:
RemoteHostFilter remote_host_filter;
public:
explicit ReadBufferFromS3(Poco::URI uri_,
explicit ReadBufferFromS3(const Poco::URI & uri_,
const String & access_key_id_,
const String & secret_access_key_,
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const RemoteHostFilter & remote_host_filter_ = {});
const RemoteHostFilter & remote_host_filter_ = {});
bool nextImpl() override;
};
......
......@@ -916,12 +916,12 @@ void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
template <class TReadBuffer, class... Types>
std::unique_ptr<ReadBuffer> getReadBuffer(const DB::CompressionMethod method, Types&&... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto read_buf = std::make_unique<TReadBuffer>(std::forward<Types>(args)...);
return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
}
return std::make_unique<TReadBuffer>(args...);
if (method == DB::CompressionMethod::Gzip)
{
auto read_buf = std::make_unique<TReadBuffer>(std::forward<Types>(args)...);
return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
}
return std::make_unique<TReadBuffer>(args...);
}
/** This function just copies the data from buffer's internal position (in.position())
......
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <iterator>
#include <sstream>
#include <Poco/Base64Encoder.h>
#include <Poco/HMACEngine.h>
#include <Poco/SHA1Engine.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_FORMAT_DATETIME;
}
void S3Helper::authenticateRequest(
Poco::Net::HTTPRequest & request,
const String & access_key_id,
const String & secret_access_key)
{
/// See https://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
if (access_key_id.empty())
return;
/// Limitations:
/// 1. Virtual hosted-style requests are not supported (e.g. `http://johnsmith.net.s3.amazonaws.com/homepage.html`).
/// 2. AMZ headers are not supported (TODO).
if (!request.has("Date"))
{
WriteBufferFromOwnString out;
writeDateTimeTextRFC1123(time(nullptr), out, DateLUT::instance("UTC"));
request.set("Date", out.str());
}
String string_to_sign = request.getMethod() + "\n"
+ request.get("Content-MD5", "") + "\n"
+ request.get("Content-Type", "") + "\n"
+ request.get("Date") + "\n"
+ Poco::URI(request.getURI()).getPathAndQuery();
Poco::HMACEngine<Poco::SHA1Engine> engine(secret_access_key);
engine.update(string_to_sign);
auto digest = engine.digest();
std::ostringstream signature;
Poco::Base64Encoder encoder(signature);
std::copy(digest.begin(), digest.end(), std::ostream_iterator<char>(encoder));
encoder.close();
request.set("Authorization", "AWS " + access_key_id + ":" + signature.str());
}
}
#pragma once
#include <Core/Types.h>
#include <Poco/Net/HTTPRequest.h>
namespace DB
{
namespace S3Helper
{
void authenticateRequest(
Poco::Net::HTTPRequest & request,
const String & access_key_id,
const String & secret_access_key);
};
}
#include <IO/WriteBufferFromS3.h>
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
#include <Poco/DOM/AutoPtr.h>
......@@ -30,24 +31,24 @@ namespace ErrorCodes
WriteBufferFromS3::WriteBufferFromS3(
const Poco::URI & uri_,
const String & access_key_id_,
const String & secret_access_key_,
size_t minimum_upload_part_size_,
const ConnectionTimeouts & timeouts_,
const Poco::Net::HTTPBasicCredentials & credentials, size_t buffer_size_,
const RemoteHostFilter & remote_host_filter_
)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
const RemoteHostFilter & remote_host_filter_)
: BufferWithOwnMemory<WriteBuffer>(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, uri {uri_}
, access_key_id {access_key_id_}
, secret_access_key {secret_access_key_}
, minimum_upload_part_size {minimum_upload_part_size_}
, timeouts {timeouts_}
, auth_request {Poco::Net::HTTPRequest::HTTP_PUT, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1}
, temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
, last_part_size {0}
, remote_host_filter(remote_host_filter_)
{
if (!credentials.getUsername().empty())
credentials.authenticate(auth_request);
initiate();
/// FIXME: Implement rest of S3 authorization.
}
......@@ -115,11 +116,7 @@ void WriteBufferFromS3::initiate()
request_ptr = std::make_unique<Poco::Net::HTTPRequest>(Poco::Net::HTTPRequest::HTTP_POST, initiate_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request_ptr->setHost(initiate_uri.getHost()); // use original, not resolved host name in header
if (auth_request.hasCredentials())
{
Poco::Net::HTTPBasicCredentials credentials(auth_request);
credentials.authenticate(*request_ptr);
}
S3Helper::authenticateRequest(*request_ptr, access_key_id, secret_access_key);
request_ptr->setContentLength(0);
......@@ -182,11 +179,7 @@ void WriteBufferFromS3::writePart(const String & data)
request_ptr = std::make_unique<Poco::Net::HTTPRequest>(Poco::Net::HTTPRequest::HTTP_PUT, part_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request_ptr->setHost(part_uri.getHost()); // use original, not resolved host name in header
if (auth_request.hasCredentials())
{
Poco::Net::HTTPBasicCredentials credentials(auth_request);
credentials.authenticate(*request_ptr);
}
S3Helper::authenticateRequest(*request_ptr, access_key_id, secret_access_key);
request_ptr->setExpectContinue(true);
......@@ -255,11 +248,7 @@ void WriteBufferFromS3::complete()
request_ptr = std::make_unique<Poco::Net::HTTPRequest>(Poco::Net::HTTPRequest::HTTP_POST, complete_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request_ptr->setHost(complete_uri.getHost()); // use original, not resolved host name in header
if (auth_request.hasCredentials())
{
Poco::Net::HTTPBasicCredentials credentials(auth_request);
credentials.authenticate(*request_ptr);
}
S3Helper::authenticateRequest(*request_ptr, access_key_id, secret_access_key);
request_ptr->setExpectContinue(true);
......
......@@ -21,9 +21,10 @@ class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
{
private:
Poco::URI uri;
String access_key_id;
String secret_access_key;
size_t minimum_upload_part_size;
ConnectionTimeouts timeouts;
Poco::Net::HTTPRequest auth_request;
String buffer_string;
std::unique_ptr<WriteBufferFromString> temporary_buffer;
size_t last_part_size;
......@@ -36,10 +37,10 @@ private:
public:
explicit WriteBufferFromS3(const Poco::URI & uri,
const String & access_key_id,
const String & secret_access_key,
size_t minimum_upload_part_size_,
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const RemoteHostFilter & remote_host_filter_ = {});
void nextImpl() override;
......
......@@ -568,45 +568,46 @@ inline void writeUUIDText(const UUID & uuid, WriteBuffer & buf)
buf.write(s, sizeof(s));
}
static const char digits100[201] =
"00010203040506070809"
"10111213141516171819"
"20212223242526272829"
"30313233343536373839"
"40414243444546474849"
"50515253545556575859"
"60616263646566676869"
"70717273747576777879"
"80818283848586878889"
"90919293949596979899";
/// in YYYY-MM-DD format
template <char delimiter = '-'>
inline void writeDateText(const LocalDate & date, WriteBuffer & buf)
{
static const char digits[201] =
"00010203040506070809"
"10111213141516171819"
"20212223242526272829"
"30313233343536373839"
"40414243444546474849"
"50515253545556575859"
"60616263646566676869"
"70717273747576777879"
"80818283848586878889"
"90919293949596979899";
if (buf.position() + 10 <= buf.buffer().end())
{
memcpy(buf.position(), &digits[date.year() / 100 * 2], 2);
memcpy(buf.position(), &digits100[date.year() / 100 * 2], 2);
buf.position() += 2;
memcpy(buf.position(), &digits[date.year() % 100 * 2], 2);
memcpy(buf.position(), &digits100[date.year() % 100 * 2], 2);
buf.position() += 2;
*buf.position() = delimiter;
++buf.position();
memcpy(buf.position(), &digits[date.month() * 2], 2);
memcpy(buf.position(), &digits100[date.month() * 2], 2);
buf.position() += 2;
*buf.position() = delimiter;
++buf.position();
memcpy(buf.position(), &digits[date.day() * 2], 2);
memcpy(buf.position(), &digits100[date.day() * 2], 2);
buf.position() += 2;
}
else
{
buf.write(&digits[date.year() / 100 * 2], 2);
buf.write(&digits[date.year() % 100 * 2], 2);
buf.write(&digits100[date.year() / 100 * 2], 2);
buf.write(&digits100[date.year() % 100 * 2], 2);
buf.write(delimiter);
buf.write(&digits[date.month() * 2], 2);
buf.write(&digits100[date.month() * 2], 2);
buf.write(delimiter);
buf.write(&digits[date.day() * 2], 2);
buf.write(&digits100[date.day() * 2], 2);
}
}
......@@ -628,59 +629,47 @@ inline void writeDateText(DayNum date, WriteBuffer & buf)
template <char date_delimeter = '-', char time_delimeter = ':', char between_date_time_delimiter = ' '>
inline void writeDateTimeText(const LocalDateTime & datetime, WriteBuffer & buf)
{
static const char digits[201] =
"00010203040506070809"
"10111213141516171819"
"20212223242526272829"
"30313233343536373839"
"40414243444546474849"
"50515253545556575859"
"60616263646566676869"
"70717273747576777879"
"80818283848586878889"
"90919293949596979899";
if (buf.position() + 19 <= buf.buffer().end())
{
memcpy(buf.position(), &digits[datetime.year() / 100 * 2], 2);
memcpy(buf.position(), &digits100[datetime.year() / 100 * 2], 2);
buf.position() += 2;
memcpy(buf.position(), &digits[datetime.year() % 100 * 2], 2);
memcpy(buf.position(), &digits100[datetime.year() % 100 * 2], 2);
buf.position() += 2;
*buf.position() = date_delimeter;
++buf.position();
memcpy(buf.position(), &digits[datetime.month() * 2], 2);
memcpy(buf.position(), &digits100[datetime.month() * 2], 2);
buf.position() += 2;
*buf.position() = date_delimeter;
++buf.position();
memcpy(buf.position(), &digits[datetime.day() * 2], 2);
memcpy(buf.position(), &digits100[datetime.day() * 2], 2);
buf.position() += 2;
*buf.position() = between_date_time_delimiter;
++buf.position();
memcpy(buf.position(), &digits[datetime.hour() * 2], 2);
memcpy(buf.position(), &digits100[datetime.hour() * 2], 2);
buf.position() += 2;
*buf.position() = time_delimeter;
++buf.position();
memcpy(buf.position(), &digits[datetime.minute() * 2], 2);
memcpy(buf.position(), &digits100[datetime.minute() * 2], 2);
buf.position() += 2;
*buf.position() = time_delimeter;
++buf.position();
memcpy(buf.position(), &digits[datetime.second() * 2], 2);
memcpy(buf.position(), &digits100[datetime.second() * 2], 2);
buf.position() += 2;
}
else
{
buf.write(&digits[datetime.year() / 100 * 2], 2);
buf.write(&digits[datetime.year() % 100 * 2], 2);
buf.write(&digits100[datetime.year() / 100 * 2], 2);
buf.write(&digits100[datetime.year() % 100 * 2], 2);
buf.write(date_delimeter);
buf.write(&digits[datetime.month() * 2], 2);
buf.write(&digits100[datetime.month() * 2], 2);
buf.write(date_delimeter);
buf.write(&digits[datetime.day() * 2], 2);
buf.write(&digits100[datetime.day() * 2], 2);
buf.write(between_date_time_delimiter);
buf.write(&digits[datetime.hour() * 2], 2);
buf.write(&digits100[datetime.hour() * 2], 2);
buf.write(time_delimeter);
buf.write(&digits[datetime.minute() * 2], 2);
buf.write(&digits100[datetime.minute() * 2], 2);
buf.write(time_delimeter);
buf.write(&digits[datetime.second() * 2], 2);
buf.write(&digits100[datetime.second() * 2], 2);
}
}
......@@ -707,6 +696,33 @@ inline void writeDateTimeText(time_t datetime, WriteBuffer & buf, const DateLUTI
}
/// In the RFC 1123 format: "Tue, 03 Dec 2019 00:11:50 GMT". You must provide GMT DateLUT.
/// This is needed for HTTP requests.
inline void writeDateTimeTextRFC1123(time_t datetime, WriteBuffer & buf, const DateLUTImpl & date_lut)
{
const auto & values = date_lut.getValues(datetime);
static const char week_days[3 * 8 + 1] = "XXX" "Mon" "Tue" "Wed" "Thu" "Fri" "Sat" "Sun";
static const char months[3 * 13 + 1] = "XXX" "Jan" "Feb" "Mar" "Apr" "May" "Jun" "Jul" "Aug" "Sep" "Oct" "Nov" "Dec";
buf.write(&week_days[values.day_of_week * 3], 3);
buf.write(", ", 2);
buf.write(&digits100[values.day_of_month * 2], 2);
buf.write(' ');
buf.write(&months[values.month * 3], 3);
buf.write(' ');
buf.write(&digits100[values.year / 100 * 2], 2);
buf.write(&digits100[values.year % 100 * 2], 2);
buf.write(' ');
buf.write(&digits100[date_lut.toHour(datetime) * 2], 2);
buf.write(':');
buf.write(&digits100[date_lut.toMinute(datetime) * 2], 2);
buf.write(':');
buf.write(&digits100[date_lut.toSecond(datetime) * 2], 2);
buf.write(" GMT", 4);
}
/// Methods for output in binary format.
template <typename T>
inline std::enable_if_t<is_arithmetic_v<T>, void>
......
#include <gtest/gtest.h>
#include <common/DateLUT.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
TEST(RFC1123, Test)
{
using namespace DB;
WriteBufferFromOwnString out;
writeDateTimeTextRFC1123(1111111111, out, DateLUT::instance("UTC"));
ASSERT_EQ(out.str(), "Fri, 18 Mar 2005 01:58:31 GMT");
}
......@@ -167,7 +167,7 @@ private:
size_t canMoveEqualsToJoinOn(const ASTFunction & node)
{
if (!node.arguments)
throw Exception("Logical error: function requires argiment", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: function requires arguments", ErrorCodes::LOGICAL_ERROR);
if (node.arguments->children.size() != 2)
return false;
......
......@@ -27,7 +27,7 @@ struct ExternalLoaderConfigSettings
};
/** Iterface for manage user-defined objects.
/** Interface for manage user-defined objects.
* Monitors configuration file and automatically reloads objects in separate threads.
* The monitoring thread wakes up every 'check_period_sec' seconds and checks
* modification time of objects' configuration file. If said time is greater than
......
......@@ -28,7 +28,7 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F
if (it != list_of_selects->children.begin())
settings.ostr
<< settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
<< "UNION ALL" << (settings.hilite ? hilite_keyword : "")
<< "UNION ALL" << (settings.hilite ? hilite_none : "")
<< settings.nl_or_ws;
(*it)->formatImpl(settings, state, frame);
......
......@@ -32,6 +32,8 @@ namespace
{
public:
StorageS3BlockInputStream(const Poco::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & format,
const String & name_,
const Block & sample_block,
......@@ -41,7 +43,7 @@ namespace
const CompressionMethod compression_method)
: name(name_)
{
read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, uri, timeouts, context.getRemoteHostFilter());
read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, uri, access_key_id, secret_access_key, timeouts, context.getRemoteHostFilter());
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
......@@ -80,6 +82,8 @@ namespace
{
public:
StorageS3BlockOutputStream(const Poco::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & format,
UInt64 min_upload_part_size,
const Block & sample_block_,
......@@ -88,7 +92,14 @@ namespace
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = getWriteBuffer<WriteBufferFromS3>(compression_method, uri, min_upload_part_size, timeouts, context.getRemoteHostFilter());
write_buf = getWriteBuffer<WriteBufferFromS3>(
compression_method,
uri,
access_key_id,
secret_access_key,
min_upload_part_size,
timeouts,
context.getRemoteHostFilter());
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......@@ -124,6 +135,8 @@ namespace
StorageS3::StorageS3(
const Poco::URI & uri_,
const String & access_key_id_,
const String & secret_access_key_,
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
......@@ -134,6 +147,8 @@ StorageS3::StorageS3(
const String & compression_method_ = "")
: IStorage(columns_)
, uri(uri_)
, access_key_id(access_key_id_)
, secret_access_key(secret_access_key_)
, context_global(context_)
, format_name(format_name_)
, database_name(database_name_)
......@@ -157,6 +172,8 @@ BlockInputStreams StorageS3::read(
{
BlockInputStreamPtr block_input = std::make_shared<StorageS3BlockInputStream>(
uri,
access_key_id,
secret_access_key,
format_name,
getName(),
getHeaderBlock(column_names),
......@@ -180,7 +197,13 @@ void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_dat
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
return std::make_shared<StorageS3BlockOutputStream>(
uri, format_name, min_upload_part_size, getSampleBlock(), context_global,
uri,
access_key_id,
secret_access_key,
format_name,
min_upload_part_size,
getSampleBlock(),
context_global,
ConnectionTimeouts::getHTTPTimeouts(context_global),
IStorage::chooseCompressionMethod(uri.toString(), compression_method));
}
......@@ -191,29 +214,35 @@ void registerStorageS3(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2 && engine_args.size() != 3)
if (engine_args.size() < 2 || engine_args.size() > 5)
throw Exception(
"Storage S3 requires 2 or 3 arguments: url, name of used format and compression_method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
for (size_t i = 0; i < engine_args.size(); ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
Poco::URI uri(url);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
String format_name = engine_args[engine_args.size() - 1]->as<ASTLiteral &>().value.safeGet<String>();
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String access_key_id;
String secret_access_key;
if (engine_args.size() >= 4)
{
access_key_id = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
String compression_method;
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
if (engine_args.size() == 3 || engine_args.size() == 5)
compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
else
compression_method = "auto";
return StorageS3::create(uri, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
return StorageS3::create(uri, access_key_id, secret_access_key, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
});
}
}
......@@ -18,8 +18,10 @@ class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage
public:
StorageS3(
const Poco::URI & uri_,
const std::string & database_name_,
const std::string & table_name_,
const String & access_key_id,
const String & secret_access_key,
const String & database_name_,
const String & table_name_,
const String & format_name_,
UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
......@@ -56,6 +58,8 @@ public:
private:
Poco::URI uri;
String access_key_id;
String secret_access_key;
const Context & context_global;
String format_name;
......
......@@ -60,7 +60,18 @@ namespace
const CompressionMethod compression_method)
: name(name_)
{
read_buf = getReadBuffer<ReadWriteBufferFromHTTP>(compression_method, uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects, context.getRemoteHostFilter());
read_buf = getReadBuffer<ReadWriteBufferFromHTTP>(
compression_method,
uri,
method,
callback,
timeouts,
context.getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
context.getRemoteHostFilter());
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
......
#include <Storages/StorageS3.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
/// Parse args
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = args_func.at(0)->children;
if (args.size() < 3 || args.size() > 6)
throw Exception("Table function '" + getName() + "' requires 3 to 6 arguments: url, [access_key_id, secret_access_key,] format, structure and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < args.size(); ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
String filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
String format;
String structure;
String access_key_id;
String secret_access_key;
if (args.size() < 5)
{
format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
{
access_key_id = args[1]->as<ASTLiteral &>().value.safeGet<String>();
secret_access_key = args[2]->as<ASTLiteral &>().value.safeGet<String>();
format = args[3]->as<ASTLiteral &>().value.safeGet<String>();
structure = args[4]->as<ASTLiteral &>().value.safeGet<String>();
}
String compression_method;
if (args.size() == 4 || args.size() == 6)
compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>();
else
compression_method = "auto";
ColumnsDescription columns = parseColumnsListFromString(structure, context);
/// Create table
StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
storage->startup();
return storage;
}
StoragePtr TableFunctionS3::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
const String & source,
const String & access_key_id,
const String & secret_access_key,
const String & format,
const ColumnsDescription & columns,
Context & global_context,
const std::string & table_name,
const String & compression_method) const
{
Poco::URI uri(source);
UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
return StorageS3::create(uri, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method);
return StorageS3::create(uri, access_key_id, secret_access_key, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method);
}
void registerTableFunctionS3(TableFunctionFactory & factory)
......
#pragma once
#include <TableFunctions/ITableFunctionFileLike.h>
#include <TableFunctions/ITableFunction.h>
namespace DB
......@@ -8,9 +8,9 @@ namespace DB
class Context;
/* s3(source, format, structure) - creates a temporary storage for a file in S3
/* s3(source, [access_key_id, secret_access_key,] format, structure) - creates a temporary storage for a file in S3
*/
class TableFunctionS3 : public ITableFunctionFileLike
class TableFunctionS3 : public ITableFunction
{
public:
static constexpr auto name = "s3";
......@@ -20,13 +20,20 @@ public:
}
private:
StoragePtr executeImpl(
const ASTPtr & ast_function,
const Context & context,
const std::string & table_name) const override;
StoragePtr getStorage(
const String & source,
const String & access_key_id,
const String & secret_access_key,
const String & format,
const ColumnsDescription & columns,
Context & global_context,
const std::string & table_name,
const String & compression_method) const override;
const String & compression_method) const;
};
}
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
import os.path
......@@ -72,6 +73,8 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
sleep(0.01)
total_time = (datetime.now() - start_time).total_seconds()
# Normalize randomized database names in stdout, stderr files.
os.system("sed -i -e 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stdout_file))
os.system("sed -i -e 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stderr_file))
......@@ -81,7 +84,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
stderr = open(stderr_file, 'r').read() if os.path.exists(stderr_file) else ''
stderr = unicode(stderr, errors='replace', encoding='utf-8')
return proc, stdout, stderr
return proc, stdout, stderr, total_time
def need_retry(stderr):
......@@ -149,6 +152,10 @@ def run_tests_array(all_tests_with_params):
client_options = get_additional_client_options(args)
def print_test_time(test_time):
if args.print_time:
print(" {0:.2f} sec.".format(test_time), end='')
if len(all_tests):
print("\nRunning {} {} tests.".format(len(all_tests), suite) + "\n")
......@@ -194,7 +201,7 @@ def run_tests_array(all_tests_with_params):
stdout_file = os.path.join(suite_tmp_dir, name) + '.stdout'
stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr'
proc, stdout, stderr = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
if proc.returncode is None:
try:
proc.kill()
......@@ -203,11 +210,13 @@ def run_tests_array(all_tests_with_params):
raise
failures += 1
print("{0} - Timeout!".format(MSG_FAIL))
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - Timeout!")
else:
counter = 1
while proc.returncode != 0 and need_retry(stderr):
proc, stdout, stderr = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
sleep(2**counter)
counter += 1
if counter > 6:
......@@ -216,7 +225,9 @@ def run_tests_array(all_tests_with_params):
if proc.returncode != 0:
failures += 1
failures_chain += 1
print("{0} - return code {1}".format(MSG_FAIL, proc.returncode))
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - return code {}".format(proc.returncode))
if stderr:
print(stderr.encode('utf-8'))
......@@ -227,24 +238,34 @@ def run_tests_array(all_tests_with_params):
elif stderr:
failures += 1
failures_chain += 1
print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr.encode('utf-8')))
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - having stderror:\n{}".format(stderr.encode('utf-8')))
elif 'Exception' in stdout:
failures += 1
failures_chain += 1
print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout.encode('utf-8')))
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - having exception:\n{}".format(stdout.encode('utf-8')))
elif not os.path.isfile(reference_file):
print("{0} - no reference file".format(MSG_UNKNOWN))
print(MSG_UNKNOWN, end='')
print_test_time(total_time)
print(" - no reference file")
else:
result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout = PIPE)
if result_is_different:
diff = Popen(['diff', '-U', str(args.unified), reference_file, stdout_file], stdout = PIPE).communicate()[0]
failures += 1
print("{0} - result differs with reference:\n{1}".format(MSG_FAIL, diff))
print(MSG_FAIL, end='')
print_test_time(total_time)
print(" - result differs with reference:\n{}".format(diff))
else:
passed_total += 1
failures_chain = 0
print(MSG_OK)
print(MSG_OK, end='')
print_test_time(total_time)
print()
if os.path.exists(stdout_file):
os.remove(stdout_file)
if os.path.exists(stderr_file):
......@@ -503,6 +524,7 @@ if __name__ == '__main__':
parser.add_argument('--skip', nargs='+', help="Skip these tests")
parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests')
parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
group=parser.add_mutually_exclusive_group(required=False)
group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')
......
......@@ -5,6 +5,9 @@ import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.client
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
......@@ -53,12 +56,18 @@ def prepare_s3_bucket(cluster):
minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))
cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
if minio_client.bucket_exists(cluster.minio_restricted_bucket):
minio_client.remove_bucket(cluster.minio_restricted_bucket)
minio_client.make_bucket(cluster.minio_restricted_bucket)
# Returns content of given S3 file as string.
def get_s3_file_content(cluster, filename):
def get_s3_file_content(cluster, bucket, filename):
# type: (ClickHouseCluster, str) -> str
data = cluster.minio_client.get_object(cluster.minio_bucket, filename)
data = cluster.minio_client.get_object(bucket, filename)
data_str = ""
for chunk in data.stream():
data_str += chunk
......@@ -101,53 +110,76 @@ def run_query(instance, query, stdin=None, settings=None):
# Test simple put.
def test_put(cluster):
@pytest.mark.parametrize("maybe_auth,positive", [
("",True),
("'minio','minio123',",True),
("'wrongid','wrongkey',",False)
])
def test_put(cluster, maybe_auth, positive):
# type: (ClickHouseCluster) -> None
bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
instance = cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
cluster.minio_host, cluster.minio_port, cluster.minio_bucket, filename, table_format, values)
run_query(instance, put_query)
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') values {}".format(
cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format, values)
assert values_csv == get_s3_file_content(cluster, filename)
try:
run_query(instance, put_query)
except helpers.client.QueryRuntimeException:
assert not positive
else:
assert positive
assert values_csv == get_s3_file_content(cluster, bucket, filename)
# Test put values in CSV format.
def test_put_csv(cluster):
@pytest.mark.parametrize("maybe_auth,positive", [
("",True),
("'minio','minio123',",True),
("'wrongid','wrongkey',",False)
])
def test_put_csv(cluster, maybe_auth, positive):
# type: (ClickHouseCluster) -> None
bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
instance = cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') format CSV".format(
cluster.minio_host, cluster.minio_port, cluster.minio_bucket, filename, table_format)
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format)
csv_data = "8,9,16\n11,18,13\n22,14,2\n"
run_query(instance, put_query, stdin=csv_data)
assert csv_data == get_s3_file_content(cluster, filename)
try:
run_query(instance, put_query, stdin=csv_data)
except helpers.client.QueryRuntimeException:
assert not positive
else:
assert positive
assert csv_data == get_s3_file_content(cluster, bucket, filename)
# Test put and get with S3 server redirect.
def test_put_get_with_redirect(cluster):
# type: (ClickHouseCluster) -> None
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
values_csv = "1,1,1\n1,1,1\n11,11,11\n"
filename = "test.csv"
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
cluster.minio_redirect_host, cluster.minio_redirect_port, cluster.minio_bucket, filename, table_format, values)
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
run_query(instance, query)
assert values_csv == get_s3_file_content(cluster, filename)
assert values_csv == get_s3_file_content(cluster, bucket, filename)
query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
cluster.minio_redirect_host, cluster.minio_redirect_port, cluster.minio_bucket, filename, table_format)
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format)
stdout = run_query(instance, query)
assert list(map(str.split, stdout.splitlines())) == [
......@@ -158,9 +190,15 @@ def test_put_get_with_redirect(cluster):
# Test multipart put.
def test_multipart_put(cluster):
@pytest.mark.parametrize("maybe_auth,positive", [
("",True),
("'minio','minio123',",True),
("'wrongid','wrongkey',",False)
])
def test_multipart_put(cluster, maybe_auth, positive):
# type: (ClickHouseCluster) -> None
bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
instance = cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
......@@ -178,18 +216,22 @@ def test_multipart_put(cluster):
assert len(csv_data) > min_part_size_bytes
filename = "test_multipart.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') format CSV".format(
cluster.minio_redirect_host, cluster.minio_redirect_port, cluster.minio_bucket, filename, table_format)
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
# Use Nginx access logs to count number of parts uploaded to Minio.
nginx_logs = get_nginx_access_logs()
uploaded_parts = filter(lambda log_line: log_line.find(filename) >= 0 and log_line.find("PUT") >= 0, nginx_logs)
assert uploaded_parts > 1
assert csv_data == get_s3_file_content(cluster, filename)
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)
try:
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
except helpers.client.QueryRuntimeException:
assert not positive
else:
assert positive
# Use Nginx access logs to count number of parts uploaded to Minio.
nginx_logs = get_nginx_access_logs()
uploaded_parts = filter(lambda log_line: log_line.find(filename) >= 0 and log_line.find("PUT") >= 0, nginx_logs)
assert uploaded_parts > 1
assert csv_data == get_s3_file_content(cluster, bucket, filename)
def test_remote_host_filter(started_cluster):
instance = started_cluster.instances["dummy"]
......
<test>
<type>loop</type>
<stop_conditions>
<all_of>
<iterations>3</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>5</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<!-- 100 AND operands -->
<query>select count() from numbers(10000000) where number != 96594 AND number != 18511 AND number != 98085 AND number != 84177 AND number != 70314 AND number != 28083 AND number != 54202 AND number != 66522 AND number != 66939 AND number != 99469 AND number != 65776 AND number != 22876 AND number != 42151 AND number != 19924 AND number != 66681 AND number != 63022 AND number != 17487 AND number != 83914 AND number != 59754 AND number != 968 AND number != 73334 AND number != 68569 AND number != 49853 AND number != 33155 AND number != 31777 AND number != 99698 AND number != 26708 AND number != 76409 AND number != 42191 AND number != 55397 AND number != 25724 AND number != 39170 AND number != 22728 AND number != 98238 AND number != 86052 AND number != 12756 AND number != 13948 AND number != 57774 AND number != 82511 AND number != 11337 AND number != 23506 AND number != 11875 AND number != 58536 AND number != 56919 AND number != 25986 AND number != 80710 AND number != 61797 AND number != 99244 AND number != 11665 AND number != 15758 AND number != 82899 AND number != 63150 AND number != 7198 AND number != 40071 AND number != 46310 AND number != 78488 AND number != 9273 AND number != 91878 AND number != 57904 AND number != 53941 AND number != 75675 AND number != 12093 AND number != 50090 AND number != 59675 AND number != 41632 AND number != 81448 AND number != 46821 AND number != 51919 AND number != 49028 AND number != 71059 AND number != 15673 AND number != 6132 AND number != 15473 AND number != 32527 AND number != 63842 AND number != 33121 AND number != 53271 AND number != 86033 AND number != 96807 AND number != 4791 AND number != 80089 AND number != 51616 AND number != 46311 AND number != 82844 AND number != 59353 AND number != 63538 AND number != 64857 AND number != 58471 AND number != 29870 AND number != 80209 AND number != 61000 AND number != 75991 AND number != 44506 AND number != 11283 AND number != 6335 AND number != 73502 AND number != 22354 AND number != 72816 AND number != 66399 AND number != 61703</query>
<!-- 10 AND operands -->
<query>select count() from numbers(10000000) where number != 96594 AND number != 18511 AND number != 98085 AND number != 84177 AND number != 70314 AND number != 28083 AND number != 54202 AND number != 66522 AND number != 66939 AND number != 99469</query>
</test>
......@@ -2,7 +2,7 @@ drop table if exists test_table_s3_syntax
;
create table test_table_s3_syntax (id UInt32) ENGINE = S3('')
; -- { serverError 42 }
create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','','')
create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','','','','')
; -- { serverError 42 }
drop table if exists test_table_s3_syntax
;
......@@ -772,22 +772,6 @@ SELECT arrayReduce('uniqUpTo(3)', [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
└─────────────────────────────────────────────────────────────┘
```
## arrayFlatten(arr) {#array_functions-arrayflatten}
The `arrayFlatten` (or `flatten` alias) method will collapse the elements of an array to create a single array.
Example:
```sql
SELECT arrayFlatten([[1, 2, 3], [4, 5]])
```
```text
┌─arrayFlatten([[1, 2, 3], [4, 5]])─┐
│ [1,2,3,4,5] │
└───────────────────────────────────┘
```
## arrayReverse(arr) {#array_functions-arrayreverse}
Returns an array of the same size as the original array containing the elements in reverse order.
......@@ -808,6 +792,44 @@ SELECT arrayReverse([1, 2, 3])
Synonym for ["arrayReverse"](#array_functions-arrayreverse)
## arrayFlatten {#arrayflatten}
Converts array of arrays to a flat array.
Function:
- Applies for any depth of nested arrays, but all the elements should lay at the same level.
For example, the `[[[1]], [[2], [3]]]` array can be flattened, but the `[[1], [[2], [3]]]` array can't be flattened.
- Does not change arrays that are already flat.
The flattened array contains all the elements from all source arrays.
**Syntax**
```sql
flatten(array_of_arrays)
```
Alias: `flatten`.
**Parameters**
- `array_of_arrays`[Array](../../data_types/array.md) of arrays. For example, `[[1,2,3], [4,5]]`.
**Examples**
```sql
SELECT flatten([[[1]], [[2], [3]]])
```
```text
┌─flatten(array(array([1]), array([2], [3])))─┐
│ [1,2,3] │
└─────────────────────────────────────────────┘
```
## arrayCompact {#arraycompact}
Removes consecutive duplicate elements from an array. The order of result values is determined by the order in the source array.
......@@ -844,4 +866,4 @@ Result:
└────────────────────────────────────────────┘
```
[Original article](https://clickhouse.yandex/docs/en/query_language/functions/array_functions/) <!--hide-->
\ No newline at end of file
[Original article](https://clickhouse.yandex/docs/en/query_language/functions/array_functions/) <!--hide-->
......@@ -215,7 +215,8 @@ nav:
- 'Overview of ClickHouse Architecture': 'development/architecture.md'
- 'How to Build ClickHouse on Linux': 'development/build.md'
- 'How to Build ClickHouse on Mac OS X': 'development/build_osx.md'
- 'How to Build ClickHouse on Linux for Mac OS X': 'development/build_cross.md'
- 'How to Build ClickHouse on Linux for Mac OS X': 'development/build_cross_osx.md'
- 'How to Build ClickHouse on Linux for AARCH64 (ARM64)': 'development/build_cross_arm.md'
- 'How to Write C++ Code': 'development/style.md'
- 'How to Run ClickHouse Tests': 'development/tests.md'
- 'The Beginner ClickHouse Developer Instruction': 'development/developer_instruction.md'
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册