Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
5930622e
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5930622e
编写于
3月 02, 2016
作者:
A
Andrea Sella
提交者:
Robert Metzger
6月 16, 2016
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-3311] Add At-Least-Once Cassandra connector
上级
a28a2d09
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
949 addition
and
10 deletion
+949
-10
docs/apis/streaming/connectors/cassandra.md
docs/apis/streaming/connectors/cassandra.md
+42
-0
docs/apis/streaming/fault_tolerance.md
docs/apis/streaming/fault_tolerance.md
+2
-2
flink-streaming-connectors/flink-connector-cassandra/pom.xml
flink-streaming-connectors/flink-connector-cassandra/pom.xml
+12
-0
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
...link/batch/connectors/cassandra/CassandraInputFormat.java
+129
-0
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
...ink/batch/connectors/cassandra/CassandraOutputFormat.java
+123
-0
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
...eaming/connectors/cassandra/CassandraAtLeastOnceSink.java
+91
-0
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
...ng/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
+61
-0
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
...e/flink/streaming/connectors/cassandra/CassandraSink.java
+19
-2
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
...g/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
+59
-0
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
...link/batch/connectors/cassandra/example/BatchExample.java
+71
-0
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
...treaming/connectors/cassandra/CassandraConnectorTest.java
+103
-6
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
...org/apache/flink/streaming/connectors/cassandra/Pojo.java
+65
-0
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
...assandra/example/CassandraPojoAtLeastOnceSinkExample.java
+57
-0
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
...ssandra/example/CassandraTupleAtLeastOnceSinkExample.java
+59
-0
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
...flink/streaming/connectors/cassandra/example/Message.java
+56
-0
未找到文件。
docs/apis/streaming/connectors/cassandra.md
浏览文件 @
5930622e
...
...
@@ -99,5 +99,47 @@ CassandraSink.addSink(input)
})
.build();
{% endhighlight %}
The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
Flink automatically detects which type of input is used.
Example for such a Pojo:
<div
class=
"codetabs"
markdown=
"1"
>
<div
data-lang=
"java"
markdown=
"1"
>
{% highlight java %}
@Table(keyspace= "test", name = "mappersink")
public class Pojo implements Serializable {
private static final long serialVersionUID = 1038054554690916991L;
@Column(name = "id")
private long id;
@Column(name = "value")
private String value;
public Pojo(long id, String value){
this.id = id;
this.value = value;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
{% endhighlight %}
</div>
</div>
docs/apis/streaming/fault_tolerance.md
浏览文件 @
5930622e
...
...
@@ -178,8 +178,8 @@ state updates) of Flink coupled with bundled sinks:
</tr>
<tr>
<td>
Cassandra sink
</td>
<td>
exactly-once
</td>
<td>
only for idempotent updates
</td>
<td>
at-least-once /
exactly-once
</td>
<td>
exactly-once
only for idempotent updates
</td>
</tr>
<tr>
<td>
File sinks
</td>
...
...
flink-streaming-connectors/flink-connector-cassandra/pom.xml
浏览文件 @
5930622e
...
...
@@ -143,6 +143,18 @@ under the License.
<scope>
test
</scope>
<type>
test-jar
</type>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-tests_2.10
</artifactId>
<version>
${project.version}
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-test-utils_2.10
</artifactId>
<version>
${project.version}
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
org.apache.cassandra
</groupId>
<artifactId>
cassandra-all
</artifactId>
...
...
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.batch.connectors.cassandra
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.ResultSet
;
import
com.datastax.driver.core.Row
;
import
com.datastax.driver.core.Session
;
import
com.google.common.base.Strings
;
import
org.apache.flink.api.common.io.DefaultInputSplitAssigner
;
import
org.apache.flink.api.common.io.NonParallelInput
;
import
org.apache.flink.api.common.io.RichInputFormat
;
import
org.apache.flink.api.common.io.statistics.BaseStatistics
;
import
org.apache.flink.api.java.tuple.Tuple
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.core.io.GenericInputSplit
;
import
org.apache.flink.core.io.InputSplit
;
import
org.apache.flink.core.io.InputSplitAssigner
;
import
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
/**
* InputFormat to read data from Apache Cassandra and generate ${@link Tuple}.
*
* @param <OUT> type of Tuple
*/
public
class
CassandraInputFormat
<
OUT
extends
Tuple
>
extends
RichInputFormat
<
OUT
,
InputSplit
>
implements
NonParallelInput
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
CassandraInputFormat
.
class
);
private
final
String
query
;
private
final
ClusterBuilder
builder
;
private
transient
Cluster
cluster
;
private
transient
Session
session
;
private
transient
ResultSet
resultSet
;
public
CassandraInputFormat
(
String
query
,
ClusterBuilder
builder
)
{
if
(
Strings
.
isNullOrEmpty
(
query
))
{
throw
new
IllegalArgumentException
(
"Query cannot be null or empty"
);
}
if
(
builder
==
null
)
{
throw
new
IllegalArgumentException
(
"Builder cannot be null."
);
}
this
.
query
=
query
;
this
.
builder
=
builder
;
}
@Override
public
void
configure
(
Configuration
parameters
)
{
this
.
cluster
=
builder
.
getCluster
();
}
@Override
public
BaseStatistics
getStatistics
(
BaseStatistics
cachedStatistics
)
throws
IOException
{
return
cachedStatistics
;
}
/**
* Opens a Session and executes the query.
*
* @param ignored
* @throws IOException
*/
@Override
public
void
open
(
InputSplit
ignored
)
throws
IOException
{
this
.
session
=
cluster
.
connect
();
this
.
resultSet
=
session
.
execute
(
query
);
}
@Override
public
boolean
reachedEnd
()
throws
IOException
{
return
resultSet
.
isExhausted
();
}
@Override
public
OUT
nextRecord
(
OUT
reuse
)
throws
IOException
{
final
Row
item
=
resultSet
.
one
();
for
(
int
i
=
0
;
i
<
reuse
.
getArity
();
i
++)
{
reuse
.
setField
(
item
.
getObject
(
i
),
i
);
}
return
reuse
;
}
@Override
public
InputSplit
[]
createInputSplits
(
int
minNumSplits
)
throws
IOException
{
GenericInputSplit
[]
split
=
{
new
GenericInputSplit
(
0
,
1
)};
return
split
;
}
@Override
public
InputSplitAssigner
getInputSplitAssigner
(
InputSplit
[]
inputSplits
)
{
return
new
DefaultInputSplitAssigner
(
inputSplits
);
}
/**
* Closes all resources used.
*/
@Override
public
void
close
()
throws
IOException
{
try
{
session
.
close
();
}
catch
(
Exception
e
)
{
LOG
.
info
(
"Inputformat couldn't be closed - "
+
e
.
getMessage
());
}
try
{
cluster
.
close
();
}
catch
(
Exception
e
)
{
LOG
.
info
(
"Inputformat couldn't be closed - "
+
e
.
getMessage
());
}
}
}
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.batch.connectors.cassandra
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.PreparedStatement
;
import
com.datastax.driver.core.ResultSet
;
import
com.datastax.driver.core.ResultSetFuture
;
import
com.datastax.driver.core.Session
;
import
com.google.common.base.Strings
;
import
com.google.common.util.concurrent.FutureCallback
;
import
com.google.common.util.concurrent.Futures
;
import
org.apache.flink.api.common.io.RichOutputFormat
;
import
org.apache.flink.api.java.tuple.Tuple
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
/**
* OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra.
*
* @param <OUT> type of Tuple
*/
public
class
CassandraOutputFormat
<
OUT
extends
Tuple
>
extends
RichOutputFormat
<
OUT
>
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
CassandraOutputFormat
.
class
);
private
final
String
insertQuery
;
private
final
ClusterBuilder
builder
;
private
transient
Cluster
cluster
;
private
transient
Session
session
;
private
transient
PreparedStatement
prepared
;
private
transient
FutureCallback
<
ResultSet
>
callback
;
private
transient
Throwable
exception
=
null
;
public
CassandraOutputFormat
(
String
insertQuery
,
ClusterBuilder
builder
)
{
if
(
Strings
.
isNullOrEmpty
(
insertQuery
))
{
throw
new
IllegalArgumentException
(
"insertQuery cannot be null or empty"
);
}
if
(
builder
==
null
)
{
throw
new
IllegalArgumentException
(
"Builder cannot be null."
);
}
this
.
insertQuery
=
insertQuery
;
this
.
builder
=
builder
;
}
@Override
public
void
configure
(
Configuration
parameters
)
{
this
.
cluster
=
builder
.
getCluster
();
}
/**
* Opens a Session to Cassandra and initializes the prepared statement.
*
* @param taskNumber The number of the parallel instance.
* @throws IOException Thrown, if the output could not be opened due to an
* I/O problem.
*/
@Override
public
void
open
(
int
taskNumber
,
int
numTasks
)
throws
IOException
{
this
.
session
=
cluster
.
connect
();
this
.
prepared
=
session
.
prepare
(
insertQuery
);
this
.
callback
=
new
FutureCallback
<
ResultSet
>()
{
@Override
public
void
onSuccess
(
ResultSet
ignored
)
{
}
@Override
public
void
onFailure
(
Throwable
t
)
{
exception
=
t
;
}
};
}
@Override
public
void
writeRecord
(
OUT
record
)
throws
IOException
{
if
(
exception
!=
null
)
{
throw
new
IOException
(
"write record failed"
,
exception
);
}
Object
[]
fields
=
new
Object
[
record
.
getArity
()];
for
(
int
i
=
0
;
i
<
record
.
getArity
();
i
++)
{
fields
[
i
]
=
record
.
getField
(
i
);
}
ResultSetFuture
result
=
session
.
executeAsync
(
prepared
.
bind
(
fields
));
Futures
.
addCallback
(
result
,
callback
);
}
/**
* Closes all resources used.
*/
@Override
public
void
close
()
throws
IOException
{
try
{
session
.
close
();
}
catch
(
Exception
e
)
{
LOG
.
info
(
"Inputformat couldn't be closed - "
+
e
.
getMessage
());
}
try
{
cluster
.
close
();
}
catch
(
Exception
e
)
{
LOG
.
info
(
"Inputformat couldn't be closed - "
+
e
.
getMessage
());
}
}
}
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAtLeastOnceSink.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.Session
;
import
com.google.common.util.concurrent.FutureCallback
;
import
com.google.common.util.concurrent.Futures
;
import
com.google.common.util.concurrent.ListenableFuture
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
/**
* CassandraAtLeastOnceSink is the common abstract class of {@link CassandraPojoAtLeastOnceSink} and {@link CassandraTupleAtLeastOnceSink}.
*
* @param <IN> Type of the elements emitted by this sink
*/
public
abstract
class
CassandraAtLeastOnceSink
<
IN
,
V
>
extends
RichSinkFunction
<
IN
>
{
protected
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
CassandraAtLeastOnceSink
.
class
);
protected
transient
Cluster
cluster
;
protected
transient
Session
session
;
protected
transient
Throwable
exception
=
null
;
protected
transient
FutureCallback
<
V
>
callback
;
private
final
ClusterBuilder
builder
;
protected
CassandraAtLeastOnceSink
(
ClusterBuilder
builder
)
{
this
.
builder
=
builder
;
}
@Override
public
void
open
(
Configuration
configuration
)
{
this
.
callback
=
new
FutureCallback
<
V
>()
{
@Override
public
void
onSuccess
(
V
ignored
)
{
}
@Override
public
void
onFailure
(
Throwable
t
)
{
exception
=
t
;
}
};
this
.
cluster
=
builder
.
getCluster
();
this
.
session
=
cluster
.
connect
();
}
@Override
public
void
invoke
(
IN
value
)
throws
Exception
{
if
(
exception
!=
null
)
{
throw
new
IOException
(
"invoke() failed"
,
exception
);
}
ListenableFuture
<
V
>
result
=
send
(
value
);
Futures
.
addCallback
(
result
,
callback
);
}
public
abstract
ListenableFuture
<
V
>
send
(
IN
value
);
@Override
public
void
close
()
{
try
{
session
.
close
();
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Error while closing session."
,
e
);
}
try
{
cluster
.
close
();
}
catch
(
Exception
e
)
{
LOG
.
error
(
"Error while closing cluster."
,
e
);
}
}
}
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoAtLeastOnceSink.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra
;
import
com.datastax.driver.mapping.Mapper
;
import
com.datastax.driver.mapping.MappingManager
;
import
com.google.common.util.concurrent.ListenableFuture
;
import
org.apache.flink.configuration.Configuration
;
/**
* Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
* it uses annotations from {@link com.datastax.driver.mapping}.
*
* @param <IN> Type of the elements emitted by this sink
*/
public
class
CassandraPojoAtLeastOnceSink
<
IN
>
extends
CassandraAtLeastOnceSink
<
IN
,
Void
>
{
protected
Class
<
IN
>
clazz
;
protected
transient
Mapper
<
IN
>
mapper
;
protected
transient
MappingManager
mappingManager
;
/**
* The main constructor for creating CassandraPojoAtLeastOnceSink
*
* @param clazz Class<IN> instance
*/
public
CassandraPojoAtLeastOnceSink
(
Class
<
IN
>
clazz
,
ClusterBuilder
builder
)
{
super
(
builder
);
this
.
clazz
=
clazz
;
}
@Override
public
void
open
(
Configuration
configuration
)
{
super
.
open
(
configuration
);
try
{
this
.
mappingManager
=
new
MappingManager
(
session
);
this
.
mapper
=
mappingManager
.
mapper
(
clazz
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Cannot create CassandraPojoAtLeastOnceSink with input: "
+
clazz
.
getSimpleName
(),
e
);
}
}
@Override
public
ListenableFuture
<
Void
>
send
(
IN
value
)
{
return
mapper
.
saveAsync
(
value
);
}
}
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
浏览文件 @
5930622e
...
...
@@ -166,7 +166,7 @@ public class CassandraSink<IN> {
DataStream
<
T
>
tupleInput
=
(
DataStream
<
T
>)
input
;
return
(
CassandraSinkBuilder
<
IN
>)
new
CassandraTupleSinkBuilder
<>(
tupleInput
,
tupleInput
.
getType
(),
tupleInput
.
getType
().
createSerializer
(
tupleInput
.
getExecutionEnvironment
().
getConfig
()));
}
else
{
throw
new
IllegalArgumentException
(
"POJOs are currently not supported."
);
return
new
CassandraPojoSinkBuilder
<>(
input
,
input
.
getType
(),
input
.
getType
().
createSerializer
(
input
.
getExecutionEnvironment
().
getConfig
())
);
}
}
...
...
@@ -275,8 +275,25 @@ public class CassandraSink<IN> {
?
new
CassandraSink
<>(
input
.
transform
(
"Cassandra Sink"
,
null
,
new
CassandraIdempotentExactlyOnceSink
<>(
query
,
serializer
,
builder
,
jobID
,
new
CassandraCommitter
(
builder
))))
:
new
CassandraSink
<>(
input
.
transform
(
"Cassandra Sink"
,
null
,
new
CassandraIdempotentExactlyOnceSink
<>(
query
,
serializer
,
builder
,
jobID
,
committer
)));
}
else
{
throw
new
IllegalArgumentException
(
"There is currently no dedicated support for at-least-once guarantees."
);
return
new
CassandraSink
<>(
input
.
addSink
(
new
CassandraTupleAtLeastOnceSink
<
IN
>(
query
,
builder
)).
name
(
"Cassandra Sink"
)
);
}
}
}
public
static
class
CassandraPojoSinkBuilder
<
IN
>
extends
CassandraSinkBuilder
<
IN
>
{
public
CassandraPojoSinkBuilder
(
DataStream
<
IN
>
input
,
TypeInformation
<
IN
>
typeInfo
,
TypeSerializer
<
IN
>
serializer
)
{
super
(
input
,
typeInfo
,
serializer
);
}
@Override
public
CassandraSink
<
IN
>
build
()
throws
Exception
{
if
(
consistency
==
ConsistencyLevel
.
EXACTLY_ONCE
)
{
throw
new
IllegalArgumentException
(
"Exactly-once guarantees can only be provided for tuple types."
);
}
if
(
consistency
==
ConsistencyLevel
.
At_LEAST_ONCE
)
{
return
new
CassandraSink
<>(
input
.
addSink
(
new
CassandraPojoAtLeastOnceSink
<>(
typeInfo
.
getTypeClass
(),
builder
)).
name
(
"Cassandra Sink"
));
}
throw
new
IllegalArgumentException
(
"No consistency level was specified."
);
}
}
}
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleAtLeastOnceSink.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra
;
import
com.datastax.driver.core.PreparedStatement
;
import
com.datastax.driver.core.ResultSet
;
import
com.google.common.util.concurrent.ListenableFuture
;
import
org.apache.flink.api.java.tuple.Tuple
;
import
org.apache.flink.configuration.Configuration
;
/**
* Flink Sink to save data into a Cassandra cluster.
*
* @param <IN> Type of the elements emitted by this sink, it must extend {@link Tuple}
*/
public
class
CassandraTupleAtLeastOnceSink
<
IN
extends
Tuple
>
extends
CassandraAtLeastOnceSink
<
IN
,
ResultSet
>
{
private
final
String
insertQuery
;
private
transient
PreparedStatement
ps
;
public
CassandraTupleAtLeastOnceSink
(
String
insertQuery
,
ClusterBuilder
builder
)
{
super
(
builder
);
this
.
insertQuery
=
insertQuery
;
}
@Override
public
void
open
(
Configuration
configuration
)
{
super
.
open
(
configuration
);
this
.
ps
=
session
.
prepare
(
insertQuery
);
}
@Override
public
ListenableFuture
<
ResultSet
>
send
(
IN
value
)
{
Object
[]
fields
=
extract
(
value
);
return
session
.
executeAsync
(
ps
.
bind
(
fields
));
}
private
Object
[]
extract
(
IN
record
)
{
Object
[]
al
=
new
Object
[
record
.
getArity
()];
for
(
int
i
=
0
;
i
<
record
.
getArity
();
i
++)
{
al
[
i
]
=
record
.
getField
(
i
);
}
return
al
;
}
}
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.batch.connectors.cassandra.example
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.Cluster.Builder
;
import
org.apache.flink.api.common.typeinfo.TypeHint
;
import
org.apache.flink.api.java.DataSet
;
import
org.apache.flink.api.java.ExecutionEnvironment
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.api.java.typeutils.TupleTypeInfo
;
import
org.apache.flink.batch.connectors.cassandra.CassandraInputFormat
;
import
org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat
;
import
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
;
import
java.util.ArrayList
;
public
class
BatchExample
{
private
static
final
String
INSERT_QUERY
=
"INSERT INTO test.batches (number, strings) VALUES (?,?);"
;
private
static
final
String
SELECT_QUERY
=
"SELECT number, strings FROM test.batches;"
;
/*
* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));"
*/
public
static
void
main
(
String
[]
args
)
throws
Exception
{
ExecutionEnvironment
env
=
ExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setParallelism
(
1
);
ArrayList
<
Tuple2
<
Integer
,
String
>>
collection
=
new
ArrayList
<>(
20
);
for
(
int
i
=
0
;
i
<
20
;
i
++)
{
collection
.
add
(
new
Tuple2
<>(
i
,
"string "
+
i
));
}
DataSet
<
Tuple2
<
Integer
,
String
>>
dataSet
=
env
.
fromCollection
(
collection
);
dataSet
.
output
(
new
CassandraOutputFormat
<
Tuple2
<
Integer
,
String
>>(
INSERT_QUERY
,
new
ClusterBuilder
()
{
@Override
protected
Cluster
buildCluster
(
Builder
builder
)
{
return
builder
.
addContactPoints
(
"127.0.0.1"
).
build
();
}
}));
env
.
execute
(
"Write"
);
DataSet
<
Tuple2
<
Integer
,
String
>>
inputDS
=
env
.
createInput
(
new
CassandraInputFormat
<
Tuple2
<
Integer
,
String
>>(
SELECT_QUERY
,
new
ClusterBuilder
()
{
@Override
protected
Cluster
buildCluster
(
Builder
builder
)
{
return
builder
.
addContactPoints
(
"127.0.0.1"
).
build
();
}
}),
TupleTypeInfo
.
of
(
new
TypeHint
<
Tuple2
<
Integer
,
String
>>()
{
}));
inputDS
.
print
();
}
}
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
浏览文件 @
5930622e
...
...
@@ -23,10 +23,20 @@ import com.datastax.driver.core.Row;
import
com.datastax.driver.core.Session
;
import
org.apache.cassandra.service.CassandraDaemon
;
import
org.apache.flink.api.common.ExecutionConfig
;
import
org.apache.flink.api.common.typeinfo.BasicTypeInfo
;
import
org.apache.flink.api.java.DataSet
;
import
org.apache.flink.api.java.ExecutionEnvironment
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.api.java.tuple.Tuple3
;
import
org.apache.flink.api.java.typeutils.TupleTypeInfo
;
import
org.apache.flink.api.java.typeutils.TypeExtractor
;
import
org.apache.flink.batch.connectors.cassandra.CassandraInputFormat
;
import
org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat
;
import
org.apache.flink.runtime.testutils.CommonTestUtils
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.functions.source.SourceFunction
;
import
org.apache.flink.streaming.runtime.operators.AtLeastOnceSinkTestBase
;
import
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask
;
import
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness
;
...
...
@@ -34,6 +44,7 @@ import org.junit.After;
import
org.junit.AfterClass
;
import
org.junit.Assert
;
import
org.junit.BeforeClass
;
import
org.junit.Test
;
import
java.io.BufferedWriter
;
import
java.io.File
;
...
...
@@ -48,6 +59,12 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
private
static
final
boolean
EMBEDDED
=
true
;
private
static
EmbeddedCassandraService
cassandra
;
private
transient
static
ClusterBuilder
builder
=
new
ClusterBuilder
()
{
@Override
protected
Cluster
buildCluster
(
Cluster
.
Builder
builder
)
{
return
builder
.
addContactPoint
(
"127.0.0.1"
).
build
();
}
};
private
static
Cluster
cluster
;
private
static
Session
session
;
...
...
@@ -58,6 +75,14 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
private
static
final
String
INSERT_DATA_QUERY
=
"INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"
;
private
static
final
String
SELECT_DATA_QUERY
=
"SELECT * FROM flink.test;"
;
private
static
final
ArrayList
<
Tuple3
<
String
,
Integer
,
Integer
>>
collection
=
new
ArrayList
<>(
20
);
static
{
for
(
int
i
=
0
;
i
<
20
;
i
++)
{
collection
.
add
(
new
Tuple3
<>(
""
+
UUID
.
randomUUID
(),
i
,
0
));
}
}
private
static
class
EmbeddedCassandraService
{
CassandraDaemon
cassandraDaemon
;
...
...
@@ -72,6 +97,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
}
}
//=====Setup========================================================================================================
@BeforeClass
public
static
void
startCassandra
()
throws
IOException
{
//generate temporary files
...
...
@@ -102,6 +128,11 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
cassandra
.
start
();
}
try
{
Thread
.
sleep
(
1000
*
10
);
}
catch
(
InterruptedException
e
)
{
//give cassandra a few seconds to start up
}
cluster
=
Cluster
.
builder
().
addContactPoint
(
"127.0.0.1"
).
build
();
session
=
cluster
.
connect
();
...
...
@@ -125,6 +156,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
tmpDir
.
delete
();
}
//=====Exactly-Once=================================================================================================
@Override
protected
CassandraIdempotentExactlyOnceSink
<
Tuple3
<
String
,
Integer
,
Integer
>>
createSink
()
{
try
{
...
...
@@ -138,12 +170,7 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
}
},
"testJob"
,
new
CassandraCommitter
(
new
ClusterBuilder
()
{
@Override
protected
Cluster
buildCluster
(
Cluster
.
Builder
builder
)
{
return
builder
.
addContactPoint
(
"127.0.0.1"
).
build
();
}
}));
new
CassandraCommitter
(
builder
));
}
catch
(
Exception
e
)
{
Assert
.
fail
(
"Failure while initializing sink: "
+
e
.
getMessage
());
return
null
;
...
...
@@ -216,4 +243,74 @@ public class CassandraConnectorTest extends AtLeastOnceSinkTestBase<Tuple3<Strin
}
Assert
.
assertTrue
(
"The following ID's were not found in the ResultSet: "
+
list
.
toString
(),
list
.
isEmpty
());
}
//=====At-Least-Once================================================================================================
@Test
public
void
testCassandraTupleAtLeastOnceSink
()
throws
Exception
{
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setParallelism
(
1
);
DataStream
<
Tuple3
<
String
,
Integer
,
Integer
>>
source
=
env
.
fromCollection
(
collection
);
source
.
addSink
(
new
CassandraTupleAtLeastOnceSink
<
Tuple3
<
String
,
Integer
,
Integer
>>(
INSERT_DATA_QUERY
,
builder
));
env
.
execute
();
ResultSet
rs
=
session
.
execute
(
SELECT_DATA_QUERY
);
Assert
.
assertEquals
(
20
,
rs
.
all
().
size
());
}
@Test
public
void
testCassandraPojoAtLeastOnceSink
()
throws
Exception
{
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setParallelism
(
1
);
DataStreamSource
<
Pojo
>
source
=
env
.
addSource
(
new
SourceFunction
<
Pojo
>()
{
private
boolean
running
=
true
;
private
volatile
int
cnt
=
0
;
@Override
public
void
run
(
SourceContext
<
Pojo
>
ctx
)
throws
Exception
{
while
(
running
)
{
ctx
.
collect
(
new
Pojo
(
UUID
.
randomUUID
().
toString
(),
cnt
,
0
));
cnt
++;
if
(
cnt
==
20
)
{
cancel
();
}
}
}
@Override
public
void
cancel
()
{
running
=
false
;
}
});
source
.
addSink
(
new
CassandraPojoAtLeastOnceSink
<>(
Pojo
.
class
,
builder
));
env
.
execute
();
ResultSet
rs
=
session
.
execute
(
SELECT_DATA_QUERY
);
Assert
.
assertEquals
(
20
,
rs
.
all
().
size
());
}
@Test
public
void
testCassandraBatchFormats
()
throws
Exception
{
ExecutionEnvironment
env
=
ExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setParallelism
(
1
);
DataSet
<
Tuple3
<
String
,
Integer
,
Integer
>>
dataSet
=
env
.
fromCollection
(
collection
);
dataSet
.
output
(
new
CassandraOutputFormat
<
Tuple3
<
String
,
Integer
,
Integer
>>(
INSERT_DATA_QUERY
,
builder
));
env
.
execute
(
"Write data"
);
DataSet
<
Tuple3
<
String
,
Integer
,
Integer
>>
inputDS
=
env
.
createInput
(
new
CassandraInputFormat
<
Tuple3
<
String
,
Integer
,
Integer
>>(
SELECT_DATA_QUERY
,
builder
),
new
TupleTypeInfo
(
Tuple3
.
class
,
BasicTypeInfo
.
STRING_TYPE_INFO
,
BasicTypeInfo
.
INT_TYPE_INFO
,
BasicTypeInfo
.
INT_TYPE_INFO
));
long
count
=
inputDS
.
count
();
Assert
.
assertEquals
(
count
,
20L
);
}
}
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/Pojo.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra
;
import
com.datastax.driver.mapping.annotations.Column
;
import
com.datastax.driver.mapping.annotations.Table
;
import
java.io.Serializable
;
@Table
(
keyspace
=
"flink"
,
name
=
"test"
)
public
class
Pojo
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1038054554690916991L
;
@Column
(
name
=
"id"
)
private
String
id
;
@Column
(
name
=
"counter"
)
private
int
counter
;
@Column
(
name
=
"batch_id"
)
private
int
batch_id
;
public
Pojo
(
String
id
,
int
counter
,
int
batch_id
)
{
this
.
id
=
id
;
this
.
counter
=
counter
;
this
.
batch_id
=
batch_id
;
}
public
String
getId
()
{
return
id
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
int
getCounter
()
{
return
counter
;
}
public
void
setCounter
(
int
counter
)
{
this
.
counter
=
counter
;
}
public
int
getBatch_id
()
{
return
batch_id
;
}
public
void
setBatch_id
(
int
batch_id
)
{
this
.
batch_id
=
batch_id
;
}
}
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoAtLeastOnceSinkExample.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra.example
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.Cluster.Builder
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.cassandra.CassandraSink
;
import
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
;
import
java.util.ArrayList
;
public
class
CassandraPojoAtLeastOnceSinkExample
{
private
static
final
ArrayList
<
Message
>
messages
=
new
ArrayList
<>(
20
);
static
{
for
(
long
i
=
0
;
i
<
20
;
i
++)
{
messages
.
add
(
new
Message
(
"cassandra-"
+
i
));
}
}
/*
* create table: "CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY);"
*/
public
static
void
main
(
String
[]
args
)
throws
Exception
{
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
DataStreamSource
<
Message
>
source
=
env
.
fromCollection
(
messages
);
CassandraSink
.
addSink
(
source
)
.
setClusterBuilder
(
new
ClusterBuilder
()
{
@Override
protected
Cluster
buildCluster
(
Builder
builder
)
{
return
builder
.
addContactPoint
(
"127.0.0.1"
).
build
();
}
})
.
build
();
env
.
execute
(
"Cassandra Sink example"
);
}
}
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleAtLeastOnceSinkExample.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra.example
;
import
com.datastax.driver.core.Cluster
;
import
com.datastax.driver.core.Cluster.Builder
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.streaming.api.datastream.DataStreamSource
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.cassandra.CassandraSink
;
import
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
;
import
java.util.ArrayList
;
public
class
CassandraTupleAtLeastOnceSinkExample
{
private
static
final
String
INSERT
=
"INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"
;
private
static
final
ArrayList
<
Tuple2
<
String
,
Integer
>>
collection
=
new
ArrayList
<>(
20
);
static
{
for
(
int
i
=
0
;
i
<
20
;
i
++)
{
collection
.
add
(
new
Tuple2
<>(
"cassandra-"
+
i
,
i
));
}
}
/*
* table script: "CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int)"
*/
public
static
void
main
(
String
[]
args
)
throws
Exception
{
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
DataStreamSource
<
Tuple2
<
String
,
Integer
>>
source
=
env
.
fromCollection
(
collection
);
CassandraSink
.
addSink
(
source
)
.
setQuery
(
INSERT
)
.
setClusterBuilder
(
new
ClusterBuilder
()
{
@Override
protected
Cluster
buildCluster
(
Builder
builder
)
{
return
builder
.
addContactPoint
(
"127.0.0.1"
).
build
();
}
})
.
build
();
env
.
execute
(
"WriteTupleIntoCassandra"
);
}
}
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/Message.java
0 → 100644
浏览文件 @
5930622e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.connectors.cassandra.example
;
import
com.datastax.driver.mapping.annotations.Column
;
import
com.datastax.driver.mapping.annotations.Table
;
import
java.io.Serializable
;
@Table
(
keyspace
=
"test"
,
name
=
"message"
)
public
class
Message
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1123119384361005680L
;
@Column
(
name
=
"body"
)
private
String
message
;
public
Message
(
String
word
)
{
this
.
message
=
word
;
}
public
String
getMessage
()
{
return
message
;
}
public
void
setMessage
(
String
word
)
{
this
.
message
=
word
;
}
public
boolean
equals
(
Object
other
)
{
if
(
other
instanceof
Message
)
{
Message
that
=
(
Message
)
other
;
return
this
.
message
.
equals
(
that
.
message
);
}
return
false
;
}
@Override
public
int
hashCode
()
{
return
message
.
hashCode
();
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录