未验证 提交 191773d6 编写于 作者: W Wenjun Ruan 提交者: GitHub

Merge pull request #45 from ruanwenjun/dev_wenjun_fixInsertPluginErrorDuetoConcurrentInsert

Fix insertOrUpdate plugin may failed due to concurrent operation
......@@ -17,16 +17,21 @@
package org.apache.dolphinscheduler.dao;
import static java.util.Objects.requireNonNull;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static java.util.Objects.requireNonNull;
@Component
public class PluginDao {
private final Logger logger = LoggerFactory.getLogger(PluginDao.class);
@Autowired
private PluginDefineMapper pluginDefineMapper;
......@@ -51,10 +56,22 @@ public class PluginDao {
PluginDefine currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
if (currPluginDefine == null) {
if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) {
return pluginDefine.getId();
try {
if (pluginDefineMapper.insert(pluginDefine) == 1) {
return pluginDefine.getId();
} else {
throw new TaskPluginException(String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s", pluginDefine.getPluginName(), pluginDefine.getPluginType()));
}
} catch (TaskPluginException ex) {
throw ex;
} catch (Exception ex) {
logger.info("Insert plugin definition error, there may already exist a plugin");
currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
if (currPluginDefine == null) {
throw new TaskPluginException(String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s", pluginDefine.getPluginName(), pluginDefine.getPluginType()));
}
throw new IllegalStateException("Failed to insert plugin definition");
}
throw new IllegalStateException("Failed to insert plugin definition");
}
if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) {
currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime());
......
......@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.service.task;
import static java.lang.String.format;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
......@@ -29,6 +27,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashSet;
......@@ -39,9 +40,7 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import static java.lang.String.format;
// todo: make this class to be utils
@Component
......@@ -106,10 +105,7 @@ public class TaskPluginManager {
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
throw new TaskPluginException("Failed to update task plugin: " + name);
}
pluginDao.addOrUpdatePluginDefine(pluginDefine);
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册