ODBCDriver2BlockOutputFormat.cpp 3.5 KB
Newer Older
N
Nikolai Kochetov 已提交
1 2 3 4 5 6 7 8
#include <Core/Block.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>


#include <Core/iostream_debug_helpers.h>
9
#include <DataTypes/DataTypeLowCardinality.h>
N
Nikolai Kochetov 已提交
10 11 12 13 14


namespace DB
{
ODBCDriver2BlockOutputFormat::ODBCDriver2BlockOutputFormat(
K
kreuzerkrieg 已提交
15 16
    WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
    : IOutputFormat(header_, out_), format_settings(format_settings_)
N
Nikolai Kochetov 已提交
17 18 19
{
}

N
Nikolai Kochetov 已提交
20
static void writeODBCString(WriteBuffer & out, const std::string & str)
N
Nikolai Kochetov 已提交
21 22 23 24 25
{
    writeIntBinary(Int32(str.size()), out);
    out.write(str.data(), str.size());
}

A
Anton Popov 已提交
26
void ODBCDriver2BlockOutputFormat::writeRow(const Serializations & serializations, const Columns & columns, size_t row_idx, std::string & buffer)
N
Nikolai Kochetov 已提交
27 28 29 30 31
{
    size_t num_columns = columns.size();
    for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
    {
        buffer.clear();
A
Alexey Milovidov 已提交
32
        const auto & column = columns[column_idx];
N
Nikolai Kochetov 已提交
33 34 35 36 37 38 39 40 41

        if (column->isNullAt(row_idx))
        {
            writeIntBinary(Int32(-1), out);
        }
        else
        {
            {
                WriteBufferFromString text_out(buffer);
A
Anton Popov 已提交
42
                serializations[column_idx]->serializeText(*column, row_idx, text_out, format_settings);
N
Nikolai Kochetov 已提交
43 44 45 46 47 48 49 50 51
            }
            writeODBCString(out, buffer);
        }
    }
}

void ODBCDriver2BlockOutputFormat::write(Chunk chunk, PortKind port_kind)
{
    String text_value;
A
Alexey Milovidov 已提交
52 53
    const auto & header = getPort(port_kind).getHeader();
    const auto & columns = chunk.getColumns();
A
Anton Popov 已提交
54 55 56 57 58 59

    size_t num_columns = columns.size();
    Serializations serializations(num_columns);
    for (size_t i = 0; i < num_columns; ++i)
        serializations[i] = header.getByPosition(i).type->getDefaultSerialization();

N
Nikolai Kochetov 已提交
60 61
    const size_t rows = chunk.getNumRows();
    for (size_t i = 0; i < rows; ++i)
A
Anton Popov 已提交
62
        writeRow(serializations, columns, i, text_value);
N
Nikolai Kochetov 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
}

void ODBCDriver2BlockOutputFormat::consume(Chunk chunk)
{
    writePrefixIfNot();
    write(std::move(chunk), PortKind::Main);
}

void ODBCDriver2BlockOutputFormat::consumeTotals(Chunk chunk)
{
    writePrefixIfNot();
    write(std::move(chunk), PortKind::Totals);
}

void ODBCDriver2BlockOutputFormat::finalize()
{
    writePrefixIfNot();
}

void ODBCDriver2BlockOutputFormat::writePrefix()
{
A
Alexey Milovidov 已提交
84
    const auto & header = getPort(PortKind::Main).getHeader();
N
Nikolai Kochetov 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
    const size_t columns = header.columns();

    /// Number of header rows.
    writeIntBinary(Int32(2), out);

    /// Names of columns.
    /// Number of columns + 1 for first name column.
    writeIntBinary(Int32(columns + 1), out);
    writeODBCString(out, "name");
    for (size_t i = 0; i < columns; ++i)
    {
        const ColumnWithTypeAndName & col = header.getByPosition(i);
        writeODBCString(out, col.name);
    }

    /// Types of columns.
    writeIntBinary(Int32(columns + 1), out);
    writeODBCString(out, "type");
    for (size_t i = 0; i < columns; ++i)
    {
105 106 107 108
        auto type = header.getByPosition(i).type;
        if (type->lowCardinality())
            type = recursiveRemoveLowCardinality(type);
        writeODBCString(out, type->getName());
N
Nikolai Kochetov 已提交
109 110 111 112 113 114 115
    }
}


void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory)
{
    factory.registerOutputFormatProcessor(
116
        "ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
N
Nikolai Kochetov 已提交
117 118 119 120 121 122
        {
            return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
        });
}

}