Unverified commit cf522e2f, authored by JieguangZhou, committed by GitHub

[improve] Optimize MLFlow task plugin for ease of use (#12071)

optimize code
Parent: b52da640
...@@ -20,7 +20,6 @@ The MLflow plugin currently supports and will support the following:

- MLflow Models
  - MLFLOW: Use `MLflow models serve` to deploy a model service
  - Docker: Run the container after packaging the docker image
  - Docker Compose: Use docker compose to run the container, it will replace the docker run above

## Create Task

...@@ -98,22 +97,26 @@ You can now use this feature to run all MLFlow projects on Github (For example [

![mlflow-models-docker](../../../../img/tasks/demo/mlflow-models-docker.png)
#### DOCKER COMPOSE

![mlflow-models-docker-compose](../../../../img/tasks/demo/mlflow-models-docker-compose.png)

| **Parameter**    | **Description**                                          |
|------------------|----------------------------------------------------------|
| Max Cpu Limit    | For example, `1.0` or `0.5`, the same as docker compose. |
| Max Memory Limit | For example `1G` or `500M`, the same as docker compose.  |

## Environment to Prepare

### Conda Environment

You need to enter the admin account to configure a conda environment variable (Please install [anaconda](https://docs.continuum.io/anaconda/install/) or [miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) in advance).

## Environment to Prepare

### Conda Environment

Please install [anaconda](https://docs.continuum.io/anaconda/install/) or [miniconda](https://docs.conda.io/en/latest/miniconda.html#installing) in advance.

**Method A:**

Config anaconda environment in `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.

Add the following content to the file:

```bash
# config anaconda environment
export PATH=/opt/anaconda3/bin:$PATH
```

**Method B:**

You need to enter the admin account to configure a conda environment variable.

![mlflow-conda-env](../../../../img/tasks/demo/mlflow-conda-env.png)
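As a quick sanity check (an editorial sketch, not part of the documentation), you can confirm that the worker's shell picks up the environment configured above:

```bash
# run as the DolphinScheduler worker user
source /dolphinscheduler/conf/env/dolphinscheduler_env.sh
which conda && conda --version
mlflow --version
```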
...@@ -139,3 +142,14 @@ After running, an MLflow service is started.

After this, you can visit the MLflow service (`http://localhost:5000`) page to view the experiments and models.

![mlflow-server](../../../../img/tasks/demo/mlflow-server.png)
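For reference, a minimal sketch of how such a tracking service is typically started; the `pip install` step and the sqlite database file name are assumptions for illustration, while the flags mirror the command shown in the Chinese documentation further down:

```bash
# install MLflow and start a tracking server reachable by the DolphinScheduler worker
pip install mlflow
mlflow server -h 0.0.0.0 -p 5000 --serve-artifacts --backend-store-uri sqlite:///mlflow.db
```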
### Preset Algorithm Repository Configuration
If you cannot access GitHub, you can modify the following fields in the `common.properties` configuration file to replace the GitHub address with an address you can access.
```yaml
# mlflow task plugin preset repository
ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
# mlflow task plugin preset repository version
ml.mlflow.preset_repository_version="main"
```
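As a hedged illustration of applying this override on a worker host (the mirror URL and the configuration path below are placeholders, not values from this change; the worker typically needs a restart to pick up the keys):

```bash
# append placeholder override values to the worker's common.properties
cat >> /opt/dolphinscheduler/conf/common.properties <<'EOF'
ml.mlflow.preset_repository=https://git.example.com/mirrors/dolphinscheduler-mlflow
ml.mlflow.preset_repository_version="main"
EOF
```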
...@@ -19,7 +19,6 @@ MLflow 组件用于执行 MLflow 任务,目前包含Mlflow Projects,和MLflo

- MLflow Models
  - MLFLOW: 直接使用 `mlflow models serve` 部署模型。
  - Docker: 打包 DOCKER 镜像后部署模型。
  - Docker Compose: 使用Docker Compose 部署模型,将会取代上面的Docker部署。

## 创建任务

...@@ -90,21 +89,25 @@ MLflow 组件用于执行 MLflow 任务,目前包含Mlflow Projects,和MLflo

![mlflow-models-docker](../../../../img/tasks/demo/mlflow-models-docker.png)
#### DOCKER COMPOSE

![mlflow-models-docker-compose](../../../../img/tasks/demo/mlflow-models-docker-compose.png)

| **任务参数** | **描述**                               |
|----------|--------------------------------------|
| 最大CPU限制  | 如 `1.0` 或者 `0.5`,与 docker compose 一致 |
| 最大内存限制   | 如 `1G` 或者 `500M`,与 docker compose 一致 |

## 环境准备

### conda 环境配置

你需要进入admin账户配置一个conda环境变量(请提前[安装anaconda](https://docs.continuum.io/anaconda/install/) 或者[安装miniconda](https://docs.conda.io/en/latest/miniconda.html#installing))。

## 环境准备

### conda 环境配置

请提前[安装anaconda](https://docs.continuum.io/anaconda/install/) 或者[安装miniconda](https://docs.conda.io/en/latest/miniconda.html#installing)

**方法A:**

配置文件:/dolphinscheduler/conf/env/dolphinscheduler_env.sh。

在文件最后添加内容

```
# 配置你的conda环境路径
export PATH=/opt/anaconda3/bin:$PATH
```

**方法B:**

你需要进入admin账户配置一个conda环境变量。

![mlflow-conda-env](../../../../img/tasks/demo/mlflow-conda-env.png)
...@@ -112,6 +115,7 @@ MLflow 组件用于执行 MLflow 任务,目前包含Mlflow Projects,和MLflo

![mlflow-set-conda-env](../../../../img/tasks/demo/mlflow-set-conda-env.png)

### MLflow service 启动

确保你已经安装MLflow,可以使用`pip install mlflow`进行安装。

...@@ -130,3 +134,15 @@ mlflow server -h 0.0.0.0 -p 5000 --serve-artifacts --backend-store-uri sqlite://

![mlflow-server](../../../../img/tasks/demo/mlflow-server.png)
### 内置算法仓库配置
如果遇到github无法访问的情况,可以修改`common.properties`配置文件的以下字段,将github地址替换为能访问的地址。
```yaml
# mlflow task plugin preset repository
ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
# mlflow task plugin preset repository version
ml.mlflow.preset_repository_version="main"
```
...@@ -116,4 +116,9 @@ alert.rpc.port=50052

conda.path=/opt/anaconda3/etc/profile.d/conda.sh

# Task resource limit state
task.resource.limit.state=false
\ No newline at end of file
# mlflow task plugin preset repository
ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
# mlflow task plugin preset repository version
ml.mlflow.preset_repository_version="main"
\ No newline at end of file
...@@ -32,15 +32,6 @@ tasks:

parameters: -P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9
experiment_name: xgboost
- name: deploy_mlflow
deps: [train_xgboost_native]
task_type: MLflowModels
model_uri: models:/xgboost_native/Production
mlflow_tracking_uri: *mlflow_tracking_uri
deploy_mode: MLFLOW
port: 7001
- name: train_automl
task_type: MLFlowProjectsAutoML
mlflow_tracking_uri: *mlflow_tracking_uri
...@@ -68,11 +59,11 @@ tasks:

data_path: /data/examples/iris
search_params: max_depth=[5, 10];n_estimators=[100, 200]
- name: deploy_docker_compose
task_type: MLflowModels
deps: [train_basic_algorithm]
model_uri: models:/iris_B/Production
mlflow_tracking_uri: *mlflow_tracking_uri
deploy_mode: DOCKER COMPOSE
port: 7003

- name: deploy_mlflow
deps: [train_basic_algorithm]
task_type: MLflowModels
model_uri: models:/iris_B/Production
mlflow_tracking_uri: *mlflow_tracking_uri
deploy_mode: MLFLOW
port: 7001
...@@ -43,17 +43,6 @@ with ProcessDefinition(

experiment_name="xgboost",
)
# Using MLFLOW to deploy model from custom mlflow project
deploy_mlflow = MLflowModels(
name="deploy_mlflow",
model_uri="models:/xgboost_native/Production",
mlflow_tracking_uri=mlflow_tracking_uri,
deploy_mode=MLflowDeployType.MLFLOW,
port=7001,
)
train_custom >> deploy_mlflow
# run automl to train model
train_automl = MLFlowProjectsAutoML(
name="train_automl",
...@@ -88,16 +77,16 @@ with ProcessDefinition(

search_params="max_depth=[5, 10];n_estimators=[100, 200]",
)
# Using DOCKER COMPOSE to deploy model from train_basic_algorithm
deploy_docker_compose = MLflowModels(
name="deploy_docker_compose",
model_uri="models:/iris_B/Production",
mlflow_tracking_uri=mlflow_tracking_uri,
deploy_mode=MLflowDeployType.DOCKER_COMPOSE,
port=7003,
)

train_basic_algorithm >> deploy_docker_compose

# Using MLFLOW to deploy model from training lightgbm project
deploy_mlflow = MLflowModels(
name="deploy_mlflow",
model_uri="models:/iris_B/Production",
mlflow_tracking_uri=mlflow_tracking_uri,
deploy_mode=MLflowDeployType.MLFLOW,
port=7001,
)

train_basic_algorithm >> deploy_mlflow

pd.submit()
......
...@@ -43,7 +43,6 @@ class MLflowDeployType(str): ...@@ -43,7 +43,6 @@ class MLflowDeployType(str):
MLFLOW = "MLFLOW" MLFLOW = "MLFLOW"
DOCKER = "DOCKER" DOCKER = "DOCKER"
DOCKER_COMPOSE = "DOCKER COMPOSE"
DEFAULT_MLFLOW_TRACKING_URI = "http://127.0.0.1:5000" DEFAULT_MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"
...@@ -83,10 +82,8 @@ class MLflowModels(BaseMLflow): ...@@ -83,10 +82,8 @@ class MLflowModels(BaseMLflow):
:param model_uri: Model-URI of MLflow , support models:/<model_name>/suffix format and runs:/ format. :param model_uri: Model-URI of MLflow , support models:/<model_name>/suffix format and runs:/ format.
See https://mlflow.org/docs/latest/tracking.html#artifact-stores See https://mlflow.org/docs/latest/tracking.html#artifact-stores
:param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000 :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000
:param deploy_mode: MLflow deploy mode, support MLFLOW, DOCKER, DOCKER COMPOSE, default is DOCKER :param deploy_mode: MLflow deploy mode, support MLFLOW, DOCKER, default is DOCKER
:param port: deploy port, default is 7000 :param port: deploy port, default is 7000
:param cpu_limit: cpu limit, default is 1.0
:param memory_limit: memory limit, default is 500M
""" """
mlflow_task_type = MLflowTaskType.MLFLOW_MODELS mlflow_task_type = MLflowTaskType.MLFLOW_MODELS
...@@ -95,8 +92,6 @@ class MLflowModels(BaseMLflow): ...@@ -95,8 +92,6 @@ class MLflowModels(BaseMLflow):
"deploy_type", "deploy_type",
"deploy_model_key", "deploy_model_key",
"deploy_port", "deploy_port",
"cpu_limit",
"memory_limit",
} }
def __init__( def __init__(
...@@ -106,8 +101,6 @@ class MLflowModels(BaseMLflow): ...@@ -106,8 +101,6 @@ class MLflowModels(BaseMLflow):
mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI, mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
deploy_mode: Optional[str] = MLflowDeployType.DOCKER, deploy_mode: Optional[str] = MLflowDeployType.DOCKER,
port: Optional[int] = 7000, port: Optional[int] = 7000,
cpu_limit: Optional[float] = 1.0,
memory_limit: Optional[str] = "500M",
*args, *args,
**kwargs **kwargs
): ):
...@@ -116,8 +109,6 @@ class MLflowModels(BaseMLflow): ...@@ -116,8 +109,6 @@ class MLflowModels(BaseMLflow):
self.deploy_type = deploy_mode.upper() self.deploy_type = deploy_mode.upper()
self.deploy_model_key = model_uri self.deploy_model_key = model_uri
self.deploy_port = port self.deploy_port = port
self.cpu_limit = cpu_limit
self.memory_limit = memory_limit
class MLFlowProjectsCustom(BaseMLflow): class MLFlowProjectsCustom(BaseMLflow):
......
...@@ -63,19 +63,15 @@ def test_mlflow_models_get_define(): ...@@ -63,19 +63,15 @@ def test_mlflow_models_get_define():
name = "mlflow_models" name = "mlflow_models"
model_uri = "models:/xgboost_native/Production" model_uri = "models:/xgboost_native/Production"
port = 7001 port = 7001
cpu_limit = 2.0
memory_limit = "600M"
expect = deepcopy(EXPECT) expect = deepcopy(EXPECT)
expect["name"] = name expect["name"] = name
task_params = expect["taskParams"] task_params = expect["taskParams"]
task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_MODELS task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_MODELS
task_params["deployType"] = MLflowDeployType.DOCKER_COMPOSE task_params["deployType"] = MLflowDeployType.DOCKER
task_params["deployModelKey"] = model_uri task_params["deployModelKey"] = model_uri
task_params["deployPort"] = port task_params["deployPort"] = port
task_params["cpuLimit"] = cpu_limit
task_params["memoryLimit"] = memory_limit
with patch( with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version", "pydolphinscheduler.core.task.Task.gen_code_and_version",
...@@ -85,10 +81,8 @@ def test_mlflow_models_get_define(): ...@@ -85,10 +81,8 @@ def test_mlflow_models_get_define():
name=name, name=name,
model_uri=model_uri, model_uri=model_uri,
mlflow_tracking_uri=MLFLOW_TRACKING_URI, mlflow_tracking_uri=MLFLOW_TRACKING_URI,
deploy_mode=MLflowDeployType.DOCKER_COMPOSE, deploy_mode=MLflowDeployType.DOCKER,
port=port, port=port,
cpu_limit=cpu_limit,
memory_limit=memory_limit,
) )
assert task.get_define() == expect assert task.get_define() == expect
......
...@@ -28,15 +28,17 @@ public class MlflowConstants { ...@@ -28,15 +28,17 @@ public class MlflowConstants {
public static final String JOB_TYPE_CUSTOM_PROJECT = "CustomProject"; public static final String JOB_TYPE_CUSTOM_PROJECT = "CustomProject";
public static final String PRESET_REPOSITORY = "https://github.com/apache/dolphinscheduler-mlflow"; public static final String PRESET_REPOSITORY_KEY = "ml.mlflow.preset_repository";
public static final String PRESET_REPOSITORY_VERSION_KEY = "ml.mlflow.preset_repository_version";
public static final String PRESET_PATH = "dolphinscheduler-mlflow"; public static final String PRESET_REPOSITORY = "https://github.com/apache/dolphinscheduler-mlflow";
public static final String PRESET_REPOSITORY_VERSION = "main"; public static final String PRESET_REPOSITORY_VERSION = "main";
public static final String PRESET_AUTOML_PROJECT = PRESET_PATH + "#Project-AutoML"; public static final String PRESET_AUTOML_PROJECT = "#Project-AutoML";
public static final String PRESET_BASIC_ALGORITHM_PROJECT = PRESET_PATH + "#Project-BasicAlgorithm"; public static final String PRESET_BASIC_ALGORITHM_PROJECT = "#Project-BasicAlgorithm";
public static final String MLFLOW_TASK_TYPE_PROJECTS = "MLflow Projects"; public static final String MLFLOW_TASK_TYPE_PROJECTS = "MLflow Projects";
...@@ -46,14 +48,6 @@ public class MlflowConstants { ...@@ -46,14 +48,6 @@ public class MlflowConstants {
public static final String MLFLOW_MODELS_DEPLOY_TYPE_DOCKER = "DOCKER"; public static final String MLFLOW_MODELS_DEPLOY_TYPE_DOCKER = "DOCKER";
public static final String MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE = "DOCKER COMPOSE";
/**
* template file
*/
public static final String TEMPLATE_DOCKER_COMPOSE = "docker-compose.yml";
/** /**
* mlflow command * mlflow command
*/ */
...@@ -81,8 +75,7 @@ public class MlflowConstants { ...@@ -81,8 +75,7 @@ public class MlflowConstants {
public static final String MLFLOW_RUN_CUSTOM_PROJECT = "mlflow run $repo " public static final String MLFLOW_RUN_CUSTOM_PROJECT = "mlflow run $repo "
+ "%s " + "%s "
+ "--experiment-name=\"%s\" " + "--experiment-name=\"%s\"";
+ "--version=\"%s\" ";
public static final String MLFLOW_MODELS_SERVE = "mlflow models serve -m %s --port %s -h 0.0.0.0"; public static final String MLFLOW_MODELS_SERVE = "mlflow models serve -m %s --port %s -h 0.0.0.0";
...@@ -94,20 +87,10 @@ public class MlflowConstants { ...@@ -94,20 +87,10 @@ public class MlflowConstants {
+ "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20" + "--health-cmd \"curl --fail http://127.0.0.1:8080/ping || exit 1\" --health-interval 5s --health-retries 20"
+ " %s"; + " %s";
public static final String DOCKER_COMPOSE_RUN = "docker-compose up -d";
public static final String SET_DOCKER_COMPOSE_ENV = "export DS_TASK_MLFLOW_IMAGE_NAME=%s\n"
+ "export DS_TASK_MLFLOW_CONTAINER_NAME=%s\n"
+ "export DS_TASK_MLFLOW_DEPLOY_PORT=%s\n"
+ "export DS_TASK_MLFLOW_CPU_LIMIT=%s\n"
+ "export DS_TASK_MLFLOW_MEMORY_LIMIT=%s";
public static final String DOCKER_HEALTH_CHECK = "docker inspect --format \"{{json .State.Health.Status }}\" %s"; public static final String DOCKER_HEALTH_CHECK = "docker inspect --format \"{{json .State.Health.Status }}\" %s";
public static final int DOCKER_HEALTH_CHECK_TIMEOUT = 20; public static final int DOCKER_HEALTH_CHECK_TIMEOUT = 20;
public static final int DOCKER_HEALTH_CHECK_INTERVAL = 5000; public static final int DOCKER_HEALTH_CHECK_INTERVAL = 5000;
public static final String GIT_CLONE_REPO = "git clone %s %s";
} }
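To make the constants above concrete, here is a hedged sketch of the shell sequence the plugin renders for the plain DOCKER deploy mode; part of the `docker run` format string lies outside this hunk, so treat the option order as illustrative, with the model URI, image name, container name, and port taken from the unit tests further down:

```bash
# build an image from the registered model, remove any old container, then run the new one
export MLFLOW_TRACKING_URI=http://127.0.0.1:5000
mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver
docker rm -f ds-mlflow-model-1
docker run -d --name=ds-mlflow-model-1 -p 7000:8080 \
  --health-cmd "curl --fail http://127.0.0.1:8080/ping || exit 1" \
  --health-interval 5s --health-retries 20 \
  mlflow/model:1
```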
...@@ -17,10 +17,14 @@ ...@@ -17,10 +17,14 @@
package org.apache.dolphinscheduler.plugin.task.mlflow; package org.apache.dolphinscheduler.plugin.task.mlflow;
import lombok.Data;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.HashMap; import java.util.HashMap;
@Data
public class MlflowParameters extends AbstractParameters { public class MlflowParameters extends AbstractParameters {
/** /**
...@@ -36,7 +40,7 @@ public class MlflowParameters extends AbstractParameters { ...@@ -36,7 +40,7 @@ public class MlflowParameters extends AbstractParameters {
*/ */
private String mlflowProjectRepository; private String mlflowProjectRepository;
private String mlflowProjectVersion = "master"; private String mlflowProjectVersion = "";
/** /**
* AutoML parameters * AutoML parameters
...@@ -76,160 +80,9 @@ public class MlflowParameters extends AbstractParameters { ...@@ -76,160 +80,9 @@ public class MlflowParameters extends AbstractParameters {
private String deployPort; private String deployPort;
private String cpuLimit;
private String memoryLimit;
public void setAlgorithm(String algorithm) {
this.algorithm = algorithm;
}
public String getAlgorithm() {
return algorithm;
}
public void setParams(String params) {
this.params = params;
}
public String getParams() {
return params;
}
public void setSearchParams(String searchParams) {
this.searchParams = searchParams;
}
public String getSearchParams() {
return searchParams;
}
public void setDataPaths(String dataPath) {
this.dataPath = dataPath;
}
public String getDataPath() {
return dataPath;
}
public void setMlflowTaskType(String mlflowTaskType) {
this.mlflowTaskType = mlflowTaskType;
}
public String getMlflowTaskType() {
return mlflowTaskType;
}
public void setExperimentNames(String experimentName) {
this.experimentName = experimentName;
}
public String getExperimentName() {
return experimentName;
}
public void setModelNames(String modelName) {
this.modelName = modelName;
}
public String getModelName() {
return modelName;
}
public void setMlflowTrackingUris(String mlflowTrackingUri) {
this.mlflowTrackingUri = mlflowTrackingUri;
}
public String getMlflowTrackingUri() {
return mlflowTrackingUri;
}
public void setMlflowJobType(String mlflowJobType) {
this.mlflowJobType = mlflowJobType;
}
public String getMlflowJobType() {
return mlflowJobType;
}
public void setAutomlTool(String automlTool) {
this.automlTool = automlTool;
}
public String getMlflowProjectRepository() {
return mlflowProjectRepository;
}
public void setMlflowProjectRepository(String mlflowProjectRepository) {
this.mlflowProjectRepository = mlflowProjectRepository;
}
public String getMlflowProjectVersion() {
return mlflowProjectVersion;
}
public void setMlflowProjectVersion(String mlflowProjectVersion) {
this.mlflowProjectVersion = mlflowProjectVersion;
}
public String getAutomlTool() {
return automlTool;
}
public void setDeployType(String deployType) {
this.deployType = deployType;
}
public String getDeployType() {
return deployType;
}
public void setDeployModelKey(String deployModelKey) {
this.deployModelKey = deployModelKey;
}
public String getDeployModelKey() {
return deployModelKey;
}
public void setDeployPort(String deployPort) {
this.deployPort = deployPort;
}
public String getDeployPort() {
return deployPort;
}
public void setCpuLimit(String cpuLimit) {
this.cpuLimit = cpuLimit;
}
public String getCpuLimit() {
return cpuLimit;
}
public void setMemoryLimit(String memoryLimit) {
this.memoryLimit = memoryLimit;
}
public String getMemoryLimit() {
return memoryLimit;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
Boolean checkResult = true; return StringUtils.isNotEmpty(mlflowTrackingUri);
// Boolean checkResult = mlflowTrackingUri != null;
// if (mlflowJobType.equals(MlflowConstants.JOB_TYPE_BASIC_ALGORITHM)) {
// checkResult &= dataPath != null;
// checkResult &= experimentName != null;
// } else if (mlflowJobType.equals(MlflowConstants.JOB_TYPE_AUTOML)) {
// checkResult &= dataPath != null;
// checkResult &= automlTool != null;
// checkResult &= experimentName != null;
// } else {
// }
return checkResult;
} }
public HashMap<String, String> getParamsMap() { public HashMap<String, String> getParamsMap() {
...@@ -240,11 +93,13 @@ public class MlflowParameters extends AbstractParameters { ...@@ -240,11 +93,13 @@ public class MlflowParameters extends AbstractParameters {
paramsMap.put("experiment_name", experimentName); paramsMap.put("experiment_name", experimentName);
paramsMap.put("model_name", modelName); paramsMap.put("model_name", modelName);
paramsMap.put("MLFLOW_TRACKING_URI", mlflowTrackingUri); paramsMap.put("MLFLOW_TRACKING_URI", mlflowTrackingUri);
if (mlflowJobType.equals(MlflowConstants.JOB_TYPE_BASIC_ALGORITHM)) { switch (mlflowJobType){
addParamsMapForBasicAlgorithm(paramsMap); case MlflowConstants.JOB_TYPE_BASIC_ALGORITHM:
} else if (mlflowJobType.equals(MlflowConstants.JOB_TYPE_AUTOML)) { addParamsMapForBasicAlgorithm(paramsMap);
getParamsMapForAutoML(paramsMap); break;
} else { case MlflowConstants.JOB_TYPE_AUTOML:
getParamsMapForAutoML(paramsMap);
break;
} }
return paramsMap; return paramsMap;
} }
...@@ -262,6 +117,10 @@ public class MlflowParameters extends AbstractParameters { ...@@ -262,6 +117,10 @@ public class MlflowParameters extends AbstractParameters {
paramsMap.put("repo_version", MlflowConstants.PRESET_REPOSITORY_VERSION); paramsMap.put("repo_version", MlflowConstants.PRESET_REPOSITORY_VERSION);
} }
public Boolean isCustomProject() {
return mlflowJobType.equals(MlflowConstants.JOB_TYPE_CUSTOM_PROJECT);
}
public String getModelKeyName(String tag) throws IllegalArgumentException { public String getModelKeyName(String tag) throws IllegalArgumentException {
String imageName; String imageName;
if (deployModelKey.startsWith("runs:")) { if (deployModelKey.startsWith("runs:")) {
...@@ -271,23 +130,18 @@ public class MlflowParameters extends AbstractParameters { ...@@ -271,23 +130,18 @@ public class MlflowParameters extends AbstractParameters {
} else { } else {
throw new IllegalArgumentException("model key must start with runs:/ or models:/ "); throw new IllegalArgumentException("model key must start with runs:/ or models:/ ");
} }
imageName = imageName.replace("/", tag); imageName = imageName.replace("/", tag).toLowerCase();
return imageName; return imageName;
} }
public String getDockerComposeEnvCommand() {
String imageName = "mlflow/" + getModelKeyName(":");
String env = String.format(MlflowConstants.SET_DOCKER_COMPOSE_ENV, imageName, getContainerName(), deployPort, cpuLimit, memoryLimit);
return env;
}
public String getContainerName(){ public String getContainerName(){
String containerName = "ds-mlflow-" + getModelKeyName("-"); return "ds-mlflow-" + getModelKeyName("-");
return containerName;
} }
public boolean getIsDeployDocker(){ public boolean getIsDeployDocker(){
return deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER) || deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE); if (StringUtils.isEmpty(deployType)) {
return false;
}
return deployType.equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER);
} }
}
};
...@@ -17,37 +17,34 @@ ...@@ -17,37 +17,34 @@
package org.apache.dolphinscheduler.plugin.task.mlflow; package org.apache.dolphinscheduler.plugin.task.mlflow;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
/** /**
* shell task * shell task
*/ */
public class MlflowTask extends AbstractTask { public class MlflowTask extends AbstractTask {
/** private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)");
* shell parameters
*/
private MlflowParameters mlflowParameters;
/** /**
* shell command executor * shell command executor
*/ */
...@@ -57,6 +54,10 @@ public class MlflowTask extends AbstractTask { ...@@ -57,6 +54,10 @@ public class MlflowTask extends AbstractTask {
* taskExecutionContext * taskExecutionContext
*/ */
private final TaskExecutionContext taskExecutionContext; private final TaskExecutionContext taskExecutionContext;
/**
* shell parameters
*/
private MlflowParameters mlflowParameters;
/** /**
* constructor * constructor
...@@ -70,6 +71,34 @@ public class MlflowTask extends AbstractTask { ...@@ -70,6 +71,34 @@ public class MlflowTask extends AbstractTask {
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
} }
static public String getPresetRepository() {
String presetRepository = PropertyUtils.getString(MlflowConstants.PRESET_REPOSITORY_KEY);
if (StringUtils.isEmpty(presetRepository)) {
presetRepository = MlflowConstants.PRESET_REPOSITORY;
}
return presetRepository;
}
static public String getPresetRepositoryVersion() {
String version = PropertyUtils.getString(MlflowConstants.PRESET_REPOSITORY_VERSION_KEY);
if (StringUtils.isEmpty(version)) {
version = MlflowConstants.PRESET_REPOSITORY_VERSION;
}
return version;
}
static public String getVersionString(String version, String repository) {
String versionString;
if (StringUtils.isEmpty(version)) {
versionString = "";
} else if (GIT_CHECK_PATTERN.matcher(repository).find()) {
versionString = String.format("--version=%s", version);
} else {
versionString = "";
}
return versionString;
}
@Override @Override
public void init() { public void init() {
logger.info("shell task params {}", taskExecutionContext.getTaskParams()); logger.info("shell task params {}", taskExecutionContext.getTaskParams());
...@@ -141,43 +170,59 @@ public class MlflowTask extends AbstractTask { ...@@ -141,43 +170,59 @@ public class MlflowTask extends AbstractTask {
args.add(String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri())); args.add(String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
String runCommand; String runCommand;
String versionString;
if (mlflowParameters.getMlflowJobType().equals(MlflowConstants.JOB_TYPE_BASIC_ALGORITHM)) { if (mlflowParameters.isCustomProject()) {
args.add(String.format(MlflowConstants.SET_DATA_PATH, mlflowParameters.getDataPath())); versionString = getVersionString(mlflowParameters.getMlflowProjectVersion(), mlflowParameters.getMlflowProjectRepository());
args.add(String.format(MlflowConstants.SET_REPOSITORY, MlflowConstants.PRESET_BASIC_ALGORITHM_PROJECT)); } else {
args.add(String.format(MlflowConstants.GIT_CLONE_REPO, MlflowConstants.PRESET_REPOSITORY, MlflowConstants.PRESET_PATH)); versionString = getVersionString(getPresetRepositoryVersion(), getPresetRepository());
}
runCommand = MlflowConstants.MLFLOW_RUN_BASIC_ALGORITHM;
runCommand = String.format(runCommand, mlflowParameters.getAlgorithm(), mlflowParameters.getParams(), mlflowParameters.getSearchParams(), mlflowParameters.getModelName(),
mlflowParameters.getExperimentName());
} else if (mlflowParameters.getMlflowJobType().equals(MlflowConstants.JOB_TYPE_AUTOML)) { switch (mlflowParameters.getMlflowJobType()) {
args.add(String.format(MlflowConstants.SET_DATA_PATH, mlflowParameters.getDataPath())); case MlflowConstants.JOB_TYPE_BASIC_ALGORITHM:
args.add(String.format(MlflowConstants.SET_REPOSITORY, MlflowConstants.PRESET_AUTOML_PROJECT)); args.add(String.format(MlflowConstants.SET_DATA_PATH, mlflowParameters.getDataPath()));
args.add(String.format(MlflowConstants.GIT_CLONE_REPO, MlflowConstants.PRESET_REPOSITORY, MlflowConstants.PRESET_PATH));
runCommand = MlflowConstants.MLFLOW_RUN_AUTOML_PROJECT; String repoBasicAlgorithm = getPresetRepository() + MlflowConstants.PRESET_BASIC_ALGORITHM_PROJECT;
runCommand = String.format(runCommand, mlflowParameters.getAutomlTool(), mlflowParameters.getParams(), mlflowParameters.getModelName(), mlflowParameters.getExperimentName()); args.add(String.format(MlflowConstants.SET_REPOSITORY, repoBasicAlgorithm));
} else if (mlflowParameters.getMlflowJobType().equals(MlflowConstants.JOB_TYPE_CUSTOM_PROJECT)) { runCommand = MlflowConstants.MLFLOW_RUN_BASIC_ALGORITHM;
args.add(String.format(MlflowConstants.SET_REPOSITORY, mlflowParameters.getMlflowProjectRepository())); runCommand = String.format(runCommand, mlflowParameters.getAlgorithm(), mlflowParameters.getParams(), mlflowParameters.getSearchParams(), mlflowParameters.getModelName(),
mlflowParameters.getExperimentName());
break;
runCommand = MlflowConstants.MLFLOW_RUN_CUSTOM_PROJECT; case MlflowConstants.JOB_TYPE_AUTOML:
runCommand = String.format(runCommand, mlflowParameters.getParams(), mlflowParameters.getExperimentName(), mlflowParameters.getMlflowProjectVersion()); args.add(String.format(MlflowConstants.SET_DATA_PATH, mlflowParameters.getDataPath()));
} else { String repoAutoML = getPresetRepository() + MlflowConstants.PRESET_AUTOML_PROJECT;
runCommand = String.format("Cant not Support %s", mlflowParameters.getMlflowJobType()); args.add(String.format(MlflowConstants.SET_REPOSITORY, repoAutoML));
runCommand = MlflowConstants.MLFLOW_RUN_AUTOML_PROJECT;
runCommand = String.format(runCommand, mlflowParameters.getAutomlTool(), mlflowParameters.getParams(), mlflowParameters.getModelName(), mlflowParameters.getExperimentName());
break;
case MlflowConstants.JOB_TYPE_CUSTOM_PROJECT:
args.add(String.format(MlflowConstants.SET_REPOSITORY, mlflowParameters.getMlflowProjectRepository()));
runCommand = MlflowConstants.MLFLOW_RUN_CUSTOM_PROJECT;
runCommand = String.format(runCommand, mlflowParameters.getParams(), mlflowParameters.getExperimentName());
break;
default:
throw new TaskException("Unsupported mlflow job type: " + mlflowParameters.getMlflowJobType());
}
// add version string to command if repository is local path
if (StringUtils.isNotEmpty(versionString)) {
runCommand = runCommand + " " + versionString;
} }
args.add(runCommand); args.add(runCommand);
String command = ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap)); return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap));
return command;
} }
/**
* build mlflow models command
*/
protected String buildCommandForMlflowModels() { protected String buildCommandForMlflowModels() {
/**
* build mlflow models command
*/
Map<String, Property> paramsMap = getParamsMap(); Map<String, Property> paramsMap = getParamsMap();
List<String> args = new ArrayList<>(); List<String> args = new ArrayList<>();
...@@ -194,20 +239,9 @@ public class MlflowTask extends AbstractTask { ...@@ -194,20 +239,9 @@ public class MlflowTask extends AbstractTask {
args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName)); args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName));
args.add(String.format(MlflowConstants.DOCKER_RREMOVE_CONTAINER, containerName)); args.add(String.format(MlflowConstants.DOCKER_RREMOVE_CONTAINER, containerName));
args.add(String.format(MlflowConstants.DOCKER_RUN, containerName, mlflowParameters.getDeployPort(), imageName)); args.add(String.format(MlflowConstants.DOCKER_RUN, containerName, mlflowParameters.getDeployPort(), imageName));
} else if (mlflowParameters.getDeployType().equals(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE)) {
String templatePath = getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE);
args.add(String.format("cp %s %s", templatePath, taskExecutionContext.getExecutePath()));
String imageName = "mlflow/" + mlflowParameters.getModelKeyName(":");
String containerName = mlflowParameters.getContainerName();
args.add(String.format(MlflowConstants.MLFLOW_BUILD_DOCKER, deployModelKey, imageName));
args.add(String.format(MlflowConstants.DOCKER_RREMOVE_CONTAINER, containerName));
args.add(mlflowParameters.getDockerComposeEnvCommand());
args.add(MlflowConstants.DOCKER_COMPOSE_RUN);
} }
String command = ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap)); return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParamUtils.convert(paramsMap));
return command;
} }
private Map<String, Property> getParamsMap() { private Map<String, Property> getParamsMap() {
...@@ -216,7 +250,7 @@ public class MlflowTask extends AbstractTask { ...@@ -216,7 +250,7 @@ public class MlflowTask extends AbstractTask {
} }
public int checkDockerHealth() throws Exception { public int checkDockerHealth() {
logger.info("checking container healthy ... "); logger.info("checking container healthy ... ");
int exitCode = -1; int exitCode = -1;
String[] command = {"sh", "-c", String.format(MlflowConstants.DOCKER_HEALTH_CHECK, mlflowParameters.getContainerName())}; String[] command = {"sh", "-c", String.format(MlflowConstants.DOCKER_HEALTH_CHECK, mlflowParameters.getContainerName())};
...@@ -244,13 +278,8 @@ public class MlflowTask extends AbstractTask { ...@@ -244,13 +278,8 @@ public class MlflowTask extends AbstractTask {
} }
@Override @Override
public AbstractParameters getParameters() { public MlflowParameters getParameters() {
return mlflowParameters; return mlflowParameters;
} }
public String getTemplatePath(String template) {
String templatePath = MlflowTask.class.getClassLoader().getResource(template).getPath();
return templatePath;
}
} }
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3"
services:
mlflow-model:
image: "${DS_TASK_MLFLOW_IMAGE_NAME}"
container_name: "${DS_TASK_MLFLOW_CONTAINER_NAME}"
ports:
- "${DS_TASK_MLFLOW_DEPLOY_PORT}:8080"
deploy:
resources:
limits:
cpus: "${DS_TASK_MLFLOW_CPU_LIMIT}"
memory: "${DS_TASK_MLFLOW_MEMORY_LIMIT}"
environment:
PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION: python
healthcheck:
test: ["CMD", "curl", "http://127.0.0.1:8080/ping"]
interval: 5s
timeout: 5s
retries: 5
\ No newline at end of file
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.mlflow;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants;
...@@ -76,21 +78,46 @@ public class MlflowTaskTest { ...@@ -76,21 +78,46 @@ public class MlflowTaskTest {
return taskExecutionContext; return taskExecutionContext;
} }
@Test
public void testGetPresetRepositoryData() {
Assert.assertEquals("https://github.com/apache/dolphinscheduler-mlflow", MlflowTask.getPresetRepository());
Assert.assertEquals("main", MlflowTask.getPresetRepositoryVersion());
String definedRepository = "https://github.com/<MY-ID>/dolphinscheduler-mlflow";
when(PropertyUtils.getString(MlflowConstants.PRESET_REPOSITORY_KEY)).thenAnswer(invocation -> definedRepository);
Assert.assertEquals(definedRepository, MlflowTask.getPresetRepository());
String definedRepositoryVersion = "dev";
when(PropertyUtils.getString(MlflowConstants.PRESET_REPOSITORY_VERSION_KEY)).thenAnswer(invocation -> definedRepositoryVersion);
Assert.assertEquals(definedRepositoryVersion, MlflowTask.getPresetRepositoryVersion());
}
@Test
public void testGetVersionString() {
Assert.assertEquals("--version=main", MlflowTask.getVersionString("main", "https://github.com/apache/dolphinscheduler-mlflow"));
Assert.assertEquals("--version=master", MlflowTask.getVersionString("master", "https://github.com/apache/dolphinscheduler-mlflow"));
Assert.assertEquals("--version=main", MlflowTask.getVersionString("main", "git@github.com:apache/dolphinscheduler-mlflow.git"));
Assert.assertEquals("--version=master", MlflowTask.getVersionString("master", "git@github.com:apache/dolphinscheduler-mlflow.git"));
Assert.assertEquals("", MlflowTask.getVersionString("main", "/tmp/dolphinscheduler-mlflow"));
Assert.assertEquals("", MlflowTask.getVersionString("master", "/tmp/dolphinscheduler-mlflow"));
}
@Test @Test
public void testInitBasicAlgorithmTask() { public void testInitBasicAlgorithmTask() {
MlflowTask mlflowTask = initTask(createBasicAlgorithmParameters()); MlflowTask mlflowTask = initTask(createBasicAlgorithmParameters());
Assert.assertEquals(mlflowTask.buildCommand(), Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n"
+ "data_path=/data/iris.csv\n" + "data_path=/data/iris.csv\n"
+ "repo=dolphinscheduler-mlflow#Project-BasicAlgorithm\n" + "repo=https://github.com/apache/dolphinscheduler-mlflow#Project-BasicAlgorithm\n"
+ "git clone https://github.com/apache/dolphinscheduler-mlflow dolphinscheduler-mlflow\n"
+ "mlflow run $repo " + "mlflow run $repo "
+ "-P algorithm=xgboost " + "-P algorithm=xgboost "
+ "-P data_path=$data_path " + "-P data_path=$data_path "
+ "-P params=\"n_estimators=100\" " + "-P params=\"n_estimators=100\" "
+ "-P search_params=\"\" " + "-P search_params=\"\" "
+ "-P model_name=\"BasicAlgorithm\" " + "-P model_name=\"BasicAlgorithm\" "
+ "--experiment-name=\"BasicAlgorithm\""); + "--experiment-name=\"BasicAlgorithm\" "
+ "--version=main");
} }
@Test @Test
...@@ -99,19 +126,32 @@ public class MlflowTaskTest { ...@@ -99,19 +126,32 @@ public class MlflowTaskTest {
Assert.assertEquals(mlflowTask.buildCommand(), Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n"
+ "data_path=/data/iris.csv\n" + "data_path=/data/iris.csv\n"
+ "repo=dolphinscheduler-mlflow#Project-AutoML\n" + "repo=https://github.com/apache/dolphinscheduler-mlflow#Project-AutoML\n"
+ "git clone https://github.com/apache/dolphinscheduler-mlflow dolphinscheduler-mlflow\n"
+ "mlflow run $repo " + "mlflow run $repo "
+ "-P tool=autosklearn " + "-P tool=autosklearn "
+ "-P data_path=$data_path " + "-P data_path=$data_path "
+ "-P params=\"time_left_for_this_task=30\" " + "-P params=\"time_left_for_this_task=30\" "
+ "-P model_name=\"AutoML\" " + "-P model_name=\"AutoML\" "
+ "--experiment-name=\"AutoML\""); + "--experiment-name=\"AutoML\" "
+ "--version=main");
} }
@Test @Test
public void testInitCustomProjectTask() { public void testInitCustomProjectTask() {
MlflowTask mlflowTask = initTask(createCustomProjectParameters()); MlflowTask mlflowTask = initTask(createCustomProjectParameters());
// Version will be set if parameter.mlflowProjectVersion is empty
Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n"
+ "repo=https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native\n"
+ "mlflow run $repo "
+ "-P learning_rate=0.2 "
+ "-P colsample_bytree=0.8 "
+ "-P subsample=0.9 "
+ "--experiment-name=\"custom_project\"");
// Version will be set if repository is remote path
mlflowTask.getParameters().setMlflowProjectVersion("dev");
Assert.assertEquals(mlflowTask.buildCommand(), Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n" "export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n"
+ "repo=https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native\n" + "repo=https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native\n"
...@@ -120,7 +160,19 @@ public class MlflowTaskTest { ...@@ -120,7 +160,19 @@ public class MlflowTaskTest {
+ "-P colsample_bytree=0.8 " + "-P colsample_bytree=0.8 "
+ "-P subsample=0.9 " + "-P subsample=0.9 "
+ "--experiment-name=\"custom_project\" " + "--experiment-name=\"custom_project\" "
+ "--version=\"master\" "); + "--version=dev");
// Version will not be set if repository is local path
mlflowTask.getParameters().setMlflowProjectRepository("/tmp/dolphinscheduler-mlflow");
Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n"
+ "repo=/tmp/dolphinscheduler-mlflow\n"
+ "mlflow run $repo "
+ "-P learning_rate=0.2 "
+ "-P colsample_bytree=0.8 "
+ "-P subsample=0.9 "
+ "--experiment-name=\"custom_project\"");
} }
@Test @Test
...@@ -143,24 +195,6 @@ public class MlflowTaskTest { ...@@ -143,24 +195,6 @@ public class MlflowTaskTest {
+ "mlflow/model:1"); + "mlflow/model:1");
} }
@Test
public void testModelsDeployDockerCompose() throws Exception {
MlflowTask mlflowTask = initTask(createModelDeplyDockerComposeParameters());
Assert.assertEquals(mlflowTask.buildCommand(),
"export MLFLOW_TRACKING_URI=http://127.0.0.1:5000\n"
+ "cp "
+ mlflowTask.getTemplatePath(MlflowConstants.TEMPLATE_DOCKER_COMPOSE)
+ " /tmp/dolphinscheduler_test\n"
+ "mlflow models build-docker -m models:/model/1 -n mlflow/model:1 --enable-mlserver\n"
+ "docker rm -f ds-mlflow-model-1\n"
+ "export DS_TASK_MLFLOW_IMAGE_NAME=mlflow/model:1\n"
+ "export DS_TASK_MLFLOW_CONTAINER_NAME=ds-mlflow-model-1\n"
+ "export DS_TASK_MLFLOW_DEPLOY_PORT=7000\n"
+ "export DS_TASK_MLFLOW_CPU_LIMIT=0.5\n"
+ "export DS_TASK_MLFLOW_MEMORY_LIMIT=200m\n"
+ "docker-compose up -d");
}
private MlflowTask initTask(MlflowParameters mlflowParameters) { private MlflowTask initTask(MlflowParameters mlflowParameters) {
TaskExecutionContext taskExecutionContext = createContext(mlflowParameters); TaskExecutionContext taskExecutionContext = createContext(mlflowParameters);
MlflowTask mlflowTask = new MlflowTask(taskExecutionContext); MlflowTask mlflowTask = new MlflowTask(taskExecutionContext);
...@@ -174,11 +208,11 @@ public class MlflowTaskTest { ...@@ -174,11 +208,11 @@ public class MlflowTaskTest {
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_PROJECTS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_PROJECTS);
mlflowParameters.setMlflowJobType(MlflowConstants.JOB_TYPE_BASIC_ALGORITHM); mlflowParameters.setMlflowJobType(MlflowConstants.JOB_TYPE_BASIC_ALGORITHM);
mlflowParameters.setAlgorithm("xgboost"); mlflowParameters.setAlgorithm("xgboost");
mlflowParameters.setDataPaths("/data/iris.csv"); mlflowParameters.setDataPath("/data/iris.csv");
mlflowParameters.setParams("n_estimators=100"); mlflowParameters.setParams("n_estimators=100");
mlflowParameters.setExperimentNames("BasicAlgorithm"); mlflowParameters.setExperimentName("BasicAlgorithm");
mlflowParameters.setModelNames("BasicAlgorithm"); mlflowParameters.setModelName("BasicAlgorithm");
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUri("http://127.0.0.1:5000");
return mlflowParameters; return mlflowParameters;
} }
...@@ -188,10 +222,10 @@ public class MlflowTaskTest { ...@@ -188,10 +222,10 @@ public class MlflowTaskTest {
mlflowParameters.setMlflowJobType(MlflowConstants.JOB_TYPE_AUTOML); mlflowParameters.setMlflowJobType(MlflowConstants.JOB_TYPE_AUTOML);
mlflowParameters.setAutomlTool("autosklearn"); mlflowParameters.setAutomlTool("autosklearn");
mlflowParameters.setParams("time_left_for_this_task=30"); mlflowParameters.setParams("time_left_for_this_task=30");
mlflowParameters.setDataPaths("/data/iris.csv"); mlflowParameters.setDataPath("/data/iris.csv");
mlflowParameters.setExperimentNames("AutoML"); mlflowParameters.setExperimentName("AutoML");
mlflowParameters.setModelNames("AutoML"); mlflowParameters.setModelName("AutoML");
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUri("http://127.0.0.1:5000");
return mlflowParameters; return mlflowParameters;
} }
...@@ -199,8 +233,8 @@ public class MlflowTaskTest { ...@@ -199,8 +233,8 @@ public class MlflowTaskTest {
MlflowParameters mlflowParameters = new MlflowParameters(); MlflowParameters mlflowParameters = new MlflowParameters();
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_PROJECTS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_PROJECTS);
mlflowParameters.setMlflowJobType(MlflowConstants.JOB_TYPE_CUSTOM_PROJECT); mlflowParameters.setMlflowJobType(MlflowConstants.JOB_TYPE_CUSTOM_PROJECT);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUri("http://127.0.0.1:5000");
mlflowParameters.setExperimentNames("custom_project"); mlflowParameters.setExperimentName("custom_project");
mlflowParameters.setParams("-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9"); mlflowParameters.setParams("-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9");
mlflowParameters.setMlflowProjectRepository("https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native"); mlflowParameters.setMlflowProjectRepository("https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native");
...@@ -211,7 +245,7 @@ public class MlflowTaskTest { ...@@ -211,7 +245,7 @@ public class MlflowTaskTest {
MlflowParameters mlflowParameters = new MlflowParameters(); MlflowParameters mlflowParameters = new MlflowParameters();
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_MLFLOW); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_MLFLOW);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUri("http://127.0.0.1:5000");
mlflowParameters.setDeployModelKey("models:/model/1"); mlflowParameters.setDeployModelKey("models:/model/1");
mlflowParameters.setDeployPort("7000"); mlflowParameters.setDeployPort("7000");
return mlflowParameters; return mlflowParameters;
...@@ -221,21 +255,9 @@ public class MlflowTaskTest { ...@@ -221,21 +255,9 @@ public class MlflowTaskTest {
MlflowParameters mlflowParameters = new MlflowParameters(); MlflowParameters mlflowParameters = new MlflowParameters();
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS); mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER); mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000"); mlflowParameters.setMlflowTrackingUri("http://127.0.0.1:5000");
mlflowParameters.setDeployModelKey("models:/model/1");
mlflowParameters.setDeployPort("7000");
return mlflowParameters;
}
private MlflowParameters createModelDeplyDockerComposeParameters() {
MlflowParameters mlflowParameters = new MlflowParameters();
mlflowParameters.setMlflowTaskType(MlflowConstants.MLFLOW_TASK_TYPE_MODELS);
mlflowParameters.setDeployType(MlflowConstants.MLFLOW_MODELS_DEPLOY_TYPE_DOCKER_COMPOSE);
mlflowParameters.setMlflowTrackingUris("http://127.0.0.1:5000");
mlflowParameters.setDeployModelKey("models:/model/1"); mlflowParameters.setDeployModelKey("models:/model/1");
mlflowParameters.setDeployPort("7000"); mlflowParameters.setDeployPort("7000");
mlflowParameters.setCpuLimit("0.5");
mlflowParameters.setMemoryLimit("200m");
return mlflowParameters; return mlflowParameters;
} }
} }
...@@ -721,7 +721,7 @@ export default { ...@@ -721,7 +721,7 @@ export default {
mlflow_deployModelKey: 'Model-URI', mlflow_deployModelKey: 'Model-URI',
mlflow_deployPort: 'Port', mlflow_deployPort: 'Port',
mlflowProjectRepository: 'Repository', mlflowProjectRepository: 'Repository',
mlflowProjectRepository_tips: 'github respository or path on worker', mlflowProjectRepository_tips: 'git respository or path on worker',
mlflowProjectVersion: 'Project Version', mlflowProjectVersion: 'Project Version',
mlflowProjectVersion_tips: 'git version', mlflowProjectVersion_tips: 'git version',
mlflow_cpuLimit: 'Max Cpu Limit', mlflow_cpuLimit: 'Max Cpu Limit',
......
...@@ -704,7 +704,7 @@ export default { ...@@ -704,7 +704,7 @@ export default {
mlflow_deployModelKey: '部署的模型URI', mlflow_deployModelKey: '部署的模型URI',
mlflow_deployPort: '监听端口', mlflow_deployPort: '监听端口',
mlflowProjectRepository: '运行仓库', mlflowProjectRepository: '运行仓库',
mlflowProjectRepository_tips: '可以为github仓库或worker上的路径', mlflowProjectRepository_tips: '可以为git仓库或worker上的路径',
mlflowProjectVersion: '项目版本', mlflowProjectVersion: '项目版本',
mlflowProjectVersion_tips: '项目git版本', mlflowProjectVersion_tips: '项目git版本',
mlflow_cpuLimit: '最大cpu限制', mlflow_cpuLimit: '最大cpu限制',
......
...@@ -23,8 +23,6 @@ export function useMlflowModels(model: { [field: string]: any }): IJsonItem[] { ...@@ -23,8 +23,6 @@ export function useMlflowModels(model: { [field: string]: any }): IJsonItem[] {
const deployTypeSpan = ref(0) const deployTypeSpan = ref(0)
const deployModelKeySpan = ref(0) const deployModelKeySpan = ref(0)
const deployPortSpan = ref(0) const deployPortSpan = ref(0)
const cpuLimitSpan = ref(0)
const memoryLimitSpan = ref(0)
const setFlag = () => { const setFlag = () => {
model.isModels = model.mlflowTaskType === 'MLflow Models' ? true : false model.isModels = model.mlflowTaskType === 'MLflow Models' ? true : false
...@@ -44,14 +42,6 @@ export function useMlflowModels(model: { [field: string]: any }): IJsonItem[] { ...@@ -44,14 +42,6 @@ export function useMlflowModels(model: { [field: string]: any }): IJsonItem[] {
} }
) )
watch(
() => [model.deployType],
() => {
cpuLimitSpan.value = model.deployType === 'DOCKER COMPOSE' ? 12 : 0
memoryLimitSpan.value = model.deployType === 'DOCKER COMPOSE' ? 12 : 0
}
)
setFlag() setFlag()
resetSpan() resetSpan()
...@@ -74,18 +64,6 @@ export function useMlflowModels(model: { [field: string]: any }): IJsonItem[] { ...@@ -74,18 +64,6 @@ export function useMlflowModels(model: { [field: string]: any }): IJsonItem[] {
field: 'deployPort', field: 'deployPort',
name: t('project.node.mlflow_deployPort'), name: t('project.node.mlflow_deployPort'),
span: deployPortSpan span: deployPortSpan
},
{
type: 'input',
field: 'cpuLimit',
name: t('project.node.mlflow_cpuLimit'),
span: cpuLimitSpan
},
{
type: 'input',
field: 'memoryLimit',
name: t('project.node.mlflow_memoryLimit'),
span: memoryLimitSpan
} }
] ]
} }
...@@ -98,9 +76,5 @@ const DEPLOY_TYPE = [ ...@@ -98,9 +76,5 @@ const DEPLOY_TYPE = [
{ {
label: 'DOCKER', label: 'DOCKER',
value: 'DOCKER' value: 'DOCKER'
},
{
label: 'DOCKER COMPOSE',
value: 'DOCKER COMPOSE'
} }
] ]
...@@ -280,16 +280,16 @@ export function useCustomProject(model: { [field: string]: any }): IJsonItem[] { ...@@ -280,16 +280,16 @@ export function useCustomProject(model: { [field: string]: any }): IJsonItem[] {
export const MLFLOW_JOB_TYPE = [ export const MLFLOW_JOB_TYPE = [
{ {
label: 'BasicAlgorithm', label: 'Custom Project',
value: 'BasicAlgorithm' value: 'CustomProject'
}, },
{ {
label: 'AutoML', label: 'AutoML',
value: 'AutoML' value: 'AutoML'
}, },
{ {
label: 'Custom Project', label: 'BasicAlgorithm',
value: 'CustomProject' value: 'BasicAlgorithm'
} }
] ]
export const ALGORITHM = [ export const ALGORITHM = [
...@@ -311,12 +311,12 @@ export const ALGORITHM = [ ...@@ -311,12 +311,12 @@ export const ALGORITHM = [
} }
] ]
export const AutoMLTOOL = [ export const AutoMLTOOL = [
{
label: 'autosklearn',
value: 'autosklearn'
},
{ {
label: 'flaml', label: 'flaml',
value: 'flaml' value: 'flaml'
},
{
label: 'autosklearn',
value: 'autosklearn'
} }
] ]
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import { useI18n } from 'vue-i18n' import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types' import type { IJsonItem } from '../types'
import { useMlflowProjects, useMlflowModels } from '.' import { useMlflowProjects, useMlflowModels } from '.'
import { useCustomParams, useResources } from '.'
export const MLFLOW_TASK_TYPE = [ export const MLFLOW_TASK_TYPE = [
{ {
...@@ -61,6 +62,8 @@ export function useMlflow(model: { [field: string]: any }): IJsonItem[] { ...@@ -61,6 +62,8 @@ export function useMlflow(model: { [field: string]: any }): IJsonItem[] {
options: MLFLOW_TASK_TYPE options: MLFLOW_TASK_TYPE
}, },
...useMlflowProjects(model), ...useMlflowProjects(model),
...useMlflowModels(model) ...useMlflowModels(model),
useResources(),
...useCustomParams({ model, field: 'localParams', isSimple: true })
] ]
} }
...@@ -380,8 +380,6 @@ export function formatParams(data: INodeData): { ...@@ -380,8 +380,6 @@ export function formatParams(data: INodeData): {
taskParams.deployModelKey = data.deployModelKey taskParams.deployModelKey = data.deployModelKey
taskParams.mlflowProjectRepository = data.mlflowProjectRepository taskParams.mlflowProjectRepository = data.mlflowProjectRepository
taskParams.mlflowProjectVersion = data.mlflowProjectVersion taskParams.mlflowProjectVersion = data.mlflowProjectVersion
taskParams.cpuLimit = data.cpuLimit
taskParams.memoryLimit = data.memoryLimit
} }
if (data.taskType === 'DVC') { if (data.taskType === 'DVC') {
......
...@@ -47,10 +47,7 @@ export function useMlflow({ ...@@ -47,10 +47,7 @@ export function useMlflow({
deployType: 'MLFLOW', deployType: 'MLFLOW',
deployPort: '7000', deployPort: '7000',
mlflowJobType: 'CustomProject', mlflowJobType: 'CustomProject',
mlflowProjectVersion: 'master',
automlTool: 'flaml', automlTool: 'flaml',
cpuLimit: '0.5',
memoryLimit: '500M',
mlflowCustomProjectParameters: [], mlflowCustomProjectParameters: [],
delayTime: 0, delayTime: 0,
timeout: 30, timeout: 30,
......