ExternalTable.cpp 6.4 KB
Newer Older
1
#include <boost/program_options.hpp>
A
Alexey Milovidov 已提交
2
#include <DataStreams/IBlockOutputStream.h>
A
Alexey Milovidov 已提交
3
#include <DataStreams/AsynchronousBlockInputStream.h>
4
#include <DataTypes/DataTypeFactory.h>
A
Alexey Milovidov 已提交
5 6 7
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ConstraintsDescription.h>
8
#include <Interpreters/Context.h>
A
Alexey Milovidov 已提交
9
#include <Interpreters/DatabaseCatalog.h>
10 11 12
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
A
Alexey Milovidov 已提交
13

14 15 16
#include <Processors/Pipe.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
A
Alexey Milovidov 已提交
17
#include <Processors/Sources/SourceFromInputStream.h>
A
Alexey Milovidov 已提交
18

19 20
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
A
Alexey Milovidov 已提交
21
#include <Formats/FormatFactory.h>
22
#include <common/find_symbols.h>
23 24 25 26 27 28 29 30 31 32 33


namespace DB
{

/// Error codes referenced in this file. They are only declared here (extern),
/// so their definitions must be provided by another translation unit.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}


34
/// Builds the data description of this external table.
/// Prepares the read buffer and the sample block, creates an input format over the buffer
/// (using `format`, the sample block and DEFAULT_BLOCK_SIZE), wraps it into an
/// AsynchronousBlockInputStream and exposes that stream as the single source of a Pipe.
/// The returned object carries the table name and the pipe.
ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
{
    initReadBuffer();
    initSampleBlock();

    auto format_input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
    auto async_stream = std::make_shared<AsynchronousBlockInputStream>(format_input);

    auto result = std::make_unique<ExternalTableData>();
    result->table_name = name;

    /// The asynchronous stream becomes the only source processor of the pipe.
    auto source = std::make_shared<SourceFromInputStream>(std::move(async_stream));
    result->pipe = std::make_unique<Pipe>(std::move(source));

    return result;
}

A
Alexey Milovidov 已提交
48
void BaseExternalTable::clear()
49
{
50 51 52
    name.clear();
    file.clear();
    format.clear();
53
    structure.clear();
54
    sample_block.clear();
55 56 57 58 59
    read_buffer.reset();
}

void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
{
60 61
    std::vector<std::string> vals;
    splitInto<' ', ','>(vals, argument, true);
62

A
Alexey Milovidov 已提交
63 64
    if (vals.size() % 2 != 0)
        throw Exception("Odd number of attributes in section structure: " + std::to_string(vals.size()), ErrorCodes::BAD_ARGUMENTS);
65 66 67 68 69 70 71

    for (size_t i = 0; i < vals.size(); i += 2)
        structure.emplace_back(vals[i], vals[i + 1]);
}

void BaseExternalTable::parseStructureFromTypesField(const std::string & argument)
{
72 73
    std::vector<std::string> vals;
    splitInto<' ', ','>(vals, argument, true);
74 75 76 77 78 79 80 81 82

    for (size_t i = 0; i < vals.size(); ++i)
        structure.emplace_back("_" + toString(i + 1), vals[i]);
}

void BaseExternalTable::initSampleBlock()
{
    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

A
Alexey Milovidov 已提交
83
    for (const auto & elem : structure)
84 85
    {
        ColumnWithTypeAndName column;
A
Alexey Milovidov 已提交
86 87
        column.name = elem.first;
        column.type = data_type_factory.get(elem.second);
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
        column.column = column.type->createColumn();
        sample_block.insert(std::move(column));
    }
}


/// Creates the read buffer for the table data:
/// the file name "-" means standard input, anything else is opened as a file.
void ExternalTable::initReadBuffer()
{
    const bool read_from_stdin = (file == "-");

    if (!read_from_stdin)
    {
        read_buffer = std::make_unique<ReadBufferFromFile>(file);
        return;
    }

    read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
}

/// Initializes the external table from parsed command line options.
/// "file", "name" and "format" are mandatory; the column structure is taken from
/// "structure" if present, otherwise from "types".
/// Throws BAD_ARGUMENTS if a mandatory option or both structure options are missing.
ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options)
{
    /// Returns the string value of a mandatory option or throws BAD_ARGUMENTS with the given message.
    const auto get_required = [&external_options](const std::string & option, const std::string & error_message)
    {
        if (external_options.count(option) == 0)
            throw Exception(error_message, ErrorCodes::BAD_ARGUMENTS);

        return external_options[option].as<std::string>();
    };

    file = get_required("file", "--file field have not been provided for external table");
    name = get_required("name", "--name field have not been provided for external table");
    format = get_required("format", "--format field have not been provided for external table");

    /// An explicit structure takes precedence over a bare list of types.
    if (external_options.count("structure"))
    {
        parseStructureFromStructureField(external_options["structure"].as<std::string>());
        return;
    }

    if (external_options.count("types"))
    {
        parseStructureFromTypesField(external_options["types"].as<std::string>());
        return;
    }

    throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
}


/// Handles one part of a multipart/form-data request as an external table:
/// wraps the incoming stream into a read buffer (optionally size-limited by the
/// 'http_max_multipart_form_data_size' setting), takes the table name from the
/// Content-Disposition header and the format/structure from `params`, registers
/// a temporary table in the context and writes the received data into it.
/// Throws BAD_ARGUMENTS if neither <name>_structure nor <name>_types is provided.
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, std::istream & stream)
{
    const Settings & settings = context.getSettingsRef();

    /// The buffer is initialized here, not in the virtual function initReadBuffer
    read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream);

    if (settings.http_max_multipart_form_data_size)
    {
        /// Reading goes through a limiting wrapper; read_buffer_impl keeps owning the underlying buffer.
        read_buffer = std::make_unique<LimitReadBuffer>(
            *read_buffer_impl, settings.http_max_multipart_form_data_size,
            true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
    }
    else
    {
        /// No limit configured: read from the stream buffer directly.
        read_buffer = std::move(read_buffer_impl);
    }

    /// Retrieve a collection of parameters from MessageHeader
    std::string disposition;
    Poco::Net::NameValueCollection disposition_params;
    Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), disposition, disposition_params);

    /// Get parameters
    name = disposition_params.get("name", "_data");
    format = params.get(name + "_format", "TabSeparated");

    const std::string structure_key = name + "_structure";
    const std::string types_key = name + "_types";

    if (params.has(structure_key))
        parseStructureFromStructureField(params.get(structure_key));
    else if (params.has(types_key))
        parseStructureFromTypesField(params.get(types_key));
    else
        throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);

    auto table_data = getData(context);

    /// Create table
    const NamesAndTypesList table_columns = sample_block.getNamesAndTypesList();
    TemporaryTableHolder temporary_table(context, ColumnsDescription{table_columns}, {});
    /// Take the storage before the holder is moved into the context.
    auto storage = temporary_table.getTable();
    context.addExternalTable(table_data->table_name, std::move(temporary_table));
    auto table_output = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), context);

    /// Write data
    table_data->pipe->resize(1);

    auto sink = std::make_shared<SinkToOutputStream>(std::move(table_output));
    connect(*table_data->pipe->getOutputPort(0), sink->getPort());

    auto processors = Pipe::detachProcessors(std::move(*table_data->pipe));
    processors.push_back(std::move(sink));

    auto executor = std::make_shared<PipelineExecutor>(processors);
    executor->execute(/*num_threads = */ 1);

    /// We are ready to receive the next file, for this we clear all the information received
    clear();
}

}