提交 ca83fd73 编写于 作者: B break60
......@@ -48,6 +48,11 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Git fetch unshallow
run: |
git fetch --unshallow
git config remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"
git fetch origin
- name: Compile
run: |
export MAVEN_OPTS='-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g'
......@@ -56,11 +61,6 @@ jobs:
if: github.event_name == 'pull_request'
run: |
CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
- name: Git fetch unshallow
run: |
git fetch --unshallow
git config remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"
git fetch origin
- name: Run SonarCloud Analysis
run: >
mvn verify --batch-mode
......
* First from the remote repository *https://github.com/apache/incubator-dolphinscheduler.git* fork code to your own repository
* there are three branches in the remote repository currently:
* master normal delivery branch
After the stable version is released, the code for the stable version branch is merged into the master branch.
# Development
* dev daily development branch
The daily development branch, the newly submitted code can pull requests to this branch.
Start by forking the dolphinscheduler GitHub repository, make changes in a branch and then send a pull request.
## Set up your dolphinscheduler GitHub Repository
* Clone your own warehouse to your local
There are three branches in the remote repository currently:
- `master` : normal delivery branch. After the stable version is released, the code for the stable version branch is merged into the master branch.
- `dev` : daily development branch. The daily development branch, the newly submitted code can pull requests to this branch.
- `x.x.x-release` : the stable release version.
`git clone https://github.com/apache/incubator-dolphinscheduler.git`
So, you should fork the `dev` branch.
* Add remote repository address, named upstream
After forking the [dolphinscheduler upstream source repository](https://github.com/apache/incubator-dolphinscheduler/fork) to your personal repository, you can set your personal development environment.
`git remote add upstream https://github.com/apache/incubator-dolphinscheduler.git`
```sh
$ cd <your work direcotry>
$ git clone < your personal forked dolphinscheduler repo>
$ cd incubator-dolphinscheduler
```
* View repository:
## Set git remote as ``upstream``
`git remote -v`
Add remote repository address, named upstream
> There will be two repositories at this time: origin (your own warehouse) and upstream (remote repository)
```sh
git remote add upstream https://github.com/apache/incubator-dolphinscheduler.git
```
* Get/update remote repository code (already the latest code, skip it)
View repository:
`git fetch upstream`
```sh
git remote -v
```
There will be two repositories at this time: origin (your own warehouse) and upstream (remote repository)
* Synchronize remote repository code to local repository
Get/update remote repository code (already the latest code, skip it).
```sh
git fetch upstream
```
Synchronize remote repository code to local repository
```sh
git checkout origin/dev
git merge --no-ff upstream/dev
```
......@@ -41,24 +59,46 @@ git checkout -b dev-1.0 upstream/dev-1.0
git push --set-upstream origin dev1.0
```
* After modifying the code locally, submit it to your own repository:
## Create your feature branch
Before making code changes, make sure you create a separate branch for them.
```sh
$ git checkout -b <your-feature>
```
## Commit changes
After modifying the code locally, submit it to your own repository:
```sh
git commit -m 'information about your feature'
```
## Push to the branch
Push your locally committed changes to the remote origin (your fork).
`git commit -m 'test commit'`
`git push`
```
$ git push origin <your-feature>
```
## Create a pull request
* Submit changes to the remote repository
After submitting changes to your remote repository, you should click on the new pull request On the following github page.
* On the github page, click on the new pull request.
<p align = "center">
<img src = "http://geek.analysys.cn/static/upload/221/2019-04-02/90f3abbf-70ef-4334-b8d6-9014c9cf4c7f.png"width ="60%"/>
</ p>
<img src = "http://geek.analysys.cn/static/upload/221/2019-04-02/90f3abbf-70ef-4334-b8d6-9014c9cf4c7f.png" width ="60%"/>
</p>
Select the modified local branch and the branch to merge past to create a pull request.
* Select the modified local branch and the branch to merge past to create a pull request.
<p align = "center">
<img src = "http://geek.analysys.cn/static/upload/221/2019-04-02/fe7eecfe-2720-4736-951b-b3387cf1ae41.png"width ="60%"/>
</ p>
<img src = "http://geek.analysys.cn/static/upload/221/2019-04-02/fe7eecfe-2720-4736-951b-b3387cf1ae41.png" width ="60%"/>
</p>
* Next, the administrator is responsible for **merging** to complete the pull request
Next, the administrator is responsible for **merging** to complete the pull request.
......
......@@ -7,16 +7,16 @@ to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE_2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
from resource_management import *
from resource_management.core.logger import Logger
......
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2'
services:
zookeeper:
......
......@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Ignore;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,7 +33,6 @@ import java.util.*;
/**
*/
@Ignore
public class MailUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(MailUtilsTest.class);
@Test
......@@ -138,8 +137,10 @@ public class MailUtilsTest {
* Table
*/
@Test
public void addAlertTable(){
public void testAddAlertTable(){
logger.info("testAddAlertTable");
AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
Assert.assertNotNull(alertDao);
Alert alert = new Alert();
alert.setTitle("Mysql Exception");
alert.setShowType(ShowType.TABLE);
......@@ -149,6 +150,7 @@ public class MailUtilsTest {
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(1);
alertDao.addAlert(alert);
logger.info("" +alert);
}
@Test
......
......@@ -93,11 +93,11 @@ public class AlertGroupController extends BaseController{
public Result list(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
logger.info("login user {}, query all alertGroup",
loginUser.getUserName());
try{
try {
HashMap<String, Object> result = alertGroupService.queryAlertgroup();
return returnDataList(result);
}catch (Exception e){
logger.error(Status.QUERY_ALL_ALERTGROUP_ERROR.getMsg(),e);
} catch (Exception e) {
logger.error(Status.QUERY_ALL_ALERTGROUP_ERROR.getMsg(), e);
return error(Status.QUERY_ALL_ALERTGROUP_ERROR.getCode(), Status.QUERY_ALL_ALERTGROUP_ERROR.getMsg());
}
}
......@@ -214,12 +214,20 @@ public class AlertGroupController extends BaseController{
@GetMapping(value = "/verify-group-name")
@ResponseStatus(HttpStatus.OK)
public Result verifyGroupName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="groupName") String groupName
) {
logger.info("login user {}, verfiy group name: {}",
loginUser.getUserName(),groupName);
@RequestParam(value ="groupName") String groupName) {
logger.info("login user {}, verify group name: {}", loginUser.getUserName(), groupName);
return alertGroupService.verifyGroupName(loginUser, groupName);
boolean exist= alertGroupService.existGroupName(groupName);
Result result = new Result();
if (exist) {
logger.error("group {} has exist, can't create again.", groupName);
result.setCode(Status.ALERT_GROUP_EXIST.getCode());
result.setMsg(Status.ALERT_GROUP_EXIST.getMsg());
} else {
result.setCode(Status.SUCCESS.getCode());
result.setMsg(Status.SUCCESS.getMsg());
}
return result;
}
/**
......
......@@ -144,7 +144,7 @@ public class ResourcesController extends BaseController{
@RequestParam(value ="type") ResourceType type
){
try{
logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString());
logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type);
Map<String, Object> result = resourceService.queryResourceList(loginUser, type);
return returnDataList(result);
}catch (Exception e){
......@@ -180,7 +180,7 @@ public class ResourcesController extends BaseController{
){
try{
logger.info("query resource list, login user:{}, resource type:{}, search value:{}",
loginUser.getUserName(), type.toString(), searchVal);
loginUser.getUserName(), type, searchVal);
Map<String, Object> result = checkPageParams(pageNo, pageSize);
if(result.get(Constants.STATUS) != Status.SUCCESS){
return returnDataListPaging(result);
......@@ -426,8 +426,6 @@ public class ResourcesController extends BaseController{
@RequestParam(value = "resourceId") int resourceId) {
logger.info("login user {}, create udf function, type: {}, funcName: {},argTypes: {} ,database: {},desc: {},resourceId: {}",
loginUser.getUserName(),type, funcName, argTypes,database,description, resourceId);
Result result = new Result();
try {
return udfFuncService.createUdfFunction(loginUser,funcName,className,argTypes,database,description,type,resourceId);
} catch (Exception e) {
......@@ -563,7 +561,7 @@ public class ResourcesController extends BaseController{
public Result queryResourceList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("type") UdfType type){
try{
logger.info("query datasource list, user:{}, type:{}", loginUser.getUserName(), type.toString());
logger.info("query datasource list, user:{}, type:{}", loginUser.getUserName(), type);
Map<String, Object> result = udfFuncService.queryResourceList(loginUser,type.ordinal());
return returnDataList(result);
}catch (Exception e){
......
......@@ -16,17 +16,17 @@
*/
package org.apache.dolphinscheduler.api.service;
import java.util.*;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.AlertGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.UserAlertGroup;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.slf4j.Logger;
......@@ -35,11 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* alert group service
*/
......@@ -52,8 +47,7 @@ public class AlertGroupService extends BaseService{
private AlertGroupMapper alertGroupMapper;
@Autowired
private UserAlertGroupMapper userAlertGroupMapper;
private UserAlertGroupService userAlertGroupService;
/**
* query alert group list
*
......@@ -122,7 +116,7 @@ public class AlertGroupService extends BaseService{
alertGroup.setCreateTime(now);
alertGroup.setUpdateTime(now);
// insert
// insert
int insert = alertGroupMapper.insert(alertGroup);
if (insert > 0) {
......@@ -199,7 +193,7 @@ public class AlertGroupService extends BaseService{
return result;
}
userAlertGroupMapper.deleteByAlertgroupId(id);
userAlertGroupService.deleteByAlertGroupId(id);
alertGroupMapper.deleteById(id);
putMsg(result, Status.SUCCESS);
return result;
......@@ -223,22 +217,26 @@ public class AlertGroupService extends BaseService{
return result;
}
userAlertGroupMapper.deleteByAlertgroupId(alertgroupId);
userAlertGroupService.deleteByAlertGroupId(alertgroupId);
if (StringUtils.isEmpty(userIds)) {
putMsg(result, Status.SUCCESS);
return result;
}
String[] userIdsArr = userIds.split(",");
Date now = new Date();
List<UserAlertGroup> alertGroups = new ArrayList<>(userIds.length());
for (String userId : userIdsArr) {
Date now = new Date();
UserAlertGroup userAlertGroup = new UserAlertGroup();
userAlertGroup.setAlertgroupId(alertgroupId);
userAlertGroup.setUserId(Integer.parseInt(userId));
userAlertGroup.setCreateTime(now);
userAlertGroup.setUpdateTime(now);
userAlertGroupMapper.insert(userAlertGroup);
alertGroups.add(userAlertGroup);
}
if (CollectionUtils.isNotEmpty(alertGroups)) {
userAlertGroupService.saveBatch(alertGroups);
}
putMsg(result, Status.SUCCESS);
......@@ -248,22 +246,11 @@ public class AlertGroupService extends BaseService{
/**
* verify group name exists
*
* @param loginUser login user
* @param groupName group name
* @return check result code
*/
public Result verifyGroupName(User loginUser, String groupName) {
Result result = new Result();
public boolean existGroupName(String groupName) {
List<AlertGroup> alertGroup = alertGroupMapper.queryByGroupName(groupName);
if (alertGroup != null && alertGroup.size() > 0) {
logger.error("group {} has exist, can't create again.", groupName);
result.setCode(Status.ALERT_GROUP_EXIST.getCode());
result.setMsg(Status.ALERT_GROUP_EXIST.getMsg());
} else {
result.setCode(Status.SUCCESS.getCode());
result.setMsg(Status.SUCCESS.getMsg());
}
return result;
return CollectionUtils.isNotEmpty(alertGroup);
}
}
......@@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
......@@ -43,7 +44,7 @@ import java.util.Map;
@Service
public class QueueService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(TenantService.class);
private static final Logger logger = LoggerFactory.getLogger(QueueService.class);
@Autowired
private QueueMapper queueMapper;
......@@ -186,19 +187,16 @@ public class QueueService extends BaseService {
}
// check queue name is exist
if (!queueName.equals(queueObj.getQueueName())) {
if (checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
if (!queueName.equals(queueObj.getQueueName())
&& checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
// check queue value is exist
if (!queue.equals(queueObj.getQueue())) {
if (checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
if (!queue.equals(queueObj.getQueue()) && checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
// check old queue using by any user
......@@ -267,7 +265,7 @@ public class QueueService extends BaseService {
* @return true if the queue not exists, otherwise return false
*/
private boolean checkQueueExist(String queue) {
return queueMapper.queryAllQueueList(queue, null).size() > 0;
return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(queue, null));
}
/**
......@@ -278,7 +276,7 @@ public class QueueService extends BaseService {
* @return true if the queue name not exists, otherwise return false
*/
private boolean checkQueueNameExist(String queueName) {
return queueMapper.queryAllQueueList(null, queueName).size() > 0;
return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(null, queueName));
}
/**
......@@ -290,7 +288,7 @@ public class QueueService extends BaseService {
* @return true if need to update user
*/
private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) {
return !oldQueue.equals(newQueue) && userMapper.queryUserListByQueue(oldQueue).size() > 0;
return !oldQueue.equals(newQueue) && CollectionUtils.isNotEmpty(userMapper.queryUserListByQueue(oldQueue));
}
}
......@@ -24,10 +24,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
......@@ -141,7 +138,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
Map<String, Object> resultMap = new HashMap<String, Object>();
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
......@@ -171,12 +168,8 @@ public class ResourcesService extends BaseService {
* @return true if resource exists
*/
private boolean checkResourceExists(String alias, int userId, int type ){
List<Resource> resources = resourcesMapper.queryResourceList(alias, userId, type);
if (resources != null && resources.size() > 0) {
return true;
}
return false;
return CollectionUtils.isNotEmpty(resources);
}
......@@ -237,7 +230,7 @@ public class ResourcesService extends BaseService {
//get the file suffix
String originResourceName = resource.getAlias();
String suffix = originResourceName.substring(originResourceName.lastIndexOf("."));
String suffix = originResourceName.substring(originResourceName.lastIndexOf('.'));
//if the name without suffix then add it ,else use the origin name
String nameWithSuffix = name;
......@@ -746,8 +739,7 @@ public class ResourcesService extends BaseService {
logger.info("resource hdfs path is {} ", hdfsFileName);
HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true);
org.springframework.core.io.Resource file = org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
return file;
return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
}
......@@ -766,7 +758,7 @@ public class ResourcesService extends BaseService {
}
List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
List<Object> list ;
if (resourceList != null && resourceList.size() > 0) {
if (CollectionUtils.isNotEmpty(resourceList)) {
Set<Resource> resourceSet = new HashSet<>(resourceList);
List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
......@@ -801,7 +793,7 @@ public class ResourcesService extends BaseService {
List<UdfFunc> udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId);
List<UdfFunc> resultList = new ArrayList<>();
Set<UdfFunc> udfFuncSet = null;
if (udfFuncList != null && udfFuncList.size() > 0) {
if (CollectionUtils.isNotEmpty(udfFuncList)) {
udfFuncSet = new HashSet<>(udfFuncList);
List<UdfFunc> authedUDFFuncList = udfFunctionMapper.queryAuthedUdfFunc(userId);
......@@ -897,10 +889,9 @@ public class ResourcesService extends BaseService {
*/
private void getAuthorizedResourceList(Set<?> resourceSet, List<?> authedResourceList) {
Set<?> authedResourceSet = null;
if (authedResourceList != null && authedResourceList.size() > 0) {
if (CollectionUtils.isNotEmpty(authedResourceList)) {
authedResourceSet = new HashSet<>(authedResourceList);
resourceSet.removeAll(authedResourceSet);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.dolphinscheduler.dao.entity.UserAlertGroup;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
*/
@Service
public class UserAlertGroupService extends ServiceImpl<UserAlertGroupMapper, UserAlertGroup> {
@Autowired
private UserAlertGroupMapper userAlertGroupMapper;
boolean deleteByAlertGroupId(Integer groupId) {
return userAlertGroupMapper.deleteByAlertgroupId(groupId) >= 1;
}
}
......@@ -18,9 +18,12 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.UserType;
......@@ -31,9 +34,12 @@ import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import static org.mockito.ArgumentMatchers.*;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
......@@ -41,14 +47,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@RunWith(MockitoJUnitRunner.class)
public class AlertGroupServiceTest {
......@@ -60,6 +58,8 @@ public class AlertGroupServiceTest {
private AlertGroupMapper alertGroupMapper;
@Mock
private UserAlertGroupMapper userAlertGroupMapper;
@Mock
UserAlertGroupService userAlertGroupService;
private String groupName = "AlertGroupServiceTest";
......@@ -160,25 +160,34 @@ public class AlertGroupServiceTest {
}
@Test
public void testGrantUser(){
public void testGrantUser() {
Integer groupId = 1;
ArgumentCaptor<Integer> groupArgument = ArgumentCaptor.forClass(Integer.class);
Mockito.when(userAlertGroupService.deleteByAlertGroupId(anyInt())).thenReturn(true);
Map<String, Object> result = alertGroupService.grantUser(getLoginUser(), groupId, "123,321");
Mockito.verify(userAlertGroupService).deleteByAlertGroupId(groupArgument.capture());
Map<String, Object> result = alertGroupService.grantUser(getLoginUser(),1,"123,321");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
assertEquals(groupArgument.getValue(), groupId);
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testVerifyGroupName(){
public void testVerifyGroupName() {
//group name not exist
Result result = alertGroupService.verifyGroupName(getLoginUser(), groupName);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
boolean result = alertGroupService.existGroupName(groupName);
Assert.assertFalse(result);
Mockito.when(alertGroupMapper.queryByGroupName(groupName)).thenReturn(getList());
//group name exist
result = alertGroupService.verifyGroupName(getLoginUser(), groupName);
logger.info(result.toString());
Assert.assertEquals(Status.ALERT_GROUP_EXIST.getMsg(),result.getMsg());
result = alertGroupService.existGroupName(groupName);
Assert.assertTrue(result);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
*
*/
@RunWith(MockitoJUnitRunner.class)
public class UserAlertGroupServiceTest {
@InjectMocks
UserAlertGroupService userAlertGroupService;
@Mock
UserAlertGroupMapper userAlertGroupMapper;
@Test
public void deleteByAlertGroupId() {
Integer groupId = 1;
userAlertGroupService.deleteByAlertGroupId(groupId);
ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
Mockito.verify(userAlertGroupMapper).deleteByAlertgroupId(argumentCaptor.capture());
assertEquals(argumentCaptor.getValue(), groupId);
}
}
\ No newline at end of file
......@@ -211,6 +211,7 @@ public class CheckUtilsTest {
// DataxParameters
DataxParameters dataxParameters = new DataxParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString()));
dataxParameters.setCustomConfig(0);
dataxParameters.setDataSource(111);
dataxParameters.setDataTarget(333);
dataxParameters.setSql("sql");
......
......@@ -999,4 +999,11 @@ public final class Constants {
* dataSource sensitive param
*/
public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))";
/**
* new
* schedule time
*/
public static final String PARAMETER_SHECDULE_TIME = "schedule.time";
}
......@@ -27,6 +27,16 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
*/
public class DataxParameters extends AbstractParameters {
/**
* if custom json config,eg 0, 1
*/
private Integer customConfig;
/**
* if customConfig eq 1 ,then json is usable
*/
private String json;
/**
* data source type,eg MYSQL, POSTGRES ...
*/
......@@ -77,6 +87,22 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;
public Integer getCustomConfig() {
return customConfig;
}
public void setCustomConfig(Integer customConfig) {
this.customConfig = customConfig;
}
public String getJson() {
return json;
}
public void setJson(String json) {
this.json = json;
}
public String getDsType() {
return dsType;
}
......@@ -157,16 +183,18 @@ public class DataxParameters extends AbstractParameters {
this.jobSpeedRecord = jobSpeedRecord;
}
@Override
public boolean checkParameters() {
if (!(dataSource != 0
&& dataTarget != 0
&& StringUtils.isNotEmpty(sql)
&& StringUtils.isNotEmpty(targetTable))) {
return false;
if (customConfig == null) return false;
if (customConfig == 0) {
return dataSource != 0
&& dataTarget != 0
&& StringUtils.isNotEmpty(sql)
&& StringUtils.isNotEmpty(targetTable);
} else {
return StringUtils.isNotEmpty(json);
}
return true;
}
@Override
......@@ -177,7 +205,9 @@ public class DataxParameters extends AbstractParameters {
@Override
public String toString() {
return "DataxParameters{" +
"dsType='" + dsType + '\'' +
"customConfig=" + customConfig +
", json='" + json + '\'' +
", dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
......
......@@ -78,6 +78,45 @@ public class ParameterUtils {
return parameterString;
}
/**
* new
* convert parameters place holders
*
* @param parameterString parameter
* @param parameterMap parameter map
* @return convert parameters place holders
*/
public static String convertParameterPlaceholders2(String parameterString, Map<String, String> parameterMap) {
if (StringUtils.isEmpty(parameterString)) {
return parameterString;
}
//Get current time, schedule execute time
String cronTimeStr = parameterMap.get(Constants.PARAMETER_SHECDULE_TIME);
Date cronTime = null;
if (StringUtils.isNotEmpty(cronTimeStr)) {
try {
cronTime = DateUtils.parseDate(cronTimeStr, new String[]{Constants.PARAMETER_FORMAT_TIME});
} catch (ParseException e) {
logger.error(String.format("parse %s exception", cronTimeStr), e);
}
} else {
cronTime = new Date();
}
// replace variable ${} form,refers to the replacement of system variables and custom variables
parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true);
// replace time $[...] form, eg. $[yyyyMMdd]
if (cronTime != null) {
parameterString = TimePlaceholderUtils.replacePlaceholders(parameterString, cronTime, true);
}
return parameterString;
}
/**
* set in parameter
* @param index index
......@@ -173,4 +212,44 @@ public class ParameterUtils {
}
return inputString;
}
/**
* new
* $[yyyyMMdd] replace scheduler time
* @param text
* @param paramsMap
* @return
*/
public static String replaceScheduleTime(String text, Date scheduleTime, Map<String, Property> paramsMap) {
if (paramsMap != null) {
//if getScheduleTime null ,is current date
if (null == scheduleTime) {
scheduleTime = new Date();
}
String dateTime = org.apache.dolphinscheduler.common.utils.DateUtils.format(scheduleTime, Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_SHECDULE_TIME);
paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p);
text = ParameterUtils.convertParameterPlaceholders2(text, convert(paramsMap));
}
return text;
}
/**
* format convert
* @param paramsMap params map
* @return Map of converted
* see org.apache.dolphinscheduler.server.utils.ParamUtils.convert
*/
public static Map<String,String> convert(Map<String,Property> paramsMap){
Map<String,String> map = new HashMap<>();
Iterator<Map.Entry<String, Property>> iter = paramsMap.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<String, Property> en = iter.next();
map.put(en.getKey(),en.getValue().getValue());
}
return map;
}
}
......@@ -71,7 +71,7 @@ public class PropertyUtils {
*
* @return judge whether resource upload startup
*/
public static Boolean getResUploadStartupState(){
public static boolean getResUploadStartupState(){
String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
return resUploadType == ResUploadType.HDFS || resUploadType == ResUploadType.S3;
......
......@@ -50,8 +50,8 @@ public class AlertDao extends AbstractBaseDao {
@Override
protected void init() {
alertMapper = ConnectionFactory.getMapper(AlertMapper.class);
userAlertGroupMapper = ConnectionFactory.getMapper(UserAlertGroupMapper.class);
alertMapper = ConnectionFactory.getInstance().getMapper(AlertMapper.class);
userAlertGroupMapper = ConnectionFactory.getInstance().getMapper(UserAlertGroupMapper.class);
}
/**
......@@ -99,13 +99,7 @@ public class AlertDao extends AbstractBaseDao {
String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]",
serverType, host);
alert.setTitle("Fault tolerance warning");
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(alertgroupId);
alert.setCreateTime(new Date());
alert.setUpdateTime(new Date());
alertMapper.insert(alert);
saveTaskTimeoutAlert(alert, content, alertgroupId, null, null);
}
/**
......@@ -121,6 +115,11 @@ public class AlertDao extends AbstractBaseDao {
String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
processInstance.getId(), processInstance.getName());
alert.setTitle("Process Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc);
}
private void saveTaskTimeoutAlert(Alert alert, String content, int alertgroupId,
String receivers, String receiversCc){
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
......@@ -150,19 +149,7 @@ public class AlertDao extends AbstractBaseDao {
String content = String.format("[{'process instance id':'%d','task name':'%s','task id':'%d','task name':'%s'," +
"'event':'timeout','warnLevel':'middle'}]", processInstanceId, processInstanceName, taskId, taskName);
alert.setTitle("Task Timeout Warn");
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(alertgroupId);
if (StringUtils.isNotEmpty(receivers)) {
alert.setReceivers(receivers);
}
if (StringUtils.isNotEmpty(receiversCc)) {
alert.setReceiversCc(receiversCc);
}
alert.setCreateTime(new Date());
alert.setUpdateTime(new Date());
alertMapper.insert(alert);
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc);
}
/**
......
......@@ -34,29 +34,47 @@ import javax.sql.DataSource;
/**
* not spring manager connection, only use for init db, and alert module for non-spring application
* not spring manager connection, only use for init db, and alert module for non-spring application
* data source connection factory
*/
public class ConnectionFactory extends SpringConnectionFactory{
public class ConnectionFactory extends SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);
private static class ConnectionFactoryHolder {
private static final ConnectionFactory connectionFactory = new ConnectionFactory();
}
public static ConnectionFactory getInstance() {
return ConnectionFactoryHolder.connectionFactory;
}
private ConnectionFactory() {
try {
sqlSessionFactory = getSqlSessionFactory();
sqlSessionTemplate = getSqlSessionTemplate();
} catch (Exception e) {
logger.error("Initializing ConnectionFactory error", e);
throw new RuntimeException(e);
}
}
/**
* sql session factory
*/
private static SqlSessionFactory sqlSessionFactory;
private SqlSessionFactory sqlSessionFactory;
/**
* sql session template
*/
private static SqlSessionTemplate sqlSessionTemplate;
private SqlSessionTemplate sqlSessionTemplate;
/**
* get the data source
*
* @return druid dataSource
*/
public static DruidDataSource getDataSource() {
public DruidDataSource getDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
......@@ -89,65 +107,54 @@ public class ConnectionFactory extends SpringConnectionFactory{
/**
* * get sql session factory
*
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
public static SqlSessionFactory getSqlSessionFactory() throws Exception {
if (sqlSessionFactory == null) {
synchronized (ConnectionFactory.class) {
if (sqlSessionFactory == null) {
DataSource dataSource = getDataSource();
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, dataSource);
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setEnvironment(environment);
configuration.setLazyLoadingEnabled(true);
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.addInterceptor(new PaginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
sqlSessionFactory = sqlSessionFactoryBean.getObject();
}
}
}
private SqlSessionFactory getSqlSessionFactory() throws Exception {
DataSource dataSource = getDataSource();
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, dataSource);
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setEnvironment(environment);
configuration.setLazyLoadingEnabled(true);
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.addInterceptor(new PaginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
sqlSessionFactory = sqlSessionFactoryBean.getObject();
return sqlSessionFactory;
}
private SqlSessionTemplate getSqlSessionTemplate() {
sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
return sqlSessionTemplate;
}
/**
* get sql session
*
* @return sqlSession
*/
public static SqlSession getSqlSession() {
if (sqlSessionTemplate == null) {
synchronized (ConnectionFactory.class) {
if (sqlSessionTemplate == null) {
try {
sqlSessionTemplate = new SqlSessionTemplate(getSqlSessionFactory());
return sqlSessionTemplate;
} catch (Exception e) {
logger.error("getSqlSession error", e);
throw new RuntimeException(e);
}
}
}
}
public SqlSession getSqlSession() {
return sqlSessionTemplate;
}
/**
* get mapper
*
* @param type target class
* @param <T> generic
* @param <T> generic
* @return target object
*/
public static <T> T getMapper(Class<T> type) {
public <T> T getMapper(Class<T> type) {
try {
return getSqlSession().getMapper(type);
} catch (Exception e) {
......
......@@ -53,7 +53,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
* @return DruidDataSource
*/
public static DruidDataSource getDataSource(){
DruidDataSource dataSource = ConnectionFactory.getDataSource();
DruidDataSource dataSource = ConnectionFactory.getInstance().getDataSource();
dataSource.setInitialSize(2);
dataSource.setMinIdle(2);
dataSource.setMaxActive(2);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlertDaoTest {
private static final Logger logger = LoggerFactory.getLogger(AlertDaoTest.class);
@Test
public void testGetAlertDao() {
logger.info("testGetAlertDao start");
AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
Assert.assertNotNull(alertDao);
logger.info("testGetAlertDao end");
}
}
......@@ -31,7 +31,7 @@ public class ConnectionFactoryTest {
*/
@Test
public void testConnection()throws Exception{
Connection connection = ConnectionFactory.getDataSource().getPooledConnection().getConnection();
Connection connection = ConnectionFactory.getInstance().getDataSource().getPooledConnection().getConnection();
Assert.assertTrue(connection != null);
}
}
\ No newline at end of file
......@@ -64,7 +64,7 @@ public class QueueMapperTest {
queue.setCreateTime(new Date());
//update
int update = queueMapper.updateById(queue);
Assert.assertEquals(update, 1);
Assert.assertEquals(1, update);
queueMapper.deleteById(queue.getId());
}
......@@ -75,7 +75,7 @@ public class QueueMapperTest {
public void testDelete(){
Queue queue = insertOne();
int delete = queueMapper.deleteById(queue.getId());
Assert.assertEquals(delete, 1);
Assert.assertEquals(1, delete);
}
/**
......
......@@ -138,7 +138,7 @@ public class ResourceMapperTest {
resource.setCreateTime(new Date());
//update
int update = resourceMapper.updateById(resource);
Assert.assertEquals(update, 1);
Assert.assertEquals(1, update);
resourceMapper.deleteById(resource.getId());
}
......@@ -149,7 +149,7 @@ public class ResourceMapperTest {
public void testDelete(){
Resource resourceMap = insertOne();
int delete = resourceMapper.deleteById(resourceMap.getId());
Assert.assertEquals(delete, 1);
Assert.assertEquals(1, delete);
}
/**
......@@ -236,8 +236,8 @@ public class ResourceMapperTest {
resourceUserMapper.deleteById(resourcesUser.getId());
resourceMapper.deleteById(resource.getId());
Assert.assertEquals(resources.size(), 0);
Assert.assertNotEquals(resources1.size(), 0);
Assert.assertEquals(0, resources.size());
Assert.assertNotEquals(0, resources1.size());
}
......@@ -251,7 +251,7 @@ public class ResourceMapperTest {
List<Resource> resources = resourceMapper.queryAuthorizedResourceList(resource.getUserId());
resourceMapper.deleteById(resource.getId());
Assert.assertEquals(resources.size(), 0);
Assert.assertEquals(0, resources.size());
}
/**
......@@ -293,7 +293,7 @@ public class ResourceMapperTest {
);
Assert.assertEquals(resource1, "ut tenant code for resource");
Assert.assertEquals("ut tenant code for resource", resource1);
resourceMapper.deleteById(resource.getId());
}
......
......@@ -62,7 +62,7 @@ public class ResourceUserMapperTest {
queue.setCreateTime(new Date());
//update
int update = resourceUserMapper.updateById(queue);
Assert.assertEquals(update, 1);
Assert.assertEquals(1, update);
resourceUserMapper.deleteById(queue.getId());
}
......@@ -73,7 +73,7 @@ public class ResourceUserMapperTest {
public void testDelete(){
ResourcesUser queue = insertOne();
int delete = resourceUserMapper.deleteById(queue.getId());
Assert.assertEquals(delete, 1);
Assert.assertEquals(1, delete);
}
/**
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
......
......@@ -301,7 +301,7 @@ public class NettyRemotingClient {
return channel;
}
} catch (Exception ex) {
logger.info("connect to {} error {}", address, ex);
logger.error("connect to {} error", address, ex);
}
return null;
}
......
......@@ -32,9 +32,9 @@ import java.util.concurrent.*;
*/
public class ResponseFuture {
private final static Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class);
private final static ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
private static final ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
/**
* request unique identification
......@@ -63,11 +63,11 @@ public class ResponseFuture {
/**
* response command
*/
private volatile Command responseCommand;
private Command responseCommand;
private volatile boolean sendOk = true;
private volatile Throwable cause;
private Throwable cause;
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback, ReleaseSemaphore releaseSemaphore) {
this.opaque = opaque;
......
......@@ -126,14 +126,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
if (!ch.isWritable()) {
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}",
new Object[]{ch, config.getWriteBufferHighWaterMark()});
ch, config.getWriteBufferHighWaterMark());
}
config.setAutoRead(false);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}",
new Object[]{ch, config.getWriteBufferLowWaterMark()});
ch, config.getWriteBufferLowWaterMark());
}
config.setAutoRead(true);
}
......
......@@ -117,7 +117,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
try {
pair.getLeft().process(channel, msg);
} catch (Throwable ex) {
logger.error("process msg {} error : {}", msg, ex);
logger.error("process msg {} error", msg, ex);
}
}
};
......@@ -158,14 +158,14 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (!ch.isWritable()) {
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}",
new Object[]{ch, config.getWriteBufferHighWaterMark()});
ch, config.getWriteBufferHighWaterMark());
}
config.setAutoRead(false);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}",
new Object[]{ch, config.getWriteBufferLowWaterMark()});
ch, config.getWriteBufferLowWaterMark());
}
config.setAutoRead(true);
}
......
......@@ -52,8 +52,8 @@ public class NamedThreadFactory implements ThreadFactory {
*/
@Override
public Thread newThread(Runnable r) {
final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
: String.format(name + "_%d", increment.getAndIncrement());
final String threadName = count > 0 ? String.format("%s_%d_%d", name, count, increment.getAndIncrement())
: String.format("%s_%d", name, increment.getAndIncrement());
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
......
......@@ -674,9 +674,9 @@ public class MasterExecThread implements Runnable {
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){
if(forbiddenTaskList.containsKey(depsNode) ||
skipTaskNodeList.containsKey(depsNode)){
if(!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)){
continue;
}
// dependencies must be fully completed
......
......@@ -28,7 +28,7 @@ import org.springframework.context.annotation.ComponentScan;
@ComponentScan("org.apache.dolphinscheduler")
public class RemoveZKNode implements CommandLineRunner {
private static Integer ARGS_LENGTH = 1;
private static final Integer ARGS_LENGTH = 1;
private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class);
......
......@@ -192,24 +192,47 @@ public class DataxTask extends AbstractTask {
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId());
String json;
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
JSONObject root = new JSONObject();
root.put("job", job);
root.put("core", buildDataxCoreJson());
logger.debug("datax job json : {}", root.toString());
if (dataXParameters.getCustomConfig() == 1){
json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null){
json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
}
}else {
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
JSONObject root = new JSONObject();
root.put("job", job);
root.put("core", buildDataxCoreJson());
json = root.toString();
}
logger.debug("datax job json : {}", json);
// create datax json file
FileUtils.writeStringToFile(new File(fileName), root.toString(), StandardCharsets.UTF_8);
FileUtils.writeStringToFile(new File(fileName), json, StandardCharsets.UTF_8);
return fileName;
}
......
......@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
......@@ -142,11 +143,21 @@ public class ShellTask extends AbstractTask {
shellParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null){
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
// if (paramsMap != null){
// script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
// }
//new
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (paramsMap != null) {
String dateTime = DateUtils.format(taskProps.getScheduleTime(), Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_SHECDULE_TIME);
paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p);
script = ParameterUtils.convertParameterPlaceholders2(script, ParamUtils.convert(paramsMap));
}
shellParameters.setRawScript(script);
logger.info("raw script : {}", shellParameters.getRawScript());
......
......@@ -222,7 +222,9 @@ public class SqlTask extends AbstractTask {
logger.info("SQL title : {}",title);
sqlParameters.setTitle(title);
}
//new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskProps.getScheduleTime(), paramsMap);
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
......@@ -341,10 +343,10 @@ public class SqlTask extends AbstractTask {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
} finally {
try {
connection.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
try {
connection.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return connection;
......
......@@ -44,6 +44,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
/**
* DataxTask Tester.
*/
......@@ -59,6 +61,8 @@ public class DataxTaskTest {
private ApplicationContext applicationContext;
private TaskProps props = new TaskProps();
@Before
public void before()
throws Exception {
......@@ -70,7 +74,6 @@ public class DataxTaskTest {
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
......@@ -78,10 +81,8 @@ public class DataxTaskTest {
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams(
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
props.setCmdTypeIfComplement(START_PROCESS);
setTaskParems(0);
Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
......@@ -91,6 +92,22 @@ public class DataxTaskTest {
Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0);
}
private void setTaskParems(Integer customConfig) {
if (customConfig == 1) {
props.setTaskParams(
"{\"customConfig\":1, \"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}");
// "{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}");
} else {
props.setTaskParams(
"{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
}
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
}
private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
......@@ -102,7 +119,7 @@ public class DataxTaskTest {
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.START_PROCESS);
processInstance.setCommandType(START_PROCESS);
processInstance.setScheduleTime(new Date());
return processInstance;
}
......@@ -229,18 +246,24 @@ public class DataxTaskTest {
*/
@Test
public void testBuildDataxJsonFile()
throws Exception {
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
method.setAccessible(true);
String filePath = (String) method.invoke(dataxTask, null);
Assert.assertNotNull(filePath);
}
catch (Exception e) {
setTaskParems(1);
buildDataJson();
setTaskParems(0);
buildDataJson();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
public void buildDataJson() throws Exception {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
method.setAccessible(true);
String filePath = (String) method.invoke(dataxTask, null);
Assert.assertNotNull(filePath);
}
/**
* Method: buildDataxJobContentJson()
*/
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
......
......@@ -70,9 +70,10 @@ public class QuartzExecutors {
synchronized (QuartzExecutors.class) {
// when more than two threads run into the first null check same time, to avoid instanced more than one time, it needs to be checked again.
if (INSTANCE == null) {
INSTANCE = new QuartzExecutors();
QuartzExecutors quartzExecutors = new QuartzExecutors();
//finish QuartzExecutors init
INSTANCE.init();
quartzExecutors.init();
INSTANCE = quartzExecutors;
}
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* project external config
*/
export default {
// task record switch
recordSwitch:false
}
\ No newline at end of file
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export default ["test@analysys.com.cn"]
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
......
<?xml version="1.0" encoding="utf-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<suite name="dolphinscheduler_e2e" parallel="true">
<test name="dolphinscheduler_test" preserve-order="true">
......
......@@ -707,6 +707,7 @@
<include>**/api/service/TenantServiceTest.java</include>
<include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/api/service/AlertGroupServiceTest.java</include>
<include>**/api/service/UserAlertGroupServiceTest.java</include>
<include>**/api/service/ProjectServiceTest.java</include>
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
......@@ -740,6 +741,8 @@
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/alert/utils/MailUtilsTest.java</include>
<include>**/dao/AlertDaoTest.java</include>
</includes>
<!-- <skip>true</skip> -->
</configuration>
......@@ -775,25 +778,23 @@
<artifactId>apache-rat-plugin</artifactId>
<version>${apache.rat.version}</version>
<configuration>
<includes>
<include>**/*.java</include>
<include>**/dolphinscheduler-ui/src/**/*.scss</include>
<include>**/dolphinscheduler-ui/src/**/*.css</include>
<include>**/dolphinscheduler-ui/src/**/*.vue</include>
<include>**/dolphinscheduler-ui/src/**/*.js</include>
<include>**/dolphinscheduler-ui/src/**/*.html</include>
</includes>
<excludes>
<exclude>**/dolphinscheduler-ui/src/lib/**</exclude>
<exclude>**/dolphinscheduler-ui/src/js/module/util/cookie.js</exclude>
<exclude>**/dolphinscheduler-ui/src/font/awesome/font-awesome.css</exclude>
<exclude>**/dolphinscheduler-ui/src/sass/common/_animation.scss</exclude>
<exclude>**/dolphinscheduler-ui/src/sass/common/_normalize.scss</exclude>
<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/es5.js</exclude>
<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/base.css</exclude>
<exclude>**/dolphinscheduler-ui/src/view/common/outro.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/view/common/meta.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css</exclude>
<exclude>**/node_modules/**</exclude>
<exclude>**/node/**</exclude>
<exclude>**/dist/**</exclude>
<exclude>**/licenses/**</exclude>
<exclude>**/src/sass/common/_animation.scss</exclude>
<exclude>**/src/sass/common/_normalize.scss</exclude>
<exclude>.github/**</exclude>
<exclude>sql/soft_version</exclude>
<exclude>**/*.json</exclude>
<!-- document files -->
<exclude>**/*.md</exclude>
<excldue>**/*.MD</excldue>
<exclude>**/*.txt</exclude>
<exclude>**/docs/**</exclude>
<exclude>**/*.babelrc</exclude>
<exclude>**/*.eslintrc</exclude>
</excludes>
<consoleOutput>true</consoleOutput>
</configuration>
......@@ -885,4 +886,4 @@
<module>dolphinscheduler-service</module>
</modules>
</project>
\ No newline at end of file
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册