Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
b0fbce71
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b0fbce71
编写于
3月 19, 2014
作者:
M
mingliang
提交者:
Robert Metzger
4月 25, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add test for delete process of tmp file and tested in cluster
上级
8676280d
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
183 addition
and
34 deletion
+183
-34
stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
...e-core/src/main/java/eu/stratosphere/api/common/Plan.java
+6
-3
stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
...va/eu/stratosphere/api/common/cache/DistributedCache.java
+15
-10
stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/WrappingFunction.java
...here/api/java/operators/translation/WrappingFunction.java
+6
-0
stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
...in/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+33
-21
stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
...ere/pact/runtime/cache/FileCacheDeleteValidationTest.java
+110
-0
stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
...atosphere/test/distributedCache/DistributedCacheTest.java
+13
-0
未找到文件。
stratosphere-core/src/main/java/eu/stratosphere/api/common/Plan.java
浏览文件 @
b0fbce71
...
...
@@ -13,20 +13,23 @@
package
eu.stratosphere.api.common
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkArgument
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
import
java.util.ArrayList
;
import
java.util.Calendar
;
import
java.util.Collection
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map.Entry
;
import
java.util.Set
;
import
eu.stratosphere.api.common.operators.GenericDataSink
;
import
eu.stratosphere.api.common.operators.Operator
;
import
eu.stratosphere.util.Visitable
;
import
eu.stratosphere.util.Visitor
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkArgument
;
/**
* This class encapsulates a single stratosphere job (an instantiated data flow), together with some parameters.
* Parameters include the name and a default degree of parallelism. The job is referenced by the data sinks,
...
...
stratosphere-core/src/main/java/eu/stratosphere/api/common/cache/DistributedCache.java
浏览文件 @
b0fbce71
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package
eu.stratosphere.api.common.cache
;
import
eu.stratosphere.configuration.ConfigConstants
;
import
eu.stratosphere.configuration.Configuration
;
import
eu.stratosphere.configuration.GlobalConfiguration
;
import
eu.stratosphere.core.fs.Path
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
java.io.File
;
import
java.util.Set
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.FutureTask
;
/**
...
...
@@ -29,8 +38,6 @@ public class DistributedCache {
public
final
static
String
TMP_PREFIX
=
"tmp_"
;
public
final
static
int
DEFAULT_BUFFER_SIZE
=
8192
;
private
Map
<
String
,
FutureTask
<
Path
>>
cacheCopyTasks
=
new
HashMap
<
String
,
FutureTask
<
Path
>>();
public
static
void
addCachedFile
(
String
name
,
String
filePath
,
Configuration
conf
)
{
...
...
@@ -60,10 +67,8 @@ public class DistributedCache {
//The FutureTask.get() method will block until the file is ready.
try
{
tmp
=
cacheCopyTasks
.
get
(
name
).
get
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
catch
(
ExecutionException
e
)
{
e
.
printStackTrace
();
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Error while getting file from distributed cache"
,
e
);
}
return
new
File
(
tmp
.
toString
());
}
...
...
stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/WrappingFunction.java
浏览文件 @
b0fbce71
...
...
@@ -24,6 +24,7 @@ import eu.stratosphere.api.common.accumulators.Histogram;
import
eu.stratosphere.api.common.accumulators.IntCounter
;
import
eu.stratosphere.api.common.accumulators.LongCounter
;
import
eu.stratosphere.api.common.aggregators.Aggregator
;
import
eu.stratosphere.api.common.cache.DistributedCache
;
import
eu.stratosphere.api.common.functions.AbstractFunction
;
import
eu.stratosphere.api.common.functions.IterationRuntimeContext
;
import
eu.stratosphere.api.common.functions.RuntimeContext
;
...
...
@@ -138,6 +139,11 @@ public abstract class WrappingFunction<T extends AbstractFunction> extends Abstr
return
list
;
}
@Override
public
DistributedCache
getDistributedCache
()
{
return
context
.
getDistributedCache
();
}
}
private
static
class
WrappingIterationRuntimeContext
extends
WrappingRuntimeContext
implements
IterationRuntimeContext
{
...
...
stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
浏览文件 @
b0fbce71
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package
eu.stratosphere.pact.runtime.cache
;
import
eu.stratosphere.api.common.cache.DistributedCache
;
...
...
@@ -10,6 +23,8 @@ import eu.stratosphere.core.fs.Path;
import
eu.stratosphere.core.fs.local.LocalFileSystem
;
import
eu.stratosphere.nephele.jobgraph.JobID
;
import
eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory
;
import
eu.stratosphere.nephele.util.IOUtils
;
import
org.apache.commons.lang3.tuple.ImmutablePair
;
import
org.apache.commons.lang3.tuple.Pair
;
...
...
@@ -28,7 +43,7 @@ public class FileCache {
private
LocalFileSystem
lfs
=
new
LocalFileSystem
();
private
Map
<
Pair
<
JobID
,
String
>,
Boolean
>
active
=
new
HashMap
<
Pair
<
JobID
,
String
>,
Boolean
>();
private
Map
<
Pair
<
JobID
,
String
>,
Integer
>
count
=
new
HashMap
<
Pair
<
JobID
,
String
>,
Integer
>();
private
final
ScheduledExecutorService
executorService
=
Executors
.
newScheduledThreadPool
(
10
,
ExecutorThreadFactory
.
INSTANCE
);
...
...
@@ -37,8 +52,13 @@ public class FileCache {
*/
public
FutureTask
<
Path
>
createTmpFile
(
String
name
,
String
filePath
,
JobID
jobID
)
{
synchronized
(
active
)
{
active
.
put
(
new
ImmutablePair
(
jobID
,
name
),
true
);
synchronized
(
count
)
{
Pair
<
JobID
,
String
>
key
=
new
ImmutablePair
(
jobID
,
name
);
if
(
count
.
containsKey
(
key
))
{
count
.
put
(
key
,
count
.
get
(
key
)
+
1
);
}
else
{
count
.
put
(
key
,
1
);
}
}
CopyProcess
cp
=
new
CopyProcess
(
name
,
filePath
,
jobID
);
FutureTask
<
Path
>
copyTask
=
new
FutureTask
<
Path
>(
cp
);
...
...
@@ -50,10 +70,7 @@ public class FileCache {
* Leave a 5 seconds delay to clear the local file.
*/
public
void
deleteTmpFile
(
String
name
,
JobID
jobID
)
{
synchronized
(
active
)
{
active
.
put
(
new
ImmutablePair
(
jobID
,
name
),
false
);
}
DeleteProcess
dp
=
new
DeleteProcess
(
name
,
jobID
);
DeleteProcess
dp
=
new
DeleteProcess
(
name
,
jobID
,
count
.
get
(
new
ImmutablePair
(
jobID
,
name
)));
executorService
.
schedule
(
dp
,
5000L
,
TimeUnit
.
MILLISECONDS
);
}
...
...
@@ -68,7 +85,7 @@ public class FileCache {
try
{
this
.
executorService
.
awaitTermination
(
5000L
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
throw
new
RuntimeException
(
"Error shutting down the file cache"
,
e
);
}
}
}
...
...
@@ -94,17 +111,10 @@ public class FileCache {
Path
distributedPath
=
new
Path
(
filePath
);
FileSystem
fs
=
distributedPath
.
getFileSystem
();
FSDataInputStream
fsInput
=
fs
.
open
(
distributedPath
);
byte
[]
buffer
=
new
byte
[
DistributedCache
.
DEFAULT_BUFFER_SIZE
];
int
num
=
fsInput
.
read
(
buffer
);
while
(
num
!=
-
1
)
{
lfsOutput
.
write
(
buffer
,
0
,
num
);
num
=
fsInput
.
read
(
buffer
);
}
fsInput
.
close
();
lfsOutput
.
close
();
IOUtils
.
copyBytes
(
fsInput
,
lfsOutput
);
}
}
catch
(
IOException
e1
)
{
e1
.
printStackTrace
(
);
throw
new
RuntimeException
(
"Error copying a file from hdfs to the local fs"
,
e1
);
}
return
tmp
;
}
...
...
@@ -115,15 +125,17 @@ public class FileCache {
private
class
DeleteProcess
implements
Runnable
{
private
String
name
;
private
JobID
jobID
;
private
int
oldCount
;
public
DeleteProcess
(
String
name
,
JobID
jobID
)
{
public
DeleteProcess
(
String
name
,
JobID
jobID
,
int
c
)
{
this
.
name
=
name
;
this
.
jobID
=
jobID
;
this
.
oldCount
=
c
;
}
public
void
run
()
{
synchronized
(
active
)
{
if
(
active
.
get
(
new
ImmutablePair
(
jobID
,
name
))
)
{
synchronized
(
count
)
{
if
(
count
.
get
(
new
ImmutablePair
(
jobID
,
name
))
!=
oldCount
)
{
return
;
}
}
...
...
@@ -133,7 +145,7 @@ public class FileCache {
lfs
.
delete
(
tmp
,
true
);
}
}
catch
(
IOException
e1
)
{
e1
.
printStackTrace
(
);
throw
new
RuntimeException
(
"Error deleting the file"
,
e1
);
}
}
}
...
...
stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/cache/FileCacheDeleteValidationTest.java
0 → 100644
浏览文件 @
b0fbce71
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package
eu.stratosphere.pact.runtime.cache
;
import
com.google.common.base.Charsets
;
import
com.google.common.io.Files
;
import
eu.stratosphere.core.fs.Path
;
import
eu.stratosphere.core.fs.local.LocalFileSystem
;
import
eu.stratosphere.nephele.jobgraph.JobID
;
import
junit.framework.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.After
;
import
java.io.File
;
import
java.io.IOException
;
/**
* Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
*/
public
class
FileCacheDeleteValidationTest
{
FileCache
fileCache
=
new
FileCache
();
LocalFileSystem
lfs
=
new
LocalFileSystem
();
String
testFileContent
=
"Goethe - Faust: Der Tragoedie erster Teil\n"
+
"Prolog im Himmel.\n"
+
"Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n"
+
"Erzengel treten vor.\n"
+
"RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+
"Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+
"gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+
"hohen Werke Sind herrlich wie am ersten Tag.\n"
+
"GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+
"Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+
"schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+
"Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+
"MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+
"aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+
"Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+
"deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\n"
;
@Before
public
void
createTmpCacheFile
()
{
File
f
=
new
File
(
System
.
getProperty
(
"java.io.tmpdir"
),
"cacheFile"
);
try
{
Files
.
write
(
testFileContent
,
f
,
Charsets
.
UTF_8
);
}
catch
(
IOException
e
)
{
throw
new
RuntimeException
(
"Error initializing the test"
,
e
);
}
}
@Test
public
void
testFileReuseForNextTask
()
{
JobID
jobID
=
new
JobID
();
String
filePath
=
"file://"
+
new
Path
(
System
.
getProperty
(
"java.io.tmpdir"
),
"cacheFile"
).
toString
();
fileCache
.
createTmpFile
(
"test_file"
,
filePath
,
jobID
);
try
{
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
"Interrupted error"
,
e
);
}
fileCache
.
deleteTmpFile
(
"test_file"
,
jobID
);
try
{
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
"Interrupted error"
,
e
);
}
//new task comes after 1 second
try
{
Assert
.
assertTrue
(
"Local cache file should not be deleted when another task comes in 5 seconds!"
,
lfs
.
exists
(
fileCache
.
getTempDir
(
jobID
,
"test_file"
)));
}
catch
(
IOException
e
)
{
throw
new
RuntimeException
(
"Interrupted error"
,
e
);
}
fileCache
.
createTmpFile
(
"test_file"
,
filePath
,
jobID
);
try
{
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
"Interrupted error"
,
e
);
}
fileCache
.
deleteTmpFile
(
"test_file"
,
jobID
);
try
{
Thread
.
sleep
(
7000
);
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
"Interrupted error"
,
e
);
}
//no task comes in 7 seconds
try
{
Assert
.
assertTrue
(
"Local cache file should be deleted when no task comes in 5 seconds!"
,
!
lfs
.
exists
(
fileCache
.
getTempDir
(
jobID
,
"test_file"
)));
}
catch
(
IOException
e
)
{
throw
new
RuntimeException
(
"Interrupted error"
,
e
);
}
}
@After
public
void
shutdown
()
{
fileCache
.
shutdown
();
}
}
stratosphere-tests/src/test/java/eu/stratosphere/test/distributedCache/DistributedCacheTest.java
浏览文件 @
b0fbce71
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package
eu.stratosphere.test.distributedCache
;
import
eu.stratosphere.api.common.Plan
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录