Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
50301254
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
50301254
编写于
12月 18, 2017
作者:
N
Nico Kruber
提交者:
zentol
1月 08, 2018
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-8280][checkstyle] enable and fix checkstyle in BlobServer and BlobUtils
This closes #5175.
上级
3cdc5d1d
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
76 addition
and
78 deletion
+76
-78
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
...c/main/java/org/apache/flink/runtime/blob/BlobClient.java
+1
-1
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
...n/java/org/apache/flink/runtime/blob/BlobInputStream.java
+3
-3
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
.../src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+6
-6
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
...c/main/java/org/apache/flink/runtime/blob/BlobServer.java
+23
-23
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
...ava/org/apache/flink/runtime/blob/BlobServerProtocol.java
+5
-0
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
...rc/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+6
-6
flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
...va/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+4
-4
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
.../java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+1
-1
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
...java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+20
-20
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
...st/java/org/apache/flink/runtime/blob/BlobClientTest.java
+6
-6
flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
...g/apache/flink/runtime/blob/TestingFailingBlobServer.java
+1
-1
tools/maven/suppressions-runtime.xml
tools/maven/suppressions-runtime.xml
+0
-7
未找到文件。
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
浏览文件 @
50301254
...
@@ -119,7 +119,7 @@ public final class BlobClient implements Closeable {
...
@@ -119,7 +119,7 @@ public final class BlobClient implements Closeable {
}
}
}
}
catch
(
Exception
e
)
{
catch
(
Exception
e
)
{
BlobUtils
.
closeSilently
(
socket
,
LOG
);
BlobUtils
.
closeSilently
(
socket
,
LOG
);
throw
new
IOException
(
"Could not connect to BlobServer at address "
+
serverAddress
,
e
);
throw
new
IOException
(
"Could not connect to BlobServer at address "
+
serverAddress
,
e
);
}
}
...
...
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
浏览文件 @
50301254
...
@@ -50,7 +50,7 @@ final class BlobInputStream extends InputStream {
...
@@ -50,7 +50,7 @@ final class BlobInputStream extends InputStream {
private
final
OutputStream
wrappedOutputStream
;
private
final
OutputStream
wrappedOutputStream
;
/**
/**
* The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise <code>null<code>.
* The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise <code>null<
/
code>.
*/
*/
private
final
BlobKey
blobKey
;
private
final
BlobKey
blobKey
;
...
@@ -72,7 +72,7 @@ final class BlobInputStream extends InputStream {
...
@@ -72,7 +72,7 @@ final class BlobInputStream extends InputStream {
/**
/**
* Constructs a new BLOB input stream.
* Constructs a new BLOB input stream.
*
*
* @param wrappedInputStream
* @param wrappedInputStream
* the underlying input stream to read from
* the underlying input stream to read from
* @param blobKey
* @param blobKey
...
@@ -98,7 +98,7 @@ final class BlobInputStream extends InputStream {
...
@@ -98,7 +98,7 @@ final class BlobInputStream extends InputStream {
/**
/**
* Convenience method to throw an {@link EOFException}.
* Convenience method to throw an {@link EOFException}.
*
*
* @throws EOFException
* @throws EOFException
* thrown to indicate the underlying input stream did not provide as much data as expected
* thrown to indicate the underlying input stream did not provide as much data as expected
*/
*/
...
...
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
浏览文件 @
50301254
...
@@ -199,14 +199,14 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
...
@@ -199,14 +199,14 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
/**
/**
* Adds the BLOB key to the given {@link MessageDigest}.
* Adds the BLOB key to the given {@link MessageDigest}.
*
*
* @param md
* @param md
* the message digest to add the BLOB key to
* the message digest to add the BLOB key to
*/
*/
public
void
addToMessageDigest
(
MessageDigest
md
)
{
public
void
addToMessageDigest
(
MessageDigest
md
)
{
md
.
update
(
this
.
key
);
md
.
update
(
this
.
key
);
}
}
@Override
@Override
public
boolean
equals
(
final
Object
obj
)
{
public
boolean
equals
(
final
Object
obj
)
{
...
@@ -252,7 +252,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
...
@@ -252,7 +252,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
final
byte
[]
aarr
=
this
.
key
;
final
byte
[]
aarr
=
this
.
key
;
final
byte
[]
barr
=
o
.
key
;
final
byte
[]
barr
=
o
.
key
;
final
int
len
=
Math
.
min
(
aarr
.
length
,
barr
.
length
);
final
int
len
=
Math
.
min
(
aarr
.
length
,
barr
.
length
);
for
(
int
i
=
0
;
i
<
len
;
++
i
)
{
for
(
int
i
=
0
;
i
<
len
;
++
i
)
{
final
int
a
=
(
aarr
[
i
]
&
0xff
);
final
int
a
=
(
aarr
[
i
]
&
0xff
);
final
int
b
=
(
barr
[
i
]
&
0xff
);
final
int
b
=
(
barr
[
i
]
&
0xff
);
...
@@ -274,12 +274,12 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
...
@@ -274,12 +274,12 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
return
aarr
.
length
-
barr
.
length
;
return
aarr
.
length
-
barr
.
length
;
}
}
}
}
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
/**
/**
* Auxiliary method to read a BLOB key from an input stream.
* Auxiliary method to read a BLOB key from an input stream.
*
*
* @param inputStream
* @param inputStream
* the input stream to read the BLOB key from
* the input stream to read the BLOB key from
* @return the read BLOB key
* @return the read BLOB key
...
@@ -331,7 +331,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
...
@@ -331,7 +331,7 @@ abstract class BlobKey implements Serializable, Comparable<BlobKey> {
/**
/**
* Auxiliary method to write this BLOB key to an output stream.
* Auxiliary method to write this BLOB key to an output stream.
*
*
* @param outputStream
* @param outputStream
* the output stream to write the BLOB key to
* the output stream to write the BLOB key to
* @throws IOException
* @throws IOException
...
...
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
浏览文件 @
50301254
...
@@ -77,28 +77,28 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -77,28 +77,28 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
/** The server socket listening for incoming connections. */
/** The server socket listening for incoming connections. */
private
final
ServerSocket
serverSocket
;
private
final
ServerSocket
serverSocket
;
/** The SSL server context if ssl is enabled for the connections */
/** The SSL server context if ssl is enabled for the connections
.
*/
private
final
SSLContext
serverSSLContext
;
private
final
SSLContext
serverSSLContext
;
/** Blob Server configuration */
/** Blob Server configuration
.
*/
private
final
Configuration
blobServiceConfiguration
;
private
final
Configuration
blobServiceConfiguration
;
/** Indicates whether a shutdown of server component has been requested. */
/** Indicates whether a shutdown of server component has been requested. */
private
final
AtomicBoolean
shutdownRequested
=
new
AtomicBoolean
();
private
final
AtomicBoolean
shutdownRequested
=
new
AtomicBoolean
();
/** Root directory for local file storage */
/** Root directory for local file storage
.
*/
private
final
File
storageDir
;
private
final
File
storageDir
;
/** Blob store for distributed file storage, e.g. in HA */
/** Blob store for distributed file storage, e.g. in HA
.
*/
private
final
BlobStore
blobStore
;
private
final
BlobStore
blobStore
;
/** Set of currently running threads */
/** Set of currently running threads
.
*/
private
final
Set
<
BlobServerConnection
>
activeConnections
=
new
HashSet
<>();
private
final
Set
<
BlobServerConnection
>
activeConnections
=
new
HashSet
<>();
/** The maximum number of concurrent connections */
/** The maximum number of concurrent connections
.
*/
private
final
int
maxConnections
;
private
final
int
maxConnections
;
/** Lock guarding concurrent file accesses */
/** Lock guarding concurrent file accesses
.
*/
private
final
ReadWriteLock
readWriteLock
;
private
final
ReadWriteLock
readWriteLock
;
/**
/**
...
@@ -201,8 +201,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -201,8 +201,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
}
}
});
});
if
(
socketAttempt
==
null
)
{
if
(
socketAttempt
==
null
)
{
throw
new
IOException
(
"Unable to allocate socket for blob server in specified port range: "
+
serverPortRange
);
throw
new
IOException
(
"Unable to allocate socket for blob server in specified port range: "
+
serverPortRange
);
}
else
{
}
else
{
SSLUtils
.
setSSLVerAndCipherSuites
(
socketAttempt
,
config
);
SSLUtils
.
setSSLVerAndCipherSuites
(
socketAttempt
,
config
);
this
.
serverSocket
=
socketAttempt
;
this
.
serverSocket
=
socketAttempt
;
...
@@ -254,7 +254,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -254,7 +254,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
}
}
/**
/**
* Returns the lock used to guard file accesses
* Returns the lock used to guard file accesses
.
*/
*/
ReadWriteLock
getReadWriteLock
()
{
ReadWriteLock
getReadWriteLock
()
{
return
readWriteLock
;
return
readWriteLock
;
...
@@ -360,7 +360,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -360,7 +360,7 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
}
}
}
}
if
(
LOG
.
isInfoEnabled
())
{
if
(
LOG
.
isInfoEnabled
())
{
LOG
.
info
(
"Stopped BLOB server at {}:{}"
,
serverSocket
.
getInetAddress
().
getHostAddress
(),
getPort
());
LOG
.
info
(
"Stopped BLOB server at {}:{}"
,
serverSocket
.
getInetAddress
().
getHostAddress
(),
getPort
());
}
}
...
@@ -375,8 +375,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -375,8 +375,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
/**
/**
* Retrieves the local path of a (job-unrelated) file associated with a job and a blob key.
* Retrieves the local path of a (job-unrelated) file associated with a job and a blob key.
*
<p>
*
* The blob server looks the blob key up in its local storage. If the file exists, it is
*
<p>
The blob server looks the blob key up in its local storage. If the file exists, it is
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* or a {@link FileNotFoundException} is thrown.
* or a {@link FileNotFoundException} is thrown.
*
*
...
@@ -395,8 +395,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -395,8 +395,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
/**
/**
* Retrieves the local path of a file associated with a job and a blob key.
* Retrieves the local path of a file associated with a job and a blob key.
*
<p>
*
* The blob server looks the blob key up in its local storage. If the file exists, it is
*
<p>
The blob server looks the blob key up in its local storage. If the file exists, it is
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* or a {@link FileNotFoundException} is thrown.
* or a {@link FileNotFoundException} is thrown.
*
*
...
@@ -419,8 +419,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -419,8 +419,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
/**
/**
* Returns the path to a local copy of the file associated with the provided job ID and blob
* Returns the path to a local copy of the file associated with the provided job ID and blob
* key.
* key.
*
<p>
*
* We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
*
<p>
We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
* there, we will try to download it from the HA store.
* there, we will try to download it from the HA store.
*
*
* @param jobId
* @param jobId
...
@@ -443,8 +443,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -443,8 +443,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
/**
/**
* Retrieves the local path of a file associated with a job and a blob key.
* Retrieves the local path of a file associated with a job and a blob key.
*
<p>
*
* The blob server looks the blob key up in its local storage. If the file exists, it is
*
<p>
The blob server looks the blob key up in its local storage. If the file exists, it is
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* or a {@link FileNotFoundException} is thrown.
* or a {@link FileNotFoundException} is thrown.
*
*
...
@@ -474,12 +474,12 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
...
@@ -474,12 +474,12 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma
/**
/**
* Helper to retrieve the local path of a file associated with a job and a blob key.
* Helper to retrieve the local path of a file associated with a job and a blob key.
*
<p>
*
* The blob server looks the blob key up in its local storage. If the file exists, it is
*
<p>
The blob server looks the blob key up in its local storage. If the file exists, it is
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* returned. If the file does not exist, it is retrieved from the HA blob store (if available)
* or a {@link FileNotFoundException} is thrown.
* or a {@link FileNotFoundException} is thrown.
*
<p>
*
* <strong>Assumes the read lock has already been acquired.</strong>
* <
p><
strong>Assumes the read lock has already been acquired.</strong>
*
*
* @param jobId
* @param jobId
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
...
...
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
浏览文件 @
50301254
...
@@ -15,8 +15,13 @@
...
@@ -15,8 +15,13 @@
* See the License for the specific language governing permissions and
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/
*/
package
org.apache.flink.runtime.blob
;
package
org.apache.flink.runtime.blob
;
/**
* Defines constants for the protocol between the BLOB {@link BlobServer server} and the
* {@link AbstractBlobCache caches}.
*/
public
class
BlobServerProtocol
{
public
class
BlobServerProtocol
{
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
...
...
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
浏览文件 @
50301254
...
@@ -150,8 +150,8 @@ public class BlobUtils {
...
@@ -150,8 +150,8 @@ public class BlobUtils {
File
storageDir
;
File
storageDir
;
// NOTE: although we will be using UUIDs, there may be collisions
// NOTE: although we will be using UUIDs, there may be collisions
final
int
MAX_ATTEMPTS
=
10
;
int
maxAttempts
=
10
;
for
(
int
attempt
=
0
;
attempt
<
MAX_ATTEMPTS
;
attempt
++)
{
for
(
int
attempt
=
0
;
attempt
<
maxAttempts
;
attempt
++)
{
storageDir
=
new
File
(
baseDir
,
String
.
format
(
storageDir
=
new
File
(
baseDir
,
String
.
format
(
"blobStore-%s"
,
UUID
.
randomUUID
().
toString
()));
"blobStore-%s"
,
UUID
.
randomUUID
().
toString
()));
...
@@ -251,8 +251,8 @@ public class BlobUtils {
...
@@ -251,8 +251,8 @@ public class BlobUtils {
/**
/**
* Returns the path for the given blob key.
* Returns the path for the given blob key.
*
<p>
*
* The returned path can be used with the (local or HA) BLOB store file system back-end for
*
<p>
The returned path can be used with the (local or HA) BLOB store file system back-end for
* recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
* recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
* BlobKey)}.
* BlobKey)}.
*
*
...
@@ -403,12 +403,12 @@ public class BlobUtils {
...
@@ -403,12 +403,12 @@ public class BlobUtils {
}
}
}
}
static
void
closeSilently
(
Socket
socket
,
Logger
LOG
)
{
static
void
closeSilently
(
Socket
socket
,
Logger
log
)
{
if
(
socket
!=
null
)
{
if
(
socket
!=
null
)
{
try
{
try
{
socket
.
close
();
socket
.
close
();
}
catch
(
Throwable
t
)
{
}
catch
(
Throwable
t
)
{
LOG
.
debug
(
"Exception while closing BLOB server connection socket."
,
t
);
log
.
debug
(
"Exception while closing BLOB server connection socket."
,
t
);
}
}
}
}
}
}
...
...
flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
浏览文件 @
50301254
...
@@ -46,10 +46,10 @@ public class FileSystemBlobStore implements BlobStoreService {
...
@@ -46,10 +46,10 @@ public class FileSystemBlobStore implements BlobStoreService {
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
FileSystemBlobStore
.
class
);
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
FileSystemBlobStore
.
class
);
/** The file system in which blobs are stored */
/** The file system in which blobs are stored
.
*/
private
final
FileSystem
fileSystem
;
private
final
FileSystem
fileSystem
;
/** The base path of the blob store */
/** The base path of the blob store
.
*/
private
final
String
basePath
;
private
final
String
basePath
;
public
FileSystemBlobStore
(
FileSystem
fileSystem
,
String
storagePath
)
throws
IOException
{
public
FileSystemBlobStore
(
FileSystem
fileSystem
,
String
storagePath
)
throws
IOException
{
...
@@ -148,7 +148,7 @@ public class FileSystemBlobStore implements BlobStoreService {
...
@@ -148,7 +148,7 @@ public class FileSystemBlobStore implements BlobStoreService {
private
boolean
delete
(
String
blobPath
)
{
private
boolean
delete
(
String
blobPath
)
{
try
{
try
{
LOG
.
debug
(
"Deleting {}."
,
blobPath
);
LOG
.
debug
(
"Deleting {}."
,
blobPath
);
Path
path
=
new
Path
(
blobPath
);
Path
path
=
new
Path
(
blobPath
);
boolean
result
=
fileSystem
.
delete
(
path
,
true
);
boolean
result
=
fileSystem
.
delete
(
path
,
true
);
...
...
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
浏览文件 @
50301254
...
@@ -78,7 +78,7 @@ import static org.mockito.Mockito.mock;
...
@@ -78,7 +78,7 @@ import static org.mockito.Mockito.mock;
/**
/**
* Tests for GET-specific parts of the {@link BlobCacheService}.
* Tests for GET-specific parts of the {@link BlobCacheService}.
*
*
* This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from
*
<p>
This includes access of transient BLOBs from the {@link PermanentBlobCache}, permanent BLOBS from
* the {@link TransientBlobCache}, and how failing GET requests behave in the presence of failures
* the {@link TransientBlobCache}, and how failing GET requests behave in the presence of failures
* when used with a {@link BlobCacheService}.
* when used with a {@link BlobCacheService}.
*
*
...
...
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
浏览文件 @
50301254
...
@@ -36,10 +36,10 @@ import java.io.IOException;
...
@@ -36,10 +36,10 @@ import java.io.IOException;
public
class
BlobClientSslTest
extends
BlobClientTest
{
public
class
BlobClientSslTest
extends
BlobClientTest
{
/** The instance of the SSL BLOB server used during the tests. */
/** The instance of the SSL BLOB server used during the tests. */
private
static
BlobServer
BLOB_SSL_SERVER
;
private
static
BlobServer
blobSslServer
;
/** Instance of a non-SSL BLOB server with SSL-enabled security options. */
/** Instance of a non-SSL BLOB server with SSL-enabled security options. */
private
static
BlobServer
BLOB_NON_SSL_SERVER
;
private
static
BlobServer
blobNonSslServer
;
/** The SSL blob service client configuration. */
/** The SSL blob service client configuration. */
private
static
Configuration
sslClientConfig
;
private
static
Configuration
sslClientConfig
;
...
@@ -62,8 +62,8 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -62,8 +62,8 @@ public class BlobClientSslTest extends BlobClientTest {
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE
,
"src/test/resources/local127.keystore"
);
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE
,
"src/test/resources/local127.keystore"
);
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE_PASSWORD
,
"password"
);
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE_PASSWORD
,
"password"
);
config
.
setString
(
SecurityOptions
.
SSL_KEY_PASSWORD
,
"password"
);
config
.
setString
(
SecurityOptions
.
SSL_KEY_PASSWORD
,
"password"
);
BLOB_SSL_SERVER
=
new
BlobServer
(
config
,
new
VoidBlobStore
());
blobSslServer
=
new
BlobServer
(
config
,
new
VoidBlobStore
());
BLOB_SSL_SERVER
.
start
();
blobSslServer
.
start
();
sslClientConfig
=
new
Configuration
();
sslClientConfig
=
new
Configuration
();
sslClientConfig
.
setBoolean
(
SecurityOptions
.
SSL_ENABLED
,
true
);
sslClientConfig
.
setBoolean
(
SecurityOptions
.
SSL_ENABLED
,
true
);
...
@@ -81,8 +81,8 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -81,8 +81,8 @@ public class BlobClientSslTest extends BlobClientTest {
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE
,
"src/test/resources/local127.keystore"
);
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE
,
"src/test/resources/local127.keystore"
);
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE_PASSWORD
,
"password"
);
config
.
setString
(
SecurityOptions
.
SSL_KEYSTORE_PASSWORD
,
"password"
);
config
.
setString
(
SecurityOptions
.
SSL_KEY_PASSWORD
,
"password"
);
config
.
setString
(
SecurityOptions
.
SSL_KEY_PASSWORD
,
"password"
);
BLOB_NON_SSL_SERVER
=
new
BlobServer
(
config
,
new
VoidBlobStore
());
blobNonSslServer
=
new
BlobServer
(
config
,
new
VoidBlobStore
());
BLOB_NON_SSL_SERVER
.
start
();
blobNonSslServer
.
start
();
nonSslClientConfig
=
new
Configuration
();
nonSslClientConfig
=
new
Configuration
();
nonSslClientConfig
.
setBoolean
(
SecurityOptions
.
SSL_ENABLED
,
true
);
nonSslClientConfig
.
setBoolean
(
SecurityOptions
.
SSL_ENABLED
,
true
);
...
@@ -96,11 +96,11 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -96,11 +96,11 @@ public class BlobClientSslTest extends BlobClientTest {
*/
*/
@AfterClass
@AfterClass
public
static
void
stopServers
()
throws
IOException
{
public
static
void
stopServers
()
throws
IOException
{
if
(
BLOB_SSL_SERVER
!=
null
)
{
if
(
blobSslServer
!=
null
)
{
BLOB_SSL_SERVER
.
close
();
blobSslServer
.
close
();
}
}
if
(
BLOB_NON_SSL_SERVER
!=
null
)
{
if
(
blobNonSslServer
!=
null
)
{
BLOB_NON_SSL_SERVER
.
close
();
blobNonSslServer
.
close
();
}
}
}
}
...
@@ -109,7 +109,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -109,7 +109,7 @@ public class BlobClientSslTest extends BlobClientTest {
}
}
protected
BlobServer
getBlobServer
()
{
protected
BlobServer
getBlobServer
()
{
return
BLOB_SSL_SERVER
;
return
blobSslServer
;
}
}
/**
/**
...
@@ -117,7 +117,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -117,7 +117,7 @@ public class BlobClientSslTest extends BlobClientTest {
*/
*/
@Test
@Test
public
void
testUploadJarFilesHelper
()
throws
Exception
{
public
void
testUploadJarFilesHelper
()
throws
Exception
{
uploadJarFile
(
BLOB_SSL_SERVER
,
sslClientConfig
);
uploadJarFile
(
blobSslServer
,
sslClientConfig
);
}
}
/**
/**
...
@@ -126,7 +126,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -126,7 +126,7 @@ public class BlobClientSslTest extends BlobClientTest {
@Test
(
expected
=
IOException
.
class
)
@Test
(
expected
=
IOException
.
class
)
public
void
testSSLClientFailure
()
throws
Exception
{
public
void
testSSLClientFailure
()
throws
Exception
{
// SSL client connected to non-ssl server
// SSL client connected to non-ssl server
uploadJarFile
(
BLOB_SERVER
,
sslClientConfig
);
uploadJarFile
(
blobServer
,
sslClientConfig
);
}
}
/**
/**
...
@@ -135,7 +135,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -135,7 +135,7 @@ public class BlobClientSslTest extends BlobClientTest {
@Test
(
expected
=
IOException
.
class
)
@Test
(
expected
=
IOException
.
class
)
public
void
testSSLClientFailure2
()
throws
Exception
{
public
void
testSSLClientFailure2
()
throws
Exception
{
// SSL client connected to non-ssl server
// SSL client connected to non-ssl server
uploadJarFile
(
BLOB_NON_SSL_SERVER
,
sslClientConfig
);
uploadJarFile
(
blobNonSslServer
,
sslClientConfig
);
}
}
/**
/**
...
@@ -144,7 +144,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -144,7 +144,7 @@ public class BlobClientSslTest extends BlobClientTest {
@Test
(
expected
=
IOException
.
class
)
@Test
(
expected
=
IOException
.
class
)
public
void
testSSLServerFailure
()
throws
Exception
{
public
void
testSSLServerFailure
()
throws
Exception
{
// Non-SSL client connected to ssl server
// Non-SSL client connected to ssl server
uploadJarFile
(
BLOB_SSL_SERVER
,
clientConfig
);
uploadJarFile
(
blobSslServer
,
clientConfig
);
}
}
/**
/**
...
@@ -153,7 +153,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -153,7 +153,7 @@ public class BlobClientSslTest extends BlobClientTest {
@Test
(
expected
=
IOException
.
class
)
@Test
(
expected
=
IOException
.
class
)
public
void
testSSLServerFailure2
()
throws
Exception
{
public
void
testSSLServerFailure2
()
throws
Exception
{
// Non-SSL client connected to ssl server
// Non-SSL client connected to ssl server
uploadJarFile
(
BLOB_SSL_SERVER
,
nonSslClientConfig
);
uploadJarFile
(
blobSslServer
,
nonSslClientConfig
);
}
}
/**
/**
...
@@ -161,7 +161,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -161,7 +161,7 @@ public class BlobClientSslTest extends BlobClientTest {
*/
*/
@Test
@Test
public
void
testNonSSLConnection
()
throws
Exception
{
public
void
testNonSSLConnection
()
throws
Exception
{
uploadJarFile
(
BLOB_SERVER
,
clientConfig
);
uploadJarFile
(
blobServer
,
clientConfig
);
}
}
/**
/**
...
@@ -169,7 +169,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -169,7 +169,7 @@ public class BlobClientSslTest extends BlobClientTest {
*/
*/
@Test
@Test
public
void
testNonSSLConnection2
()
throws
Exception
{
public
void
testNonSSLConnection2
()
throws
Exception
{
uploadJarFile
(
BLOB_SERVER
,
nonSslClientConfig
);
uploadJarFile
(
blobServer
,
nonSslClientConfig
);
}
}
/**
/**
...
@@ -177,7 +177,7 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -177,7 +177,7 @@ public class BlobClientSslTest extends BlobClientTest {
*/
*/
@Test
@Test
public
void
testNonSSLConnection3
()
throws
Exception
{
public
void
testNonSSLConnection3
()
throws
Exception
{
uploadJarFile
(
BLOB_NON_SSL_SERVER
,
clientConfig
);
uploadJarFile
(
blobNonSslServer
,
clientConfig
);
}
}
/**
/**
...
@@ -185,6 +185,6 @@ public class BlobClientSslTest extends BlobClientTest {
...
@@ -185,6 +185,6 @@ public class BlobClientSslTest extends BlobClientTest {
*/
*/
@Test
@Test
public
void
testNonSSLConnection4
()
throws
Exception
{
public
void
testNonSSLConnection4
()
throws
Exception
{
uploadJarFile
(
BLOB_NON_SSL_SERVER
,
nonSslClientConfig
);
uploadJarFile
(
blobNonSslServer
,
nonSslClientConfig
);
}
}
}
}
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
浏览文件 @
50301254
...
@@ -62,7 +62,7 @@ public class BlobClientTest extends TestLogger {
...
@@ -62,7 +62,7 @@ public class BlobClientTest extends TestLogger {
private
static
final
int
TEST_BUFFER_SIZE
=
17
*
1000
;
private
static
final
int
TEST_BUFFER_SIZE
=
17
*
1000
;
/** The instance of the (non-ssl) BLOB server used during the tests. */
/** The instance of the (non-ssl) BLOB server used during the tests. */
static
BlobServer
BLOB_SERVER
;
static
BlobServer
blobServer
;
/** The blob service (non-ssl) client configuration. */
/** The blob service (non-ssl) client configuration. */
static
Configuration
clientConfig
;
static
Configuration
clientConfig
;
...
@@ -79,8 +79,8 @@ public class BlobClientTest extends TestLogger {
...
@@ -79,8 +79,8 @@ public class BlobClientTest extends TestLogger {
config
.
setString
(
BlobServerOptions
.
STORAGE_DIRECTORY
,
config
.
setString
(
BlobServerOptions
.
STORAGE_DIRECTORY
,
temporaryFolder
.
newFolder
().
getAbsolutePath
());
temporaryFolder
.
newFolder
().
getAbsolutePath
());
BLOB_SERVER
=
new
BlobServer
(
config
,
new
VoidBlobStore
());
blobServer
=
new
BlobServer
(
config
,
new
VoidBlobStore
());
BLOB_SERVER
.
start
();
blobServer
.
start
();
clientConfig
=
new
Configuration
();
clientConfig
=
new
Configuration
();
}
}
...
@@ -90,8 +90,8 @@ public class BlobClientTest extends TestLogger {
...
@@ -90,8 +90,8 @@ public class BlobClientTest extends TestLogger {
*/
*/
@AfterClass
@AfterClass
public
static
void
stopServer
()
throws
IOException
{
public
static
void
stopServer
()
throws
IOException
{
if
(
BLOB_SERVER
!=
null
)
{
if
(
blobServer
!=
null
)
{
BLOB_SERVER
.
close
();
blobServer
.
close
();
}
}
}
}
...
@@ -319,7 +319,7 @@ public class BlobClientTest extends TestLogger {
...
@@ -319,7 +319,7 @@ public class BlobClientTest extends TestLogger {
}
}
protected
BlobServer
getBlobServer
()
{
protected
BlobServer
getBlobServer
()
{
return
BLOB_SERVER
;
return
blobServer
;
}
}
@Test
@Test
...
...
flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
浏览文件 @
50301254
...
@@ -78,7 +78,7 @@ public class TestingFailingBlobServer extends BlobServer {
...
@@ -78,7 +78,7 @@ public class TestingFailingBlobServer extends BlobServer {
if
(
socket
!=
null
)
{
if
(
socket
!=
null
)
{
try
{
try
{
socket
.
close
();
socket
.
close
();
}
catch
(
Throwable
ignored
)
{}
}
catch
(
Throwable
ignored
)
{}
}
}
}
}
}
}
...
...
tools/maven/suppressions-runtime.xml
浏览文件 @
50301254
...
@@ -23,13 +23,6 @@ under the License.
...
@@ -23,13 +23,6 @@ under the License.
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<suppressions>
<suppress
files=
"(.*)runtime[/\\]blob[/\\](.*)"
checks=
"NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"
/>
<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
<suppress
files=
"(.*)test[/\\](.*)runtime[/\\]blob[/\\](.*)"
checks=
"AvoidStarImport|UnusedImport"
/>
<suppress
<suppress
files=
"(.*)runtime[/\\]checkpoint[/\\](.*)"
files=
"(.*)runtime[/\\]checkpoint[/\\](.*)"
checks=
"NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"
/>
checks=
"NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"
/>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录