Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Forever310
druid
提交
19494eeb
D
druid
项目概览
Forever310
/
druid
与 Fork 源项目一致
从无法访问的项目Fork
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
druid
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
19494eeb
编写于
7月 02, 2014
作者:
F
fjy
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #630 from metamx/port-finder
improve port finding strategy for task runner
上级
c6466483
518ab473
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
132 addition
and
3 deletion
+132
-3
indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
...in/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+4
-3
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
.../src/main/java/io/druid/indexing/overlord/PortFinder.java
+94
-0
indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java
.../test/java/io/druid/indexing/overlord/PortFinderTest.java
+34
-0
未找到文件。
indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
浏览文件 @
19494eeb
...
@@ -79,6 +79,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
...
@@ -79,6 +79,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private
final
DruidNode
node
;
private
final
DruidNode
node
;
private
final
ListeningExecutorService
exec
;
private
final
ListeningExecutorService
exec
;
private
final
ObjectMapper
jsonMapper
;
private
final
ObjectMapper
jsonMapper
;
private
final
PortFinder
portFinder
;
private
final
Map
<
String
,
ForkingTaskRunnerWorkItem
>
tasks
=
Maps
.
newHashMap
();
private
final
Map
<
String
,
ForkingTaskRunnerWorkItem
>
tasks
=
Maps
.
newHashMap
();
...
@@ -97,6 +98,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
...
@@ -97,6 +98,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
this
.
taskLogPusher
=
taskLogPusher
;
this
.
taskLogPusher
=
taskLogPusher
;
this
.
jsonMapper
=
jsonMapper
;
this
.
jsonMapper
=
jsonMapper
;
this
.
node
=
node
;
this
.
node
=
node
;
this
.
portFinder
=
new
PortFinder
(
config
.
getStartPort
());
this
.
exec
=
MoreExecutors
.
listeningDecorator
(
Executors
.
newFixedThreadPool
(
workerConfig
.
getCapacity
()));
this
.
exec
=
MoreExecutors
.
listeningDecorator
(
Executors
.
newFixedThreadPool
(
workerConfig
.
getCapacity
()));
}
}
...
@@ -121,7 +123,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
...
@@ -121,7 +123,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final
File
attemptDir
=
new
File
(
taskDir
,
attemptUUID
);
final
File
attemptDir
=
new
File
(
taskDir
,
attemptUUID
);
final
ProcessHolder
processHolder
;
final
ProcessHolder
processHolder
;
final
int
childPort
=
portFinder
.
findUnusedPort
();
try
{
try
{
final
Closer
closer
=
Closer
.
create
();
final
Closer
closer
=
Closer
.
create
();
try
{
try
{
...
@@ -154,7 +156,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
...
@@ -154,7 +156,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
final
List
<
String
>
command
=
Lists
.
newArrayList
();
final
List
<
String
>
command
=
Lists
.
newArrayList
();
final
int
childPort
=
findUnusedPort
();
final
String
childHost
=
String
.
format
(
"%s:%d"
,
node
.
getHostNoPort
(),
childPort
);
final
String
childHost
=
String
.
format
(
"%s:%d"
,
node
.
getHostNoPort
(),
childPort
);
command
.
add
(
config
.
getJavaCommand
());
command
.
add
(
config
.
getJavaCommand
());
...
@@ -258,7 +259,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
...
@@ -258,7 +259,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
taskWorkItem
.
processHolder
.
process
.
destroy
();
taskWorkItem
.
processHolder
.
process
.
destroy
();
}
}
}
}
portFinder
.
markPortUnused
(
childPort
);
log
.
info
(
"Removing temporary directory: %s"
,
attemptDir
);
log
.
info
(
"Removing temporary directory: %s"
,
attemptDir
);
FileUtils
.
deleteDirectory
(
attemptDir
);
FileUtils
.
deleteDirectory
(
attemptDir
);
}
}
...
...
indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java
0 → 100644
浏览文件 @
19494eeb
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.indexing.overlord
;
import
com.google.common.collect.Sets
;
import
com.metamx.common.ISE
;
import
java.io.IOException
;
import
java.net.BindException
;
import
java.net.ServerSocket
;
import
java.util.Set
;
public
class
PortFinder
{
private
final
Set
<
Integer
>
usedPorts
=
Sets
.
newHashSet
();
private
final
int
startPort
;
public
PortFinder
(
int
startPort
)
{
this
.
startPort
=
startPort
;
}
private
static
boolean
canBind
(
int
portNum
)
{
ServerSocket
ss
=
null
;
boolean
isFree
=
false
;
try
{
ss
=
new
ServerSocket
(
portNum
);
isFree
=
true
;
}
catch
(
BindException
be
)
{
isFree
=
false
;
// port in use,
}
catch
(
IOException
e
)
{
throw
new
RuntimeException
(
e
);
}
finally
{
if
(
ss
!=
null
)
{
while
(!
ss
.
isClosed
())
{
try
{
ss
.
close
();
}
catch
(
IOException
e
)
{
// ignore
}
}
}
}
return
isFree
;
}
public
synchronized
int
findUnusedPort
()
{
int
port
=
chooseNext
(
startPort
);
while
(!
canBind
(
port
))
{
port
=
chooseNext
(
port
+
1
);
}
usedPorts
.
add
(
port
);
return
port
;
}
public
synchronized
void
markPortUnused
(
int
port
)
{
usedPorts
.
remove
(
port
);
}
private
int
chooseNext
(
int
start
)
{
for
(
int
i
=
start
;
i
<
Integer
.
MAX_VALUE
;
i
++)
{
if
(!
usedPorts
.
contains
(
i
))
{
return
i
;
}
}
throw
new
ISE
(
"All ports are Used.."
);
}
}
indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java
0 → 100644
浏览文件 @
19494eeb
package
io.druid.indexing.overlord
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.io.IOException
;
import
java.net.ServerSocket
;
public
class
PortFinderTest
{
private
final
PortFinder
finder
=
new
PortFinder
(
1200
);
@Test
public
void
testUsedPort
()
throws
IOException
{
final
int
port1
=
finder
.
findUnusedPort
();
// verify that the port is free
ServerSocket
socket1
=
new
ServerSocket
(
port1
);
finder
.
markPortUnused
(
port1
);
final
int
port2
=
finder
.
findUnusedPort
();
Assert
.
assertNotEquals
(
"Used port is not reallocated"
,
port1
,
port2
);
// verify that port2 is free
ServerSocket
socket2
=
new
ServerSocket
(
port2
);
socket1
.
close
();
// Now port1 should get recycled
Assert
.
assertEquals
(
port1
,
finder
.
findUnusedPort
());
socket2
.
close
();
finder
.
markPortUnused
(
port1
);
finder
.
markPortUnused
(
port2
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录