Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Forever310
druid
提交
64307766
D
druid
项目概览
Forever310
/
druid
与 Fork 源项目一致
从无法访问的项目Fork
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
D
druid
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
64307766
编写于
7月 17, 2014
作者:
F
fjy
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #641 from metamx/cleanup-ingest
some minor cleanups to ingest firehose
上级
ded83557
d8b8826c
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
29 addition
and
76 deletion
+29
-76
indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
...ice/src/main/java/io/druid/indexing/common/task/Task.java
+1
-3
indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
...druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+4
-5
indexing-service/src/main/java/io/druid/indexing/firehose/IngestTask.java
.../src/main/java/io/druid/indexing/firehose/IngestTask.java
+0
-55
server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
...d/segment/realtime/firehose/CombiningFirehoseFactory.java
+14
-1
server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
...druid/realtime/firehose/CombiningFirehoseFactoryTest.java
+10
-12
未找到文件。
indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
浏览文件 @
64307766
...
...
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import
io.druid.indexing.common.TaskStatus
;
import
io.druid.indexing.common.TaskToolbox
;
import
io.druid.indexing.common.actions.TaskActionClient
;
import
io.druid.indexing.firehose.IngestTask
;
import
io.druid.query.Query
;
import
io.druid.query.QueryRunner
;
...
...
@@ -54,8 +53,7 @@ import io.druid.query.QueryRunner;
@JsonSubTypes
.
Type
(
name
=
"index_realtime"
,
value
=
RealtimeIndexTask
.
class
),
@JsonSubTypes
.
Type
(
name
=
"noop"
,
value
=
NoopTask
.
class
),
@JsonSubTypes
.
Type
(
name
=
"version_converter"
,
value
=
VersionConverterTask
.
class
),
@JsonSubTypes
.
Type
(
name
=
"version_converter_sub"
,
value
=
VersionConverterTask
.
SubTask
.
class
),
@JsonSubTypes
.
Type
(
name
=
"ingest-task"
,
value
=
IngestTask
.
class
)
@JsonSubTypes
.
Type
(
name
=
"version_converter_sub"
,
value
=
VersionConverterTask
.
SubTask
.
class
)
})
public
interface
Task
{
...
...
indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
浏览文件 @
64307766
...
...
@@ -42,12 +42,10 @@ import io.druid.data.input.InputRow;
import
io.druid.data.input.MapBasedInputRow
;
import
io.druid.data.input.impl.InputRowParser
;
import
io.druid.granularity.QueryGranularity
;
import
io.druid.indexing.common.TaskStatus
;
import
io.druid.indexing.common.TaskToolbox
;
import
io.druid.indexing.common.TaskToolboxFactory
;
import
io.druid.indexing.common.actions.SegmentListUsedAction
;
import
io.druid.indexing.common.actions.TaskActionClient
;
import
io.druid.indexing.common.task.AbstractTask
;
import
io.druid.indexing.common.task.NoopTask
;
import
io.druid.query.filter.DimFilter
;
import
io.druid.query.select.EventHolder
;
import
io.druid.segment.Cursor
;
...
...
@@ -139,10 +137,11 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
@Override
public
Firehose
connect
(
InputRowParser
inputRowParser
)
throws
IOException
,
ParseException
{
log
.
info
(
"Connecting firehose:
IngestSegmentFirehose[%s,
%s]"
,
dataSource
,
interval
);
log
.
info
(
"Connecting firehose:
dataSource[%s], interval[
%s]"
,
dataSource
,
interval
);
// better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method.
// Noop Task is just used to create the toolbox and list segments.
final
TaskToolbox
toolbox
=
injector
.
getInstance
(
TaskToolboxFactory
.
class
).
build
(
new
IngestTask
(
"Ingest-Task-Id"
,
dataSource
)
new
NoopTask
(
"reingest"
,
0
,
0
,
null
,
null
)
);
try
{
...
...
indexing-service/src/main/java/io/druid/indexing/firehose/IngestTask.java
已删除
100644 → 0
浏览文件 @
ded83557
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package
io.druid.indexing.firehose
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
io.druid.indexing.common.TaskStatus
;
import
io.druid.indexing.common.TaskToolbox
;
import
io.druid.indexing.common.actions.TaskActionClient
;
import
io.druid.indexing.common.task.AbstractTask
;
public
class
IngestTask
extends
AbstractTask
{
public
IngestTask
(
@JsonProperty
(
"id"
)
final
String
id
,
@JsonProperty
(
"dataSource"
)
final
String
dataSource
)
{
super
(
id
,
dataSource
);
}
@Override
public
String
getType
()
{
return
"Ingest-Task"
;
}
@Override
public
boolean
isReady
(
TaskActionClient
taskActionClient
)
throws
Exception
{
return
true
;
}
@Override
public
TaskStatus
run
(
TaskToolbox
toolbox
)
throws
Exception
{
return
TaskStatus
.
success
(
getId
());
}
}
\ No newline at end of file
server/src/main/java/io/druid/segment/realtime/firehose/CombiningFirehoseFactory.java
浏览文件 @
64307766
...
...
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
com.google.api.client.repackaged.com.google.common.base.Preconditions
;
import
com.google.api.client.repackaged.com.google.common.base.Throwables
;
import
com.metamx.emitter.EmittingLogger
;
import
io.druid.data.input.Firehose
;
import
io.druid.data.input.FirehoseFactory
;
import
io.druid.data.input.InputRow
;
...
...
@@ -37,6 +38,8 @@ import java.util.List;
*/
public
class
CombiningFirehoseFactory
implements
FirehoseFactory
<
InputRowParser
>
{
private
static
final
EmittingLogger
log
=
new
EmittingLogger
(
CombiningFirehoseFactory
.
class
);
private
final
List
<
FirehoseFactory
>
delegateFactoryList
;
@JsonCreator
...
...
@@ -86,10 +89,20 @@ public class CombiningFirehoseFactory implements FirehoseFactory<InputRowParser>
if
(
currentFirehose
!=
null
)
{
currentFirehose
.
close
();
}
currentFirehose
=
firehoseFactoryIterator
.
next
().
connect
(
parser
);
}
catch
(
IOException
e
)
{
Throwables
.
propagate
(
e
);
if
(
currentFirehose
!=
null
)
{
try
{
currentFirehose
.
close
();
}
catch
(
IOException
e2
)
{
log
.
error
(
e
,
"Unable to close currentFirehose!"
);
throw
Throwables
.
propagate
(
e2
);
}
}
throw
Throwables
.
propagate
(
e
);
}
}
}
...
...
server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
浏览文件 @
64307766
...
...
@@ -44,16 +44,15 @@ public class CombiningFirehoseFactoryTest
public
void
testCombiningfirehose
()
throws
IOException
{
List
<
InputRow
>
list1
=
Arrays
.
asList
(
makeRow
(
1
,
1
),
makeRow
(
2
,
2
));
List
<
InputRow
>
list2
=
Arrays
.
asList
(
makeRow
(
3
,
3
),
makeRow
(
4
,
4
));
List
<
InputRow
>
list2
=
Arrays
.
asList
(
makeRow
(
3
,
3
),
makeRow
(
4
,
4
)
,
makeRow
(
5
,
5
)
);
FirehoseFactory
combiningFactory
=
new
CombiningFirehoseFactory
(
Arrays
.<
FirehoseFactory
>
asList
(
new
ListFirehoseFactory
(
list1
),
new
ListFirehoseFactory
(
list2
)
new
ListFirehoseFactory
(
list1
),
new
ListFirehoseFactory
(
list2
)
)
);
final
Firehose
firehose
=
combiningFactory
.
connect
(
null
);
for
(
int
i
=
1
;
i
<
5
;
i
++)
{
for
(
int
i
=
1
;
i
<
6
;
i
++)
{
Assert
.
assertTrue
(
firehose
.
hasMore
());
final
InputRow
inputRow
=
firehose
.
nextRow
();
Assert
.
assertEquals
(
i
,
inputRow
.
getTimestampFromEpoch
());
...
...
@@ -84,12 +83,6 @@ public class CombiningFirehoseFactoryTest
return
new
DateTime
(
timestamp
);
}
@Override
public
int
compareTo
(
Row
o
)
{
return
0
;
}
@Override
public
List
<
String
>
getDimension
(
String
dimension
)
{
...
...
@@ -108,6 +101,11 @@ public class CombiningFirehoseFactoryTest
return
null
;
}
@Override
public
int
compareTo
(
Row
o
)
{
return
0
;
}
};
}
...
...
@@ -147,7 +145,7 @@ public class CombiningFirehoseFactoryTest
@Override
public
void
close
()
throws
IOException
{
//
//
Do nothing
}
};
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录