Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
DolphinScheduler
提交
7aeac758
DolphinScheduler
项目概览
apache
/
DolphinScheduler
上一次同步 1 年多
通知
707
Star
9572
Fork
3514
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
7aeac758
编写于
1月 21, 2022
作者:
D
Devosend
提交者:
GitHub
1月 21, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[Feature-7348] [Python] Add workflow as code task type mr (#8140)
上级
df26be89
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
160 addition
and
0 deletion
+160
-0
dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py
...on/pydolphinscheduler/examples/task_map_reduce_example.py
+32
-0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
...on/pydolphinscheduler/src/pydolphinscheduler/constants.py
+1
-0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
...lphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
+52
-0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
...-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
+75
-0
未找到文件。
dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py
0 → 100644
浏览文件 @
7aeac758
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""A example workflow for task mr."""
from
pydolphinscheduler.core.engine
import
ProgramType
from
pydolphinscheduler.core.process_definition
import
ProcessDefinition
from
pydolphinscheduler.tasks.map_reduce
import
MR
with
ProcessDefinition
(
name
=
"task_map_reduce_example"
,
tenant
=
"tenant_exists"
)
as
pd
:
task
=
MR
(
name
=
"task_mr"
,
main_class
=
"wordcount"
,
main_package
=
"hadoop-mapreduce-examples-3.3.1.jar"
,
program_type
=
ProgramType
.
JAVA
,
main_args
=
"/dolphinscheduler/tenant_exists/resources/file.txt /output/ds"
,
)
pd
.
run
()
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
浏览文件 @
7aeac758
...
...
@@ -79,6 +79,7 @@ class TaskType(str):
SWITCH
=
"SWITCH"
FLINK
=
"FLINK"
SPARK
=
"SPARK"
MR
=
"MR"
class
DefaultTaskCodeNum
(
str
):
...
...
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
0 → 100644
浏览文件 @
7aeac758
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task MR."""
from
typing
import
Optional
from
pydolphinscheduler.constants
import
TaskType
from
pydolphinscheduler.core.engine
import
Engine
,
ProgramType
class
MR
(
Engine
):
"""Task mr object, declare behavior for mr task to dolphinscheduler."""
_task_custom_attr
=
{
"app_name"
,
"main_args"
,
"others"
,
}
def
__init__
(
self
,
name
:
str
,
main_class
:
str
,
main_package
:
str
,
program_type
:
Optional
[
ProgramType
]
=
ProgramType
.
SCALA
,
app_name
:
Optional
[
str
]
=
None
,
main_args
:
Optional
[
str
]
=
None
,
others
:
Optional
[
str
]
=
None
,
*
args
,
**
kwargs
):
super
().
__init__
(
name
,
TaskType
.
MR
,
main_class
,
main_package
,
program_type
,
*
args
,
**
kwargs
)
self
.
app_name
=
app_name
self
.
main_args
=
main_args
self
.
others
=
others
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
0 → 100644
浏览文件 @
7aeac758
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task MR."""
from
unittest.mock
import
patch
from
pydolphinscheduler.tasks.map_reduce
import
MR
,
ProgramType
@
patch
(
"pydolphinscheduler.core.engine.Engine.get_resource_info"
,
return_value
=
({
"id"
:
1
,
"name"
:
"test"
}),
)
def
test_mr_get_define
(
mock_resource
):
"""Test task mr function get_define."""
code
=
123
version
=
1
name
=
"test_mr_get_define"
main_class
=
"org.apache.mr.test_main_class"
main_package
=
"test_main_package"
program_type
=
ProgramType
.
JAVA
main_args
=
"/dolphinscheduler/resources/file.txt /output/ds"
expect
=
{
"code"
:
code
,
"name"
:
name
,
"version"
:
1
,
"description"
:
None
,
"delayTime"
:
0
,
"taskType"
:
"MR"
,
"taskParams"
:
{
"mainClass"
:
main_class
,
"mainJar"
:
{
"id"
:
1
,
},
"programType"
:
program_type
,
"appName"
:
None
,
"mainArgs"
:
main_args
,
"others"
:
None
,
"localParams"
:
[],
"resourceList"
:
[],
"dependence"
:
{},
"conditionResult"
:
{
"successNode"
:
[
""
],
"failedNode"
:
[
""
]},
"waitStartTimeout"
:
{},
},
"flag"
:
"YES"
,
"taskPriority"
:
"MEDIUM"
,
"workerGroup"
:
"default"
,
"failRetryTimes"
:
0
,
"failRetryInterval"
:
1
,
"timeoutFlag"
:
"CLOSE"
,
"timeoutNotifyStrategy"
:
None
,
"timeout"
:
0
,
}
with
patch
(
"pydolphinscheduler.core.task.Task.gen_code_and_version"
,
return_value
=
(
code
,
version
),
):
task
=
MR
(
name
,
main_class
,
main_package
,
program_type
,
main_args
=
main_args
)
assert
task
.
get_define
()
==
expect
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录