diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java index 0c1dc20ac9fdf241e0d05c808bb6c715b1ddd035..30c8cdc7b9798ac6006f16f37ede6cdad1d5af8b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java @@ -34,6 +34,7 @@ import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; @@ -45,7 +46,7 @@ import static org.junit.Assert.*; @RunWith(SpringRunner.class) @SpringBootTest @Transactional -@Rollback(true) +@Rollback public class AccessTokenMapperTest { @Autowired @@ -56,10 +57,11 @@ public class AccessTokenMapperTest { /** * test insert + * * @throws Exception */ @Test - public void testInsert() throws Exception{ + public void testInsert() throws Exception { Integer userId = 1; AccessToken accessToken = createAccessToken(userId); @@ -69,10 +71,11 @@ public class AccessTokenMapperTest { /** * test select by id + * * @throws Exception */ @Test - public void testSelectById() throws Exception{ + public void testSelectById() throws Exception { Integer userId = 1; AccessToken accessToken = createAccessToken(userId); AccessToken resultAccessToken = accessTokenMapper.selectById(accessToken.getId()); @@ -81,6 +84,7 @@ public class AccessTokenMapperTest { /** * test hashCode method + * * @throws Exception */ @Test @@ -94,6 +98,7 @@ public class AccessTokenMapperTest { /** * test equals method + * * @throws Exception */ @Test @@ -108,7 +113,7 @@ public class AccessTokenMapperTest { * test page */ @Test - public void testSelectAccessTokenPage() throws Exception{ + public void testSelectAccessTokenPage() throws Exception { Integer count = 4; String userName = "zhangsan"; @@ -120,11 +125,11 @@ public class AccessTokenMapperTest { Page page = new Page(offset, size); IPage accessTokenPage = accessTokenMapper.selectAccessTokenPage(page, userName, 0); - assertEquals(Integer.valueOf(accessTokenPage.getRecords().size()),size); + assertEquals(Integer.valueOf(accessTokenPage.getRecords().size()), size); - for (AccessToken accessToken : accessTokenPage.getRecords()){ + for (AccessToken accessToken : accessTokenPage.getRecords()) { AccessToken resultAccessToken = accessTokenMap.get(accessToken.getId()); - assertEquals(accessToken,resultAccessToken); + assertEquals(accessToken, resultAccessToken); } } @@ -133,14 +138,17 @@ public class AccessTokenMapperTest { * test update */ @Test - public void testUpdate() throws Exception{ + public void testUpdate() throws Exception { Integer userId = 1; AccessToken accessToken = createAccessToken(userId); //update accessToken.setToken("56789"); accessToken.setExpireTime(DateUtils.getCurrentDate()); accessToken.setUpdateTime(DateUtils.getCurrentDate()); - accessTokenMapper.updateById(accessToken); + int status = accessTokenMapper.updateById(accessToken); + if (status != 1) { + Assert.fail("update access token fail"); + } AccessToken resultAccessToken = accessTokenMapper.selectById(accessToken.getId()); assertEquals(accessToken, resultAccessToken); } @@ -149,11 +157,14 @@ public class AccessTokenMapperTest { * test delete */ @Test - public void testDelete() throws Exception{ + public void testDelete() throws Exception { Integer userId = 1; AccessToken accessToken = createAccessToken(userId); - accessTokenMapper.deleteById(accessToken.getId()); + int status = accessTokenMapper.deleteById(accessToken.getId()); + if (status != 1) { + Assert.fail("delete access token data fail"); + } AccessToken resultAccessToken = accessTokenMapper.selectById(accessToken.getId()); @@ -163,21 +174,22 @@ public class AccessTokenMapperTest { /** * create accessTokens - * @param count create accessToken count + * + * @param count create accessToken count * @param userName username * @return accessToken map * @throws Exception */ - private Map createAccessTokens( - Integer count,String userName) throws Exception{ + private Map createAccessTokens( + Integer count, String userName) throws Exception { User user = createUser(userName); - Map accessTokenMap = new HashMap<>(); - for (int i = 1 ; i<= count ; i++){ - AccessToken accessToken = createAccessToken(user.getId(),userName); + Map accessTokenMap = new HashMap<>(); + for (int i = 1; i <= count; i++) { + AccessToken accessToken = createAccessToken(user.getId(), userName); - accessTokenMap.put(accessToken.getId(),accessToken); + accessTokenMap.put(accessToken.getId(), accessToken); } return accessTokenMap; @@ -185,11 +197,12 @@ public class AccessTokenMapperTest { /** * create user + * * @param userName userName * @return user * @throws Exception */ - private User createUser(String userName) throws Exception{ + private User createUser(String userName) throws Exception { User user = new User(); user.setUserName(userName); user.setUserPassword("123"); @@ -201,42 +214,50 @@ public class AccessTokenMapperTest { user.setUpdateTime(DateUtils.getCurrentDate()); user.setQueue("default"); - userMapper.insert(user); + int status = userMapper.insert(user); + + if (status != 1) { + Assert.fail("insert user data error"); + } return user; } /** * create access token - * @param userId userId + * + * @param userId userId * @param userName userName * @return accessToken * @throws Exception */ - private AccessToken createAccessToken(Integer userId,String userName)throws Exception{ - Random random = new Random(); + private AccessToken createAccessToken(Integer userId, String userName) throws Exception { //insertOne AccessToken accessToken = new AccessToken(); accessToken.setUserName(userName); accessToken.setUserId(userId); - accessToken.setToken(String.valueOf(random.nextLong())); + accessToken.setToken(String.valueOf(ThreadLocalRandom.current().nextLong())); accessToken.setCreateTime(DateUtils.getCurrentDate()); accessToken.setUpdateTime(DateUtils.getCurrentDate()); accessToken.setExpireTime(DateUtils.getCurrentDate()); - accessTokenMapper.insert(accessToken); + int status = accessTokenMapper.insert(accessToken); + if (status != 1) { + Assert.fail("insert data error"); + } return accessToken; } /** * create access token + * * @param userId userId * @return accessToken * @throws Exception */ - private AccessToken createAccessToken(Integer userId)throws Exception{ - return createAccessToken(userId,null); + private AccessToken createAccessToken(Integer userId) throws Exception { + return createAccessToken(userId, null); } } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java index 818f88fb493b9ac2f5e005f7e167819c36a0367b..76741a7db9e387e146d1d56e15c0efd5502f97c5 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java @@ -65,9 +65,10 @@ public class ResourceMapperTest { /** * insert + * * @return Resource */ - private Resource insertOne(){ + private Resource insertOne() { //insertOne Resource resource = new Resource(); resource.setAlias("ut-resource"); @@ -76,16 +77,20 @@ public class ResourceMapperTest { resource.setDirectory(false); resource.setType(ResourceType.FILE); resource.setUserId(111); - resourceMapper.insert(resource); + int status = resourceMapper.insert(resource); + if (status != 1) { + Assert.fail("insert data error"); + } return resource; } /** * create resource by user + * * @param user user * @return Resource */ - private Resource createResource(User user,boolean isDirectory,ResourceType resourceType,int pid,String alias,String fullName){ + private Resource createResource(User user, boolean isDirectory, ResourceType resourceType, int pid, String alias, String fullName) { //insertOne Resource resource = new Resource(); resource.setDirectory(isDirectory); @@ -93,19 +98,23 @@ public class ResourceMapperTest { resource.setAlias(alias); resource.setFullName(fullName); resource.setUserId(user.getId()); - resourceMapper.insert(resource); + int status = resourceMapper.insert(resource); + if (status != 1) { + Assert.fail("insert data error"); + } return resource; } /** * create resource by user + * * @param user user * @return Resource */ - private Resource createResource(User user){ + private Resource createResource(User user) { //insertOne - String alias = String.format("ut-resource-%s",user.getUserName()); - String fullName = String.format("/%s",alias); + String alias = String.format("ut-resource-%s", user.getUserName()); + String fullName = String.format("/%s", alias); Resource resource = createResource(user, false, ResourceType.FILE, -1, alias, fullName); return resource; @@ -113,9 +122,10 @@ public class ResourceMapperTest { /** * create user + * * @return User */ - private User createGeneralUser(String userName){ + private User createGeneralUser(String userName) { User user = new User(); user.setUserName(userName); user.setUserPassword("1"); @@ -124,15 +134,20 @@ public class ResourceMapperTest { user.setCreateTime(new Date()); user.setTenantId(1); user.setUpdateTime(new Date()); - userMapper.insert(user); + int status = userMapper.insert(user); + + if (status != 1) { + Assert.fail("insert data error"); + } return user; } /** * create resource user + * * @return ResourcesUser */ - private ResourcesUser createResourcesUser(Resource resource,User user){ + private ResourcesUser createResourcesUser(Resource resource, User user) { //insertOne ResourcesUser resourcesUser = new ResourcesUser(); resourcesUser.setCreateTime(new Date()); @@ -145,16 +160,17 @@ public class ResourceMapperTest { } @Test - public void testInsert(){ + public void testInsert() { Resource resource = insertOne(); assertNotNull(resource.getId()); - assertThat(resource.getId(),greaterThan(0)); + assertThat(resource.getId(), greaterThan(0)); } + /** * test update */ @Test - public void testUpdate(){ + public void testUpdate() { //insertOne Resource resource = insertOne(); resource.setCreateTime(new Date()); @@ -167,7 +183,7 @@ public class ResourceMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { Resource resourceMap = insertOne(); int delete = resourceMapper.deleteById(resourceMap.getId()); Assert.assertEquals(1, delete); @@ -294,19 +310,31 @@ public class ResourceMapperTest { Tenant tenant = new Tenant(); tenant.setTenantName("ut tenant "); tenant.setTenantCode("ut tenant code for resource"); - tenantMapper.insert(tenant); + int tenantInsertStatus = tenantMapper.insert(tenant); + + if (tenantInsertStatus != 1) { + Assert.fail("insert tenant data error"); + } User user = new User(); user.setTenantId(tenant.getId()); user.setUserName("ut user"); - userMapper.insert(user); + int userInsertStatus = userMapper.insert(user); + + if (userInsertStatus != 1) { + Assert.fail("insert user data error"); + } + Resource resource = insertOne(); resource.setUserId(user.getId()); - resourceMapper.updateById(resource); + int userUpdateStatus = resourceMapper.updateById(resource); + if (userUpdateStatus != 1) { + Assert.fail("update user data error"); + } String resource1 = resourceMapper.queryTenantCodeByResourceName( - resource.getFullName(),ResourceType.FILE.ordinal() + resource.getFullName(), ResourceType.FILE.ordinal() ); @@ -315,7 +343,7 @@ public class ResourceMapperTest { } @Test - public void testListAuthorizedResource(){ + public void testListAuthorizedResource() { // create a general user User generalUser1 = createGeneralUser("user1"); User generalUser2 = createGeneralUser("user2"); @@ -328,20 +356,19 @@ public class ResourceMapperTest { List resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); - Assert.assertEquals(generalUser2.getId(),resource.getUserId()); + Assert.assertEquals(generalUser2.getId(), resource.getUserId()); Assert.assertFalse(resources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames))); - // authorize object unauthorizedResource to generalUser - createResourcesUser(unauthorizedResource,generalUser2); + createResourcesUser(unauthorizedResource, generalUser2); List authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); Assert.assertTrue(authorizedResources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames))); } @Test - public void deleteIdsTest(){ + public void deleteIdsTest() { // create a general user User generalUser1 = createGeneralUser("user1"); @@ -352,11 +379,11 @@ public class ResourceMapperTest { resourceList.add(resource.getId()); resourceList.add(resource1.getId()); int result = resourceMapper.deleteIds(resourceList.toArray(new Integer[resourceList.size()])); - Assert.assertEquals(result,2); + Assert.assertEquals(result, 2); } @Test - public void queryResourceListAuthoredTest(){ + public void queryResourceListAuthoredTest() { // create a general user User generalUser1 = createGeneralUser("user1"); User generalUser2 = createGeneralUser("user2"); @@ -372,16 +399,18 @@ public class ResourceMapperTest { } @Test - public void batchUpdateResourceTest(){ + public void batchUpdateResourceTest() { // create a general user User generalUser1 = createGeneralUser("user1"); // create resource Resource resource = createResource(generalUser1); - resource.setFullName(String.format("%s-update",resource.getFullName())); + resource.setFullName(String.format("%s-update", resource.getFullName())); resource.setUpdateTime(new Date()); List resourceList = new ArrayList<>(); resourceList.add(resource); int result = resourceMapper.batchUpdateResource(resourceList); - Assert.assertTrue(result>0); + if (result != resourceList.size()) { + Assert.fail("batch update resource data error"); + } } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java index 918ed6764bfc1df66e44c784e44a5b874777e502..e7d89240710a93a74bf3916dd0a2d7020fab5192 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * task fulture @@ -55,11 +56,11 @@ public class TaskFuture { /** * response command */ - private volatile Command responseCommand; + private AtomicReference responseCommandReference = new AtomicReference<>(); private volatile boolean sendOk = true; - private volatile Throwable cause; + private AtomicReference causeReference; public TaskFuture(long opaque, long timeoutMillis) { this.opaque = opaque; @@ -74,7 +75,7 @@ public class TaskFuture { */ public Command waitResponse() throws InterruptedException { this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); - return this.responseCommand; + return this.responseCommandReference.get(); } /** @@ -83,7 +84,7 @@ public class TaskFuture { * @param responseCommand responseCommand */ public void putResponse(final Command responseCommand) { - this.responseCommand = responseCommand; + responseCommandReference.set(responseCommand); this.latch.countDown(); FUTURE_TABLE.remove(opaque); } @@ -114,11 +115,11 @@ public class TaskFuture { } public void setCause(Throwable cause) { - this.cause = cause; + causeReference.set(cause); } public Throwable getCause() { - return cause; + return causeReference.get(); } public long getOpaque() { @@ -134,11 +135,11 @@ public class TaskFuture { } public Command getResponseCommand() { - return responseCommand; + return responseCommandReference.get(); } public void setResponseCommand(Command responseCommand) { - this.responseCommand = responseCommand; + responseCommandReference.set(responseCommand); } @@ -166,9 +167,9 @@ public class TaskFuture { ", timeoutMillis=" + timeoutMillis + ", latch=" + latch + ", beginTimestamp=" + beginTimestamp + - ", responseCommand=" + responseCommand + + ", responseCommand=" + responseCommandReference.get() + ", sendOk=" + sendOk + - ", cause=" + cause + + ", cause=" + causeReference.get() + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 7abb31b31c44947ffc8cf50ee05d3bbb55d18ce2..ba07313a9acc4929de3fcdaa49bd66a78327f6f3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -108,6 +108,7 @@ public class TaskResponseService { TaskResponseEvent taskResponseEvent = eventQueue.take(); persist(taskResponseEvent); } catch (InterruptedException e){ + Thread.currentThread().interrupt(); break; } catch (Exception e){ logger.error("persist task error",e); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java index 01d7b2f1e5957bf56b30506c9e9eca18fe6dd195..3b15810e057b6c592d90cf64fe5b00a4497d4ae5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java @@ -61,20 +61,20 @@ public class QuartzExecutors { */ private static Scheduler scheduler; - /** - * instance of QuartzExecutors - */ - private static volatile QuartzExecutors INSTANCE = null; - /** * load conf */ private static Configuration conf; + private static final class Holder { + private static final QuartzExecutors instance = new QuartzExecutors(); + } + private QuartzExecutors() { try { conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH); + init(); }catch (ConfigurationException e){ logger.warn("not loaded quartz configuration file, will used default value",e); } @@ -85,18 +85,7 @@ public class QuartzExecutors { * @return instance of Quartz Executors */ public static QuartzExecutors getInstance() { - if (INSTANCE == null) { - synchronized (QuartzExecutors.class) { - // when more than two threads run into the first null check same time, to avoid instanced more than one time, it needs to be checked again. - if (INSTANCE == null) { - QuartzExecutors quartzExecutors = new QuartzExecutors(); - //finish QuartzExecutors init - quartzExecutors.init(); - INSTANCE = quartzExecutors; - } - } - } - return INSTANCE; + return Holder.instance; }