未验证 提交 dd6ed36f 编写于 作者: J Jiajie Zhong 提交者: GitHub

Add Python API implementation of workflows-as-code (#6269)

* Init DS python SDK pydolphinscheduler: python code definition

* Doc first

* Add quick start and developer doc

* Java documentation change

* Add LICENSE-py4j.txt

* Add py4j to release-docs/LICENSE

* Move dependency version to parent pom

* Remove outdated code

* Add tenant parameter to tutorial
上级 00eea95d
......@@ -46,3 +46,8 @@ dolphinscheduler-server/src/main/resources/logback.xml
dolphinscheduler-ui/dist
dolphinscheduler-ui/node
docker/build/apache-dolphinscheduler*
# pydolphinscheduler
__pycache__/
build/
*egg-info/
......@@ -46,6 +46,15 @@ public interface ProjectService {
*/
Map<String, Object> queryByCode(User loginUser, long projectCode);
/**
* query project details by name
*
* @param loginUser login user
* @param projectName project name
* @return project detail information
*/
Map<String, Object> queryByName(User loginUser, String projectName);
/**
* check project and authorization
*
......
......@@ -76,4 +76,12 @@ public interface QueueService {
*/
Result<Object> verifyQueue(String queue, String queueName);
/**
* query queue by queueName
*
* @param queueName queue name
* @return queue object for provide queue name
*/
Map<String, Object> queryQueueName(String queueName);
}
......@@ -92,4 +92,20 @@ public interface TenantService {
* @return true if tenant code can user, otherwise return false
*/
Result verifyTenantCode(String tenantCode);
/**
* check if provide tenant code object exists
*
* @param tenantCode tenant code
* @return true if tenant code exists, false if not
*/
boolean checkTenantExists(String tenantCode);
/**
* query tenant by tenant code
*
* @param tenantCode tenant code
* @return tenant list
*/
Map<String, Object> queryByTenantCode(String tenantCode);
}
......@@ -139,6 +139,21 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
return result;
}
@Override
public Map<String, Object> queryByName(User loginUser, String projectName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
boolean hasProjectAndPerm = hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
if (project != null) {
result.put(Constants.DATA_LIST, project);
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* check project and authorization
*
......
......@@ -262,6 +262,32 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
return result;
}
/**
* query queue by queueName
*
* @param queueName queue name
* @return queue object for provide queue name
*/
@Override
public Map<String, Object> queryQueueName(String queueName) {
Map<String, Object> result = new HashMap<>();
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
return result;
}
if (!checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NOT_EXIST, queueName);
return result;
}
List<Queue> queueList = queueMapper.queryQueueName(queueName);
result.put(Constants.DATA_LIST, queueList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check queue exist
* if exists return true,not exists return false
......
......@@ -320,8 +320,25 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
* @param tenantCode tenant code
* @return ture if the tenant code exists, otherwise return false
*/
private boolean checkTenantExists(String tenantCode) {
public boolean checkTenantExists(String tenantCode) {
Boolean existTenant = tenantMapper.existTenant(tenantCode);
return existTenant == Boolean.TRUE;
}
/**
* query tenant by tenant code
*
* @param tenantCode tenant code
* @return tenant detail information
*/
@Override
public Map<String, Object> queryByTenantCode(String tenantCode) {
Map<String, Object> result = new HashMap<>();
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant != null) {
result.put(Constants.DATA_LIST, tenant);
putMsg(result, Status.SUCCESS);
}
return result;
}
}
......@@ -415,6 +415,7 @@ public final class Constants {
public static final String NULL = "NULL";
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
public static final String THREAD_NAME_GATEWAY_SERVER = "Gateway-Server";
/**
* command parameter keys
......
......@@ -53,4 +53,11 @@ public interface QueueMapper extends BaseMapper<Queue> {
* @return true if exist else return null
*/
Boolean existQueue(@Param("queue") String queue, @Param("queueName") String queueName);
/**
* query queue by queue name
* @param queueName queueName
* @return queue list
*/
List<Queue> queryQueueName(@Param("queueName") String queueName);
}
......@@ -54,4 +54,13 @@
and queue_name =#{queueName}
</if>
</select>
<select id="queryQueueName" resultType="org.apache.dolphinscheduler.dao.entity.Queue">
select
<include refid="baseSql"/>
from t_ds_queue
where 1 = 1
<if test="queueName != null and queueName != ''">
and queue_name =#{queueName}
</if>
</select>
</mapper>
......@@ -436,7 +436,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
threetenbp 1.3.6: https://mvnrepository.com/artifact/org.threeten/threetenbp/1.3.6, BSD 3-clause
xmlenc 0.52: https://mvnrepository.com/artifact/xmlenc/xmlenc/0.52, BSD
hamcrest-core 1.3: https://mvnrepository.com/artifact/org.hamcrest/hamcrest-core/1.3, BSD 2-Clause
py4j 0.10.9: https://mvnrepository.com/artifact/net.sf.py4j/py4j/0.10.9, BSD 2-clause
========================================================================
CDDL licenses
......
Copyright (c) 2009-2018, Barthelemy Dagenais and individual contributors. All
rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- The name of the author may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-python</artifactId>
<name>${project.artifactId}</name>
<packaging>jar</packaging>
<dependencies>
<!-- dolphinscheduler -->
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-api</artifactId>
</dependency>
<!--springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
</dependency>
</dependencies>
</project>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# pydolphinscheduler
pydolphinscheduler is python API for Apache DolphinScheduler, which allow you definition
your workflow by python code, aka workflow-as-codes.
## Quick Start
> **_Notice:_** For now, due to pydolphinscheduler without release to any binary tarball or [PyPI][pypi], you
> have to clone Apache DolphinScheduler code from GitHub to ensure quick start setup
Here we show you how to install and run a simple example of pydolphinscheduler
### Prepare
```shell
# Clone code from github
git clone git@github.com:apache/dolphinscheduler.git
# Install pydolphinscheduler from source
cd dolphinscheduler-python/pydolphinscheduler
pip setup.py install
```
### Start Server And Run Example
Before you run an example, you have to start backend server. You could follow [development setup][dev-setup]
section "DolphinScheduler Standalone Quick Start" to set up developer environment. You have to start backend
and frontend server in this step, which mean that you could view DolphinScheduler UI in your browser with URL
http://localhost:12345/dolphinscheduler
After backend server is being start, all requests from `pydolphinscheduler` would be sends to backend server.
And for now we could run a simple example by:
```shell
cd dolphinscheduler-python/pydolphinscheduler
python example/tutorial.py
```
> **_NOTICE:_** Since Apache DolphinScheduler's tenant is requests while running command, you might need to change
> tenant value in `example/tutorial.py`. For now the value is `tenant_exists`, please change it to username exists
> in you environment.
After command execute, you could see a new project with single process definition named *tutorial* in the [UI][ui-project].
Until now, we finish quick start by an example of pydolphinscheduler and run it. If you want to inspect or join
pydolphinscheduler develop, you could take a look at [develop](#develop)
## Develop
pydolphinscheduler is python API for Apache DolphinScheduler, it just defines what workflow look like instead of
store or execute it. We here use [py4j][py4j] to dynamically access Java Virtual Machine.
### Setup Develop Environment
We already clone the code in [quick start](#quick-start), so next step we have to open pydolphinscheduler project
in you editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. And you could
just open directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
### Brief Concept
Apache DolphinScheduler is design to define workflow by UI, and pydolphinscheduler try to define it by code. When
define by code, user usually do not care user, tanant, or queue exists or not. All user care about is create
a new workflow by the code his/her definition. So we have some **side object** in `pydolphinscheduler/side`
directory, their only check object exists or not, and create them if not exists.
#### Process Definition
pydolphinscheduler workflow object name, process definition is also same name as Java object(maybe would be change to
other word for more simple).
#### Tasks
pydolphinscheduler tasks object, we use tasks to define exact job we want DolphinScheduler do for us. For now,
we only support `shell` task to execute shell task. [This link][all-task] list all tasks support in DolphinScheduler
and would be implement in the further.
[pypi]: https://pypi.org/
[dev-setup]: https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html
[ui-project]: http://8.142.34.29:12345/dolphinscheduler/ui/#/projects/list
[py4j]: https://www.py4j.org/index.html
[pycharm]: https://www.jetbrains.com/pycharm
[idea]: https://www.jetbrains.com/idea/
[all-task]: https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## Roadmap
### v0.0.3
Add other features, tasks, parameters in DS, keep code coverage up to 90%
### v0.0.2
Add docs about how to use and develop package, code coverage up to 90%, add CI/CD
for package
### v0.0.1(current)
Setup up POC, for defining DAG with python code, running DAG manually,
releasing to pypi
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
r"""
After tutorial.py file submit to Apache DolphinScheduler server a DAG would be create,
and workflow DAG graph as below:
--> task_child_one
/ \
task_parent --> --> task_union
\ /
--> task_child_two
"""
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")
task_union = Shell(name="task_union", command="echo union")
task_group = [task_child_one, task_child_two]
task_parent.set_downstream(task_group)
task_union << task_group
pd.run()
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
py4j~=0.10.9.2
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# testting
pytest~=6.2.5
# code linting and formatting
flake8-black~=0.2.3
# flake8
# flake8-docstrings
# flake8-black
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
from os.path import dirname, join
from setuptools import find_packages, setup
version = '0.0.1.dev0'
if sys.version_info[0] < 3:
raise Exception("pydolphinscheduler does not support Python 2. Please upgrade to Python 3.")
def read(*names, **kwargs):
return open(
join(dirname(__file__), *names), encoding=kwargs.get("encoding", "utf8")
).read()
setup(
name="pydolphinscheduler",
version=version,
license="Apache License 2.0",
description="Apache DolphinScheduler python SDK",
long_description=read("README.md"),
# Make sure pypi is expecting markdown
long_description_content_type="text/markdown",
author="Apache Software Foundation",
author_email="dev@dolphinscheduler.apache.org",
url="https://dolphinscheduler.apache.org/",
python_requires=">=3.6",
keywords=[
"dolphinscheduler",
"workflow",
"scheduler",
"taskflow",
],
project_urls={
"Homepage": "https://dolphinscheduler.apache.org",
"Documentation": "https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/quick-start.html",
"Source": "https://github.com/apache/dolphinscheduler",
"Issue Tracker": "https://github.com/apache/dolphinscheduler/issues",
"Discussion": "https://github.com/apache/dolphinscheduler/discussions",
"Twitter": "https://twitter.com/dolphinschedule",
},
packages=find_packages(where="src"),
package_dir={"": "src"},
include_package_data=True,
classifiers=[
# complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers
"Development Status :: 1 - Planning",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Operating System :: Unix",
"Operating System :: POSIX",
"Operating System :: Microsoft :: Windows",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: User Interfaces",
],
install_requires=[
# Core
"py4j~=0.10",
# Dev
"pytest~=6.2",
]
)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
class ProcessDefinitionReleaseState:
"""
ProcessDefinition release state
"""
ONLINE: str = "ONLINE"
OFFLINE: str = "OFFLINE"
class ProcessDefinitionDefault:
"""
ProcessDefinition default values
"""
PROJECT: str = "project-pydolphin"
TENANT: str = "tenant_pydolphin"
USER: str = "userPythonGateway"
# TODO simple set password same as username
USER_PWD: str = "userPythonGateway"
USER_EMAIL: str = "userPythonGateway@dolphinscheduler.com"
USER_PHONE: str = "11111111111"
USER_STATE: int = 1
QUEUE: str = "queuePythonGateway"
WORKER_GROUP: str = "default"
class TaskPriority(str):
HIGHEST = "HIGHEST"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
LOWEST = "LOWEST"
class TaskFlag(str):
YES = "YES"
NO = "NO"
class TaskTimeoutFlag(str):
CLOSE = "CLOSE"
class TaskType(str):
SHELL = "SHELL"
class DefaultTaskCodeNum(str):
DEFAULT = 1
class JavaGatewayDefault(str):
RESULT_MESSAGE_KEYWORD = "msg"
RESULT_MESSAGE_SUCCESS = "success"
RESULT_STATUS_KEYWORD = "status"
RESULT_STATUS_SUCCESS = "SUCCESS"
RESULT_DATA = "data"
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional, Dict
# from pydolphinscheduler.side.user import User
from pydolphinscheduler.utils.string import attr2camel
class Base:
"""
Base
"""
_KEY_ATTR: set = {
"name",
"description"
}
_TO_DICT_ATTR: set = set()
DEFAULT_ATTR: Dict = {}
def __init__(
self,
name: str,
description: Optional[str] = None
):
self.name = name
self.description = description
def __repr__(self) -> str:
return f'<{type(self).__name__}: name="{self.name}">'
def __eq__(self, other):
return type(self) == type(other) and \
all(getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR)
# TODO check how Redash do
# TODO DRY
def to_dict(self, camel_attr=True) -> Dict:
# content = {}
# for attr, value in self.__dict__.items():
# # Don't publish private variables
# if attr.startswith("_"):
# continue
# else:
# content[snake2camel(attr)] = value
# content.update(self.DEFAULT_ATTR)
# return content
content = {}
for attr in self._TO_DICT_ATTR:
val = getattr(self, attr, None)
if camel_attr:
content[attr2camel(attr)] = val
else:
content[attr] = val
return content
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core.base import Base
class BaseSide(Base):
def __init__(
self,
name: str,
description: Optional[str] = None
):
super().__init__(name, description)
@classmethod
def create_if_not_exists(
cls,
# TODO comment for avoiding cycle import
# user: Optional[User] = ProcessDefinitionDefault.USER
user=ProcessDefinitionDefault.USER
):
"""
Create Base if not exists
"""
raise NotImplementedError
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from typing import Optional, List, Dict, Set
from pydolphinscheduler.constants import ProcessDefinitionReleaseState, ProcessDefinitionDefault
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.side import Tenant, Project, User
class ProcessDefinitionContext:
_context_managed_process_definition: Optional["ProcessDefinition"] = None
@classmethod
def set(cls, pd: "ProcessDefinition") -> None:
cls._context_managed_process_definition = pd
@classmethod
def get(cls) -> Optional["ProcessDefinition"]:
return cls._context_managed_process_definition
@classmethod
def delete(cls) -> None:
cls._context_managed_process_definition = None
class ProcessDefinition(Base):
"""
ProcessDefinition
TODO :ref: comment may not correct ref
TODO: maybe we should rename this class, currently use DS object name
"""
# key attribute for identify ProcessDefinition object
_KEY_ATTR = {
"name",
"project",
"tenant",
"release_state",
"param",
}
_TO_DICT_ATTR = {
"name",
"description",
"_project",
"_tenant",
"timeout",
"release_state",
"param",
"tasks",
"task_definition_json",
"task_relation_json",
}
def __init__(
self,
name: str,
description: Optional[str] = None,
user: Optional[str] = ProcessDefinitionDefault.USER,
project: Optional[str] = ProcessDefinitionDefault.PROJECT,
tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
param: Optional[List] = None
):
super().__init__(name, description)
self._user = user
self._project = project
self._tenant = tenant
self._queue = queue
self.timeout = timeout
self.release_state = release_state
self.param = param
self.tasks: dict = {}
# TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set()
self._process_definition_code = None
def __enter__(self) -> "ProcessDefinition":
ProcessDefinitionContext.set(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
ProcessDefinitionContext.delete()
@property
def tenant(self) -> Tenant:
return Tenant(self._tenant)
@tenant.setter
def tenant(self, tenant: Tenant) -> None:
self._tenant = tenant.name
@property
def project(self) -> Project:
return Project(self._project)
@project.setter
def project(self, project: Project) -> None:
self._project = project.name
@property
def user(self) -> User:
return User(self._user,
ProcessDefinitionDefault.USER_PWD,
ProcessDefinitionDefault.USER_EMAIL,
ProcessDefinitionDefault.USER_PHONE,
ProcessDefinitionDefault.TENANT,
ProcessDefinitionDefault.QUEUE,
ProcessDefinitionDefault.USER_STATE)
@property
def task_definition_json(self) -> List[Dict]:
if not self.tasks:
return [self.tasks]
else:
return [task.to_dict() for task in self.tasks.values()]
@property
def task_relation_json(self) -> List[Dict]:
if not self.tasks:
return [self.tasks]
else:
self._handle_root_relation()
return [tr.to_dict() for tr in self._task_relations]
# TODO inti DAG's tasks are in the same place
@property
def task_location(self) -> List[Dict]:
if not self.tasks:
return [self.tasks]
else:
return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in self.tasks]
@property
def task_list(self) -> List["Task"]:
return list(self.tasks.values())
def _handle_root_relation(self):
from pydolphinscheduler.core.task import TaskRelation
post_relation_code = set()
for relation in self._task_relations:
post_relation_code.add(relation.post_task_code)
for task in self.task_list:
if task.code not in post_relation_code:
root_relation = TaskRelation(pre_task_code=0, post_task_code=task.code)
self._task_relations.add(root_relation)
def add_task(self, task: "Task") -> None:
self.tasks[task.code] = task
task._process_definition = self
def add_tasks(self, tasks: List["Task"]) -> None:
for task in tasks:
self.add_task(task)
def get_task(self, code: str) -> "Task":
if code not in self.tasks:
raise ValueError("Task with code %s can not found in process definition %", (code, self.name))
return self.tasks[code]
# TODO which tying should return in this case
def get_tasks_by_name(self, name: str) -> Set["Task"]:
find = set()
for task in self.tasks.values():
if task.name == name:
find.add(task)
return find
def get_one_task_by_name(self, name: str) -> "Task":
tasks = self.get_tasks_by_name(name)
if not tasks:
raise ValueError(f"Can not find task with name {name}.")
return tasks.pop()
def run(self):
"""
Run ProcessDefinition instance, a shortcut for :ref: submit and :ref: start
Only support manual for now, schedule run will coming soon
:return:
"""
self.submit()
self.start()
def _ensure_side_model_exists(self):
"""
Ensure side model exists which including :ref: Project, Tenant, User.
If those model not exists, would create default value in :ref: ProcessDefinitionDefault
"""
# TODO used metaclass for more pythonic
self.tenant.create_if_not_exists(self._queue)
# model User have to create after Tenant created
self.user.create_if_not_exists()
# Project model need User object exists
self.project.create_if_not_exists(self._user)
def submit(self) -> int:
"""
Submit ProcessDefinition instance to java gateway
:return:
"""
self._ensure_side_model_exists()
gateway = launch_gateway()
self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
self._user,
self._project,
self.name,
str(self.description) if self.description else "",
str(self.param) if self.param else None,
json.dumps(self.task_location),
self.timeout,
self._tenant,
# TODO add serialization function
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
)
return self._process_definition_code
def start(self) -> None:
"""
Start ProcessDefinition instance which post to `start-process-instance` to java gateway
:return:
"""
gateway = launch_gateway()
gateway.entry_point.execProcessInstance(
self._user,
self._project,
self.name,
"",
"default",
24 * 3600,
)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional, List, Dict, Set, Union, Sequence, Tuple
from pydolphinscheduler.constants import TaskPriority, ProcessDefinitionDefault, TaskFlag, TaskTimeoutFlag, \
DefaultTaskCodeNum, JavaGatewayDefault
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker
from pydolphinscheduler.utils.string import snake2camel, class_name2camel
class ObjectJsonBase:
DEFAULT_ATTR = {}
def __int__(self, *args, **kwargs):
pass
def __str__(self) -> str:
content = []
for attribute, value in self.__dict__.items():
content.append(f"\"{snake2camel(attribute)}\": {value}")
content = ",".join(content)
return f"\"{class_name2camel(type(self).__name__)}\":{{{content}}}"
# TODO check how Redash do
# TODO DRY
def to_dict(self) -> Dict:
content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
content.update(self.DEFAULT_ATTR)
return content
class TaskParams(ObjectJsonBase):
DEFAULT_CONDITION_RESULT = {
"successNode": [
""
],
"failedNode": [
""
]
}
def __init__(
self,
raw_script: str,
local_params: Optional[List] = None,
resource_list: Optional[List] = None,
dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
):
super().__init__()
self.raw_script = raw_script
self.local_params = local_params or []
self.resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
# TODO need better way to handle it, this code just for POC
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
class TaskRelation(ObjectJsonBase):
DEFAULT_ATTR = {
"name": "",
"preTaskVersion": 1,
"postTaskVersion": 1,
"conditionType": 0,
"conditionParams": {}
}
def __init__(
self,
pre_task_code: int,
post_task_code: int,
):
super().__init__()
self.pre_task_code = pre_task_code
self.post_task_code = post_task_code
def __hash__(self):
return hash(f"{self.post_task_code}, {self.post_task_code}")
class Task(Base):
DEFAULT_DEPS_ATTR = {
"name": "",
"preTaskVersion": 1,
"postTaskVersion": 1,
"conditionType": 0,
"conditionParams": {}
}
def __init__(
self,
name: str,
task_type: str,
task_params: TaskParams,
description: Optional[str] = None,
flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM,
worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,
timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
timeout_notify_strategy: Optional = None,
timeout: Optional[int] = 0,
process_definition: Optional[ProcessDefinition] = None,
):
super().__init__(name, description)
self.task_type = task_type
self.task_params = task_params
self.flag = flag
self.task_priority = task_priority
self.worker_group = worker_group
self.fail_retry_times = fail_retry_times
self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time
self.timeout_flag = timeout_flag
self.timeout_notify_strategy = timeout_notify_strategy
self.timeout = timeout
self._process_definition = None
self.process_definition: ProcessDefinition = process_definition or ProcessDefinitionContext.get()
self._upstream_task_codes: Set[int] = set()
self._downstream_task_codes: Set[int] = set()
self._task_relation: Set[TaskRelation] = set()
# move attribute code and version after _process_definition and process_definition declare
self.code, self.version = self.gen_code_and_version()
# Add task to process definition, maybe we could put into property process_definition latter
if self.process_definition is not None and self.code not in self.process_definition.tasks:
self.process_definition.add_task(self)
@property
def process_definition(self) -> Optional[ProcessDefinition]:
if self._process_definition:
return self._process_definition
else:
raise ValueError(f'Task {self} has not been assigned to a ProcessDefinition yet')
@process_definition.setter
def process_definition(self, process_definition: Optional[ProcessDefinition]):
self._process_definition = process_definition
def __hash__(self):
return hash(self.code)
def __lshift__(self, other: Union["Task", Sequence["Task"]]):
"""Implements Task << Task"""
self.set_upstream(other)
return other
def __rshift__(self, other: Union["Task", Sequence["Task"]]):
"""Implements Task >> Task"""
self.set_downstream(other)
return other
def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
"""Called for Task >> [Task] because list don't have __rshift__ operators."""
self.__lshift__(other)
return self
def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
"""Called for Task << [Task] because list don't have __lshift__ operators."""
self.__rshift__(other)
return self
def _set_deps(self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True) -> None:
if not isinstance(tasks, Sequence):
tasks = [tasks]
for task in tasks:
if upstream:
self._upstream_task_codes.add(task.code)
task._downstream_task_codes.add(self.code)
if self._process_definition:
task_relation = TaskRelation(
pre_task_code=task.code,
post_task_code=self.code,
)
self.process_definition._task_relations.add(task_relation)
else:
self._downstream_task_codes.add(task.code)
task._upstream_task_codes.add(self.code)
if self._process_definition:
task_relation = TaskRelation(
pre_task_code=self.code,
post_task_code=task.code,
)
self.process_definition._task_relations.add(task_relation)
def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
self._set_deps(tasks, upstream=True)
def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
self._set_deps(tasks, upstream=False)
# TODO code should better generate in bulk mode when :ref: processDefinition run submit or start
def gen_code_and_version(self) -> Tuple:
# TODO get code from specific project process definition and task name
gateway = launch_gateway()
result = gateway.entry_point.getCodeAndVersion(self.process_definition._project, self.name)
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
def to_dict(self, camel_attr=True) -> Dict:
content = {}
for attr, value in self.__dict__.items():
# Don't publish private variables
if attr.startswith("_"):
continue
elif isinstance(value, TaskParams):
content[snake2camel(attr)] = value.to_dict()
else:
content[snake2camel(attr)] = value
return content
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Optional
from py4j.java_collections import JavaMap
from py4j.java_gateway import JavaGateway, GatewayParameters
from pydolphinscheduler.constants import JavaGatewayDefault
def launch_gateway() -> JavaGateway:
# TODO Note that automatic conversion makes calling Java methods slightly less efficient because
# in the worst case, Py4J needs to go through all registered converters for all parameters.
# This is why automatic conversion is disabled by default.
gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))
return gateway
def gateway_result_checker(
result: JavaMap,
msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS
) -> Any:
if result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString() != \
JavaGatewayDefault.RESULT_STATUS_SUCCESS:
raise RuntimeError(f"Failed when try to got result for java gateway")
if msg_check is not None and result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD] != msg_check:
raise ValueError(f"Get result state not success.")
return result
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from pydolphinscheduler.side.project import Project
from pydolphinscheduler.side.tenant import Tenant
from pydolphinscheduler.side.user import User
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker
class Project(BaseSide):
"""
Project
"""
def __init__(
self,
name: str = ProcessDefinitionDefault.PROJECT,
description: Optional[str] = None
):
super().__init__(name, description)
def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
"""
Create Project if not exists
"""
gateway = launch_gateway()
result = gateway.entry_point.createProject(user, self.name, self.description)
# TODO recover result checker
# gateway_result_checker(result, None)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker
class Queue(BaseSide):
"""
Queue
"""
def __init__(
self,
name: str = ProcessDefinitionDefault.QUEUE,
description: Optional[str] = ""
):
super().__init__(name, description)
def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
"""
Create Queue if not exists
"""
gateway = launch_gateway()
# Here we set Queue.name and Queue.queueName same as self.name
result = gateway.entry_point.createProject(user, self.name, self.name)
gateway_result_checker(result, None)
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker
class Tenant(BaseSide):
"""
Tenant
"""
def __init__(
self,
name: str = ProcessDefinitionDefault.TENANT,
queue: str = ProcessDefinitionDefault.QUEUE,
description: Optional[str] = None
):
super().__init__(name, description)
self.queue = queue
def create_if_not_exists(self, queue_name: str, user=ProcessDefinitionDefault.USER) -> None:
"""
Create Tenant if not exists
"""
gateway = launch_gateway()
result = gateway.entry_point.createTenant(self.name, self.description, queue_name)
# gateway_result_checker(result, None)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker
class User(BaseSide):
_KEY_ATTR = {
"name",
"password",
"email",
"phone",
"tenant",
"queue",
"status",
}
def __init__(
self,
name: str,
password: str,
email: str,
phone: str,
tenant: str,
queue: Optional[str] = None,
status: Optional[int] = 1,
):
super().__init__(name)
self.password = password
self.email = email
self.phone = phone
self.tenant = tenant
self.queue = queue
self.status = status
def create_if_not_exists(self, **kwargs):
"""
Create User if not exists
"""
gateway = launch_gateway()
result = gateway.entry_point.createUser(
self.name,
self.password,
self.email,
self.phone,
self.tenant,
self.queue,
self.status
)
# TODO recover result checker
# gateway_result_checker(result, None)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional
from pydolphinscheduler.core.base_side import BaseSide
class WorkerGroup(BaseSide):
"""
Worker Group
"""
def __init__(
self,
name: str,
address: str,
description: Optional[str] = None
):
super().__init__(name, description)
self.address = address
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams
class Shell(Task):
# TODO maybe we could use instance name to replace attribute `name`
# which is simplify as `task_shell = Shell(command = "echo 1")` and
# task.name assign to `task_shell`
def __init__(
self,
name: str,
command: str,
task_type: str = TaskType.SHELL,
*args, **kwargs
):
task_params = TaskParams(raw_script=command)
super().__init__(name, task_type, task_params, *args, **kwargs)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
def attr2camel(attr: str, include_private=True):
if include_private:
attr = attr.lstrip("_")
return snake2camel(attr)
def snake2camel(snake: str):
components = snake.split("_")
return components[0] + "".join(x.title() for x in components[1:])
def class_name2camel(class_name: str):
return class_name[0].lower() + class_name[1:]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from pydolphinscheduler.constants import ProcessDefinitionDefault, ProcessDefinitionReleaseState
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskParams
from pydolphinscheduler.side import Tenant, Project, User
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
@pytest.mark.parametrize(
"func",
[
"run", "submit", "start"
]
)
def test_process_definition_key_attr(func):
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert hasattr(pd, func), f"ProcessDefinition instance don't have attribute `{func}`"
@pytest.mark.parametrize(
"name,value",
[
("project", Project(ProcessDefinitionDefault.PROJECT)),
("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
("user", User(ProcessDefinitionDefault.USER,
ProcessDefinitionDefault.USER_PWD,
ProcessDefinitionDefault.USER_EMAIL,
ProcessDefinitionDefault.USER_PHONE,
ProcessDefinitionDefault.TENANT,
ProcessDefinitionDefault.QUEUE,
ProcessDefinitionDefault.USER_STATE)),
("release_state", ProcessDefinitionReleaseState.ONLINE),
],
)
def test_process_definition_default_value(name, value):
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert getattr(pd, name) == value, \
f"ProcessDefinition instance attribute `{name}` have not except default value `{getattr(pd, name)}`"
@pytest.mark.parametrize(
"name,cls,expect",
[
("project", Project, "project"),
("tenant", Tenant, "tenant"),
],
)
def test_process_definition_set_attr(name, cls, expect):
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
setattr(pd, name, cls(expect))
assert getattr(pd, name) == cls(
expect), f"ProcessDefinition set attribute `{name}` do not work expect"
def test_process_definition_to_dict_without_task():
expect = {
"name": TEST_PROCESS_DEFINITION_NAME,
"description": None,
"project": ProcessDefinitionDefault.PROJECT,
"tenant": ProcessDefinitionDefault.TENANT,
"timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE,
"param": None,
"tasks": {},
"taskDefinitionJson": [{}],
"taskRelationJson": [{}],
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert pd.to_dict() == expect
def test_process_definition_simple():
expect_tasks_num = 5
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
for i in range(expect_tasks_num):
task_params = TaskParams(raw_script=f"test-raw-script-{i}")
curr_task = Task(name=f"task-{i}", task_type=f"type-{i}", task_params=task_params)
# Set deps task i as i-1 parent
if i > 0:
pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
curr_task.set_upstream(pre_task)
assert len(pd.tasks) == expect_tasks_num
# Test if task process_definition same as origin one
task: Task = pd.get_one_task_by_name("task-0")
assert pd is task.process_definition
# Test if all tasks with expect deps
for i in range(expect_tasks_num):
task: Task = pd.get_one_task_by_name(f"task-{i}")
if i == 0:
assert task._upstream_task_codes == set()
assert task._downstream_task_codes == {pd.get_one_task_by_name("task-1").code}
elif i == expect_tasks_num - 1:
assert task._upstream_task_codes == {pd.get_one_task_by_name(f"task-{i - 1}").code}
assert task._downstream_task_codes == set()
else:
assert task._upstream_task_codes == {pd.get_one_task_by_name(f"task-{i - 1}").code}
assert task._downstream_task_codes == {pd.get_one_task_by_name(f"task-{i + 1}").code}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest.mock import patch
from pydolphinscheduler.core.task import TaskParams, TaskRelation, Task
def test_task_params_to_dict():
raw_script = "test_task_params_to_dict"
expect = {
"resourceList": [],
"localParams": [],
"rawScript": raw_script,
"dependence": {},
"conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
"waitStartTimeout": {}
}
task_param = TaskParams(raw_script=raw_script)
assert task_param.to_dict() == expect
def test_task_relation_to_dict():
pre_task_code = 123
post_task_code = 456
expect = {
"name": "",
"preTaskCode": pre_task_code,
"postTaskCode": post_task_code,
"preTaskVersion": 1,
"postTaskVersion": 1,
"conditionType": 0,
"conditionParams": {}
}
task_param = TaskRelation(pre_task_code=pre_task_code, post_task_code=post_task_code)
assert task_param.to_dict() == expect
def test_task_to_dict():
code = "123"
name = "test_task_to_dict"
task_type = "test_task_to_dict_type"
raw_script = "test_task_params_to_dict"
expect = {
"code": code,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": task_type,
"taskParams": {
"resourceList": [],
"localParams": [],
"rawScript": raw_script,
"dependence": {},
"conditionResult": {
"successNode": [
""
],
"failedNode": [
""
]
},
"waitStartTimeout": {}
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "worker-group-pydolphin",
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0
}
with patch('pydolphinscheduler.core.task.Task.gen_code', return_value=code):
task = Task(
name=name,
task_type=task_type,
task_params=TaskParams(raw_script)
)
assert task.to_dict() == expect
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest.mock import patch
from pydolphinscheduler.tasks.shell import Shell
def test_shell_to_dict():
code = "123"
name = "test_shell_to_dict"
command = "echo test shell"
expect = {
"code": code,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": "SHELL",
"taskParams": {
"resourceList": [],
"localParams": [],
"rawScript": command,
"dependence": {},
"conditionResult": {
"successNode": [
""
],
"failedNode": [
""
]
},
"waitStartTimeout": {}
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "worker-group-pydolphin",
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0
}
with patch('pydolphinscheduler.core.task.Task.gen_code', return_value=code):
shell = Shell(name, command)
assert shell.to_dict() == expect
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from py4j.java_gateway import java_import, JavaGateway
def test_gateway_connect():
gateway = JavaGateway()
app = gateway.entry_point
assert app.ping() == "PONG"
def test_jvm_simple():
gateway = JavaGateway()
smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
assert bigger > smaller
def test_python_client_java_import_single():
gateway = JavaGateway()
java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.FileUtils")
assert hasattr(gateway.jvm, "FileUtils")
def test_python_client_java_import_package():
gateway = JavaGateway()
java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*")
# test if jvm view have some common utils
for util in ("FileUtils", "OSUtils", "DateUtils"):
assert hasattr(gateway.jvm, util)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import py4j.GatewayServer;
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.server.master.*",
"org.apache.dolphinscheduler.server.worker.*",
"org.apache.dolphinscheduler.server.monitor.*",
"org.apache.dolphinscheduler.server.log.*"
})
})
public class PythonGatewayServer extends SpringBootServletInitializer {
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProjectService projectService;
@Autowired
private TenantService tenantService;
@Autowired
private ExecutorService executorService;
@Autowired
private ProcessDefinitionService processDefinitionService;
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private UsersService usersService;
@Autowired
private QueueService queueService;
@Autowired
private ProjectMapper projectMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
// TODO replace this user to build in admin user if we make sure build in one could not be change
private final User dummyAdminUser = new User() {
{
setId(Integer.MAX_VALUE);
setUserName("dummyUser");
setUserType(UserType.ADMIN_USER);
}
};
private final Queue queuePythonGateway = new Queue() {
{
setId(Integer.MAX_VALUE);
setQueueName("queuePythonGateway");
}
};
public String ping() {
return "PONG";
}
// TODO Should we import package in python client side? utils package can but service can not, why
// Core api
public Map<String, Object> genTaskCodeList(Integer genNum) {
return taskDefinitionService.genTaskCodeList(genNum);
}
public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws SnowFlakeUtils.SnowFlakeException {
Project project = projectMapper.queryByName(projectName);
Map<String, Long> result = new HashMap<>();
// project do not exists, mean task not exists too, so we should directly return init value
if (project == null) {
result.put("code", SnowFlakeUtils.getInstance().nextId());
result.put("version", 0L);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
if (taskDefinition == null) {
result.put("code", SnowFlakeUtils.getInstance().nextId());
result.put("version", 0L);
} else {
result.put("code", taskDefinition.getCode());
result.put("version", (long) taskDefinition.getVersion());
}
return result;
}
/**
* create or update process definition.
* If process definition do not exists in Project=`projectCode` would create a new one
* If process definition already exists in Project=`projectCode` would update it
* All requests
* <p>
*
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
public Long createOrUpdateProcessDefinition(String userName,
String projectName,
String name,
String description,
String globalParams,
String locations,
int timeout,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson) {
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
if (verifyProcessDefinitionExists.get(Constants.STATUS) != Status.SUCCESS) {
// update process definition
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
long processDefinitionCode = processDefinition.getCode();
// make sure process definition offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
return processDefinitionCode;
} else {
// create process definition
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
return processDefinition.getCode();
}
}
public void execProcessInstance(String userName,
String projectName,
String processDefinitionName,
String cronTime,
String workerGroup,
Integer timeout
) {
User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// temp default value
FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
TaskDependType taskDependType = TaskDependType.TASK_POST;
WarningType warningType = WarningType.NONE;
RunMode runMode = RunMode.RUN_MODE_SERIAL;
Priority priority = Priority.MEDIUM;
int warningGroupId = 0;
Long environmentCode = -1L;
Map<String, String> startParams = null;
Integer expectedParallelismNumber = null;
String startNodeList = null;
// make sure process definition online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
executorService.execProcessInstance(user,
project.getCode(),
processDefinition.getCode(),
cronTime,
null,
failureStrategy,
startNodeList,
taskDependType,
warningType,
warningGroupId,
runMode,
priority,
workerGroup,
environmentCode,
timeout,
startParams,
expectedParallelismNumber,
0
);
}
// side object
public Map<String, Object> createProject(String userName, String name, String desc) {
User user = usersService.queryUser(userName);
return projectService.createProject(user, name, desc);
}
public Map<String, Object> createQueue(String name, String queueName) {
Result<Object> verifyQueueExists = queueService.verifyQueue(name, queueName);
if (verifyQueueExists.getCode() == 0) {
return queueService.createQueue(dummyAdminUser, name, queueName);
} else {
Map<String, Object> result = new HashMap<>();
// TODO function putMsg do not work here
result.put(Constants.STATUS, Status.SUCCESS);
result.put(Constants.MSG, Status.SUCCESS.getMsg());
return result;
}
}
public Map<String, Object> createTenant(String tenantCode, String desc, String queueName) throws Exception {
if (tenantService.checkTenantExists(tenantCode)) {
Map<String, Object> result = new HashMap<>();
// TODO function putMsg do not work here
result.put(Constants.STATUS, Status.SUCCESS);
result.put(Constants.MSG, Status.SUCCESS.getMsg());
return result;
} else {
Result<Object> verifyQueueExists = queueService.verifyQueue(queueName, queueName);
if (verifyQueueExists.getCode() == 0) {
// TODO why create do not return id?
queueService.createQueue(dummyAdminUser, queueName, queueName);
}
Map<String, Object> result = queueService.queryQueueName(queueName);
List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
Queue queue = queueList.get(0);
return tenantService.createTenant(dummyAdminUser, tenantCode, queue.getId(), desc);
}
}
public void createUser(String userName,
String userPassword,
String email,
String phone,
String tenantCode,
String queue,
int state) {
User user = usersService.queryUser(userName);
if (Objects.isNull(user)) {
Map<String, Object> tenantResult = tenantService.queryByTenantCode(tenantCode);
Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST);
usersService.createUser(userName, userPassword, email, tenant.getId(), phone, queue, state);
}
}
@PostConstruct
public void run() {
GatewayServer server = new GatewayServer(this);
GatewayServer.turnLoggingOn();
// Start server to accept python client RPC
server.start();
}
public static void main(String[] args) {
SpringApplication.run(PythonGatewayServer.class, args);
}
}
......@@ -51,6 +51,10 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-python</artifactId>
</dependency>
</dependencies>
</project>
......@@ -66,7 +66,8 @@ public class StandaloneServer {
new SpringApplicationBuilder(
ApiApplicationServer.class,
MasterServer.class,
WorkerServer.class
WorkerServer.class,
PythonGatewayServer.class
).run(args);
}
......
......@@ -127,6 +127,7 @@
<reflections.version>0.9.12</reflections.version>
<byte-buddy.version>1.9.16</byte-buddy.version>
<java-websocket.version>1.5.1</java-websocket.version>
<py4j.version>0.10.9</py4j.version>
</properties>
<dependencyManagement>
......@@ -263,6 +264,11 @@
<artifactId>dolphinscheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-python</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
......@@ -678,6 +684,12 @@
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>${py4j.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
......@@ -1234,5 +1246,6 @@
<module>dolphinscheduler-service</module>
<module>dolphinscheduler-microbench</module>
<module>dolphinscheduler-standalone-server</module>
<module>dolphinscheduler-python</module>
</modules>
</project>
......@@ -201,6 +201,7 @@ protostuff-core-1.7.2.jar
protostuff-runtime-1.7.2.jar
protostuff-api-1.7.2.jar
protostuff-collectionschema-1.7.2.jar
py4j-0.10.9.jar
quartz-2.3.0.jar
quartz-jobs-2.3.0.jar
reflections-0.9.12.jar
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册