Commit 110bba38 authored by T twalthr

[FLINK-4103] [table] Add CsvTableSource docs and Java accessibility

Parent 7e309eeb
......@@ -252,6 +252,89 @@ tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
Table result = tableEnvironment.ingest("kafka-source");
```

#### CsvTableSource

The `CsvTableSource` is already included in `flink-table` without additional dependencies.
It can be configured with the following properties:

- `path` The path to the CSV file, required.
- `fieldNames` The names of the table fields, required.
- `fieldTypes` The types of the table fields, required.
- `fieldDelim` The field delimiter, `","` by default.
- `rowDelim` The row delimiter, `"\n"` by default.
- `quoteCharacter` An optional quote character for String values, `null` by default.
- `ignoreFirstLine` Flag to ignore the first line, `false` by default.
- `ignoreComments` An optional prefix to indicate comments, `null` by default.
- `lenient` Flag to skip records with parse errors instead of failing, `false` by default.

You can create the source as follows:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
CsvTableSource csvTableSource = new CsvTableSource(
"/path/to/your/file.csv",
new String[] { "name", "id", "score", "comments" },
new TypeInformation<?>[] {
Types.STRING(),
Types.INT(),
Types.DOUBLE(),
Types.STRING()
},
"#", // fieldDelim
"$", // rowDelim
null, // quoteCharacter
true, // ignoreFirstLine
"%", // ignoreComments
false); // lenient
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val csvTableSource = new CsvTableSource(
"/path/to/your/file.csv",
Array("name", "id", "score", "comments"),
Array(
Types.STRING,
Types.INT,
Types.DOUBLE,
Types.STRING
),
fieldDelim = "#",
rowDelim = "$",
ignoreFirstLine = true,
ignoreComments = "%")
{% endhighlight %}
</div>
</div>

You can work with the Table as described in the rest of the Table API guide, in both stream and batch `TableEnvironment`s:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
tableEnvironment.registerTableSource("mycsv", csvTableSource);
Table streamTable = streamTableEnvironment.ingest("mycsv");
Table batchTable = batchTableEnvironment.scan("mycsv");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
tableEnvironment.registerTableSource("mycsv", csvTableSource)
val streamTable = streamTableEnvironment.ingest("mycsv")
val batchTable = batchTableEnvironment.scan("mycsv")
{% endhighlight %}
</div>
</div>
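
Once registered, the source behaves like any other table. As a minimal sketch (assuming the field names of the example source above; the predicate is illustrative), a query could look like this in Java:

{% highlight java %}
// keep rows with a high score and project two of the fields
Table result = batchTableEnvironment
  .scan("mycsv")
  .filter("score > 2.0")
  .select("name, score");
{% endhighlight %}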

Table API
----------

The Table API provides methods to apply relational operations on DataSets and DataStreams, in both Scala and Java.
......
......@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
* @param path The path to the CSV file.
* @param fieldNames The names of the table fields.
* @param fieldTypes The types of the table fields.
- * @param fieldDelim The field delimiter, ',' by default.
- * @param rowDelim The row delimiter, '\n' by default.
- * @param quoteCharacter An optional quote character for String values, disabled by default.
+ * @param fieldDelim The field delimiter, "," by default.
+ * @param rowDelim The row delimiter, "\n" by default.
+ * @param quoteCharacter An optional quote character for String values, null by default.
  * @param ignoreFirstLine Flag to ignore the first line, false by default.
- * @param ignoreComments An optional prefix to indicate comments, disabled by default.
+ * @param ignoreComments An optional prefix to indicate comments, null by default.
 * @param lenient Flag to skip records with parse errors instead of failing, false by default.
*/
class CsvTableSource(
......@@ -55,6 +55,18 @@ class CsvTableSource(
extends BatchTableSource[Row]
with StreamTableSource[Row] {
/**
* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
* (logically) unlimited number of fields.
*
* @param path The path to the CSV file.
* @param fieldNames The names of the table fields.
* @param fieldTypes The types of the table fields.
*/
def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
if (fieldNames.length != fieldTypes.length) {
throw TableException("Number of field names and field types must be equal.")
}
......
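
For reference, the auxiliary constructor added above is what provides the "Java accessibility" from the commit title: Scala default arguments are awkward to call from Java, so a plain three-argument constructor is exposed. A minimal sketch of using it from Java (the field names and types are illustrative):

```java
// relies on the defaults: "," field delimiter, "\n" row delimiter,
// no quote character, no comment prefix, lenient = false
CsvTableSource minimalSource = new CsvTableSource(
  "/path/to/your/file.csv",
  new String[] { "name", "id" },
  new TypeInformation<?>[] { Types.STRING(), Types.INT() });
```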