未验证 提交 81930e54 编写于 作者: J Jiajie Zhong 提交者: GitHub

[python] Fix tasks with multiple upstream and workflow query error (#10941)

* when task with more than one upstreams, mapper
   TaskDefinitionMapper method queryByName will return
   more than one record, and failed the mybatis result
   type, so we have to add `limit 1` to it to
* add multiple runs of example in integrate test
* Change from subprocess.Popen to subprocess.call_check
  in integrating test which will raise an error when failed
上级 05308ccf
......@@ -51,7 +51,7 @@ jobs:
not-docs:
- '!(docs/**)'
py-change:
- 'dolphinscheduler-python/pydolphinscheduler'
- 'dolphinscheduler-python/pydolphinscheduler/**'
lint:
name: Lint
if: ${{ (needs.paths-filter.outputs.py-change == 'true') || (github.event_name == 'push') }}
......@@ -165,7 +165,7 @@ jobs:
- name: Install Dependences
run: |
python -m pip install --upgrade ${{ env.DEPENDENCES }}
- name: Run Tests Build Docs
- name: Run Integrate Tests
run: |
python -m tox -vv -e integrate-test
result:
......
......@@ -171,10 +171,11 @@ public class PythonGateway {
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// In the case project exists, but current process definition still not created, we should also return the init version of it
if (processDefinition == null) {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
logger.error(msg);
throw new IllegalArgumentException(msg);
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
......
......@@ -282,14 +282,14 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
*/
@Override
public Queue createQueueIfNotExists(String queue, String queueName) {
Queue queueObj = new Queue(queueName, queue);
createQueueValid(queueObj);
Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
if (Objects.isNull(existsQueue)) {
queueMapper.insert(queueObj);
return queueObj;
if (!Objects.isNull(existsQueue)) {
return existsQueue;
}
return existsQueue;
Queue queueObj = new Queue(queueName, queue);
createQueueValid(queueObj);
queueMapper.insert(queueObj);
return queueObj;
}
}
......@@ -366,8 +366,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
return tenantMapper.queryByTenantCode(tenantCode);
}
Queue newQueue = queueService.createQueueIfNotExists(queue, queueName);
Tenant tenant = new Tenant(tenantCode, desc, newQueue.getId());
Queue queueObj = queueService.createQueueIfNotExists(queue, queueName);
Tenant tenant = new Tenant(tenantCode, desc, queueObj.getId());
createTenantValid(tenant);
tenantMapper.insert(tenant);
return tenant;
......
......@@ -215,6 +215,21 @@ public class QueueServiceTest {
Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
}
@Test
public void testCreateQueueIfNotExists() {
Queue queue;
// queue exists
Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE());
queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
Assert.assertEquals(getQUEUE(), queue);
// queue not exists
Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null);
queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
Assert.assertEquals(new Queue(QUEUE_NAME, QUEUE), queue);
}
/**
* create admin user
*/
......
......@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
......@@ -77,6 +78,9 @@ public class TenantServiceTest {
@InjectMocks
private TenantServiceImpl tenantService;
@Mock
private QueueService queueService;
@Mock
private TenantMapper tenantMapper;
......@@ -94,6 +98,8 @@ public class TenantServiceTest {
private static final String tenantCode = "hayden";
private static final String tenantDesc = "This is the tenant desc";
private static final String queue = "queue";
private static final String queueName = "queue_name";
@Test
public void testCreateTenant() throws Exception {
......@@ -229,6 +235,23 @@ public class TenantServiceTest {
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test
public void testCreateTenantIfNotExists() {
Tenant tenant;
// Tenant exists
Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(true);
Mockito.when(tenantMapper.queryByTenantCode(tenantCode)).thenReturn(getTenant());
tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
Assert.assertEquals(getTenant(), tenant);
// Tenant not exists
Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(false);
Mockito.when(queueService.createQueueIfNotExists(queue, queueName)).thenReturn(getQueue());
tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
Assert.assertEquals(new Tenant(tenantCode, tenantDesc, getQueue().getId()), tenant);
}
/**
* get user
*/
......@@ -284,4 +307,10 @@ public class TenantServiceTest {
return processDefinitions;
}
private Queue getQueue() {
Queue queue = new Queue();
queue.setId(1);
return queue;
}
}
......@@ -41,6 +41,7 @@
and td.name = #{name}
and ptr.process_definition_code = #{processCode}
and td.code = ptr.post_task_code
limit 1
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
......
......@@ -64,8 +64,8 @@ class ProcessDefinition(Base):
``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
to ``user``, will grant `project` to ``user`` automatically.
:param resource_list: Resource files required by the current process definition.You can create and modify
resource files from this field. When the process definition is submitted, these resource files are also
submitted along with it.
resource files from this field. When the process definition is submitted, these resource files are
also submitted along with it.
"""
# key attribute for identify ProcessDefinition object
......
......@@ -17,8 +17,8 @@
"""Test whether success submit examples DAG to PythonGatewayService."""
import subprocess
from pathlib import Path
from subprocess import Popen
import pytest
......@@ -38,7 +38,19 @@ def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayService."""
try:
# Because our task decorator used module ``inspect`` to get the source, and it will
# raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.Popen``
Popen(["python", str(example_path)])
except Exception:
raise Exception("Run example %s failed.", example_path.stem)
# raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.check_call``
subprocess.check_call(["python", str(example_path)])
except subprocess.CalledProcessError:
raise RuntimeError("Run example %s failed.", example_path.stem)
def test_exec_multiple_times():
"""Test whether process definition can be executed more than one times."""
tutorial_path = path_example.joinpath("tutorial.py")
time = 0
while time < 3:
try:
subprocess.check_call(["python", str(tutorial_path)])
except subprocess.CalledProcessError:
raise RuntimeError("Run example %s failed.", tutorial_path.stem)
time += 1
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册