提交 e6f14cf3 编写于 作者: L ligang

Initial module escheduler-common commit

上级 ae32da32
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId>
<version>1.0.0</version>
</parent>
<artifactId>escheduler-common</artifactId>
<name>escheduler-common</name>
<url>http://maven.apache.org</url>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<artifactId>jsr305</artifactId>
<groupId>com.google.code.findbugs</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.joshelser</groupId>
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common;
/**
* server stop interface.
*/
public interface IStoppable {
/**
* Stop this service.
* @param cause why stopping
*/
public void stop(String cause);
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* alert status
*/
public enum AlertStatus {
/**
* 0 waiting executed; 1 execute successfully,2 execute failed
*/
WAIT_EXECUTION,EXECUTION_SUCCESS,EXECUTION_FAILURE
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* warning message notification method
*/
public enum AlertType {
/**
* 0 email; 1 SMS
*/
EMAIL,SMS
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* command types
*/
public enum CommandType {
/**
* command types
* 0 start a new process
* 1 start a new process from current nodes
* 2 recover tolerance fault work flow
* 3 start process from paused task nodes
* 4 start process from failure task nodes
* 5 complement data
* 6 start a new process from scheduler
* 7 repeat running a work flow
* 8 pause a process
* 9 stop a process
* 10 recover waiting thread
*/
START_PROCESS, START_CURRENT_TASK_PROCESS, RECOVER_TOLERANCE_FAULT_PROCESS, RECOVER_SUSPENDED_PROCESS,
START_FAILURE_TASK_PROCESS,COMPLEMENT_DATA,SCHEDULER, REPEAT_RUNNING,PAUSE,STOP,RECOVER_WAITTING_THREAD;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* cycle enums
*/
public enum CycleEnum {
/**
* 0 minute; 1 hour; 2 day; 3 week; 4 month; 5 year;
*/
MINUTE, HOUR, DAY, WEEK, MONTH, YEAR
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* data types in user define parameter
*/
public enum DataType {
/**
* 0 string
* 1 integer
* 2 long
* 3 float
* 4 double
* 5 date, "YYYY-MM-DD"
* 6 time, "HH:MM:SS"
* 7 time stamp
* 8 Boolean
*/
VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* data base types
*/
public enum DbType {
/**
* 0 mysql
* 1 postgresql
* 2 hive
* 3 spark
*/
MYSQL, POSTGRESQL, HIVE, SPARK
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* depend result
*/
public enum DependResult {
/**
* 0 success
* 1 waiting
* 2 failed
*/
SUCCESS, WAITING, FAILED
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* depend strategy
*/
public enum DependStrategy {
/**
* 0 none,1 all success 2 all failed 3 one success 4 one failed
*/
NONE, ALL_SUCCESS, ALL_FAILED, ONE_SUCCESS, ONE_FAILED
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* dependent relation: and or
*/
public enum DependentRelation {
AND,OR;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* parameter of stored procedure
*/
public enum Direct {
/**
* 0 in; 1 out;
*/
IN,OUT
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* runing status for work flow and task nodes
*
*/
public enum ExecutionStatus {
/**
* status:
* 0 submit success
* 1 running
* 2 ready pause
* 3 pause
* 4 ready stop
* 5 stop
* 6 failure
* 7 success
* 8 need fault tolerance
* 9 kill
* 10 waiting thread
* 11 waiting depend node complete
*/
SUBMITTED_SUCCESS,RUNNING_EXEUTION,READY_PAUSE,PAUSE,READY_STOP,STOP,FAILURE,SUCCESS,
NEED_FAULT_TOLERANCE,KILL,WAITTING_THREAD,WAITTING_DEPEND;
/**
* status is success
* @return
*/
public boolean typeIsSuccess(){
return this == SUCCESS;
}
/**
* status is failure
* @return
*/
public boolean typeIsFailure(){
return this == FAILURE || this == NEED_FAULT_TOLERANCE;
}
/**
* status is finished
* @return
*/
public boolean typeIsFinished(){
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsWaittingThread();
}
/**
* status is waiting thread
* @return
*/
public boolean typeIsWaittingThread(){
return this == WAITTING_THREAD;
}
/**
* status is pause
* @return
*/
public boolean typeIsPause(){
return this == PAUSE;
}
/**
* status is running
* @return
*/
public boolean typeIsRunning(){
return this == RUNNING_EXEUTION || this == WAITTING_DEPEND;
}
/**
* status is cancel
*/
public boolean typeIsCancel(){ return this == KILL || this == STOP ;}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* failure policy when some task node failed.
*/
public enum FailureStrategy {
/**
* 0 ending process when some tasks failed.
* 1 continue running when some tasks failed.
**/
END, CONTINUE;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* have_script
* have_file
* can_retry
* have_arr_variables
* have_map_variables
* have_alert
*/
public enum Flag {
/**
* 0 no
* 1 yes
*/
NO,YES
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* define process and task priority
*/
public enum Priority {
/**
* 0 highest priority
* 1 higher priority
* 2 medium priority
* 3 lower priority
* 4 lowest priority
*/
HIGHEST,HIGH,MEDIUM,LOW,LOWEST
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* support program types
*/
public enum ProgramType {
/**
* 0 JAVA,1 SCALA,2 PYTHON
*/
JAVA,
SCALA,
PYTHON
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* process define release state
*/
public enum ReleaseState {
/**
* 0 offline
* 1 on line
*/
OFFLINE,ONLINE;
public static ReleaseState getEnum(int value){
for (ReleaseState e:ReleaseState.values()) {
if(e.ordinal() == value) {
return e;
}
}
//For values out of enum scope
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* resource type
*/
public enum ResourceType {
/**
* 0 file, 1 udf
*/
FILE,UDF
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* complement data run mode
*/
public enum RunMode {
/**
* 0 serial run
* 1 parallel run
* */
RUN_MODE_SERIAL, RUN_MODE_PARALLEL
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* self depency strategy
*/
public enum SelfDependStrategy {
/**
* 0 donot depend the last cycle;
* 1 depend the last cycle
**/
NO_DEP_PRE, DEP_PRE
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* show type for email
*/
public enum ShowType {
/**
* 0 TABLE;
* 1 TEXT;
* 2 attachment;
* 3 TABLE+attachment;
*/
TABLE,
TEXT,
ATTACHMENT,
TABLEATTACHMENT
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* task node depend type
*/
public enum TaskDependType {
/**
* 0 run current tasks only
* 1 run current tasks and previous tasks
* 2 run current tasks and the other tasks that depend on current tasks;
*/
TASK_ONLY, TASK_PRE, TASK_POST;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* type of task state
*/
public enum TaskStateType {
/**
* 0 waiting running
* 1 running
* 2 finish
* 3 failed
* 4 success
*/
WAITTING, RUNNING, FINISH, FAILED, SUCCESS;
/**
* convert task state to execute status integer array ;
* @param taskStateType
* @return
*/
public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType){
switch (taskStateType){
case SUCCESS:
return new int[]{ExecutionStatus.SUCCESS.ordinal()};
case FAILED:
return new int[]{
ExecutionStatus.FAILURE.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()};
case FINISH:
return new int[]{
ExecutionStatus.PAUSE.ordinal(),
ExecutionStatus.STOP.ordinal()
};
case RUNNING:
return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
case WAITTING:
return new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal()
};
default:
break;
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* task timeout strategy
*/
public enum TaskTimeoutStrategy {
/**
* 0 warn
* 1 failed
* 2 warn+failed
*/
WARN, FAILED, WARNFAILED
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* task node type
*/
public enum TaskType {
/**
* 0 SHELL
* 1 SQL
* 2 SUB_PROCESS
* 3 PROCEDURE
* 4 MR
* 5 SPARK
* 6 PYTHON
* 7 DEPENDENT
*/
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* UDF type
*/
public enum UdfType {
/**
* 0 hive; 1 spark
*/
HIVE, SPARK
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* user type
*/
public enum UserType {
/**
* 0 admin user; 1 general user
*/
ADMIN_USER,
GENERAL_USER
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* types for whether to send warning when process ending;
*/
public enum WarningType {
/**
* 0 do not send warning;
* 1 send if process success;
* 2 send if process failed;
* 3 send if process ending;
*/
NONE, SUCCESS, FAILURE, ALL;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.graph;
import cn.escheduler.common.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* analysis of DAG
* Node: node
* NodeInfo:node description information
* EdgeInfo: edge description information
*/
public class DAG<Node, NodeInfo, EdgeInfo> {
private static final Logger logger = LoggerFactory.getLogger(DAG.class);
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* node map, key is node, value is node information
*/
private volatile Map<Node, NodeInfo> nodesMap;
/**
* edge map. key is node of origin;value is Map with key for destination node and value for edge
*/
private volatile Map<Node, Map<Node, EdgeInfo>> edgesMap;
/**
* reversed edge set,key is node of destination, value is Map with key for origin node and value for edge
*/
private volatile Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
public DAG() {
nodesMap = new HashMap<>();
edgesMap = new HashMap<>();
reverseEdgesMap = new HashMap<>();
}
/**
* add node information
*
* @param node node
* @param nodeInfo node information
*/
public void addNode(Node node, NodeInfo nodeInfo) {
lock.writeLock().lock();
try{
nodesMap.put(node, nodeInfo);
}finally {
lock.writeLock().unlock();
}
}
/**
* add edge
* @param fromNode node of origin
* @param toNode node of destination
* @return The result of adding an edge. returns false if the DAG result is a ring result
*/
public boolean addEdge(Node fromNode, Node toNode) {
return addEdge(fromNode, toNode, false);
}
/**
* add edge
* @param fromNode node of origin
* @param toNode node of destination
* @param createNode whether the node needs to be created if it does not exist
* @return The result of adding an edge. returns false if the DAG result is a ring result
*/
private boolean addEdge(Node fromNode, Node toNode, boolean createNode) {
return addEdge(fromNode, toNode, null, createNode);
}
/**
* add edge
*
* @param fromNode node of origin
* @param toNode node of destination
* @param edge edge description
* @param createNode whether the node needs to be created if it does not exist
* @return The result of adding an edge. returns false if the DAG result is a ring result
*/
public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
lock.writeLock().lock();
try{
// Whether an edge can be successfully added(fromNode -> toNode)
if (!isLegalAddEdge(fromNode, toNode, createNode)) {
logger.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
return false;
}
addNodeIfAbsent(fromNode, null);
addNodeIfAbsent(toNode, null);
addEdge(fromNode, toNode, edge, edgesMap);
addEdge(toNode, fromNode, edge, reverseEdgesMap);
return true;
}finally {
lock.writeLock().unlock();
}
}
/**
* whether this node is contained
*
* @param node node
* @return
*/
public boolean containsNode(Node node) {
lock.readLock().lock();
try{
return nodesMap.containsKey(node);
}finally {
lock.readLock().unlock();
}
}
/**
* whether this edge is contained
*
* @param fromNode node of origin
* @param toNode node of destination
* @return
*/
public boolean containsEdge(Node fromNode, Node toNode) {
lock.readLock().lock();
try{
Map<Node, EdgeInfo> endEdges = edgesMap.get(fromNode);
if (endEdges == null) {
return false;
}
return endEdges.containsKey(toNode);
}finally {
lock.readLock().unlock();
}
}
/**
* get node description
*
* @param node node
* @return
*/
public NodeInfo getNode(Node node) {
lock.readLock().lock();
try{
return nodesMap.get(node);
}finally {
lock.readLock().unlock();
}
}
/**
* Get the number of nodes
*
* @return
*/
public int getNodesCount() {
lock.readLock().lock();
try{
return nodesMap.size();
}finally {
lock.readLock().unlock();
}
}
/**
* Get the number of edges
*
* @return
*/
public int getEdgesCount() {
lock.readLock().lock();
try{
int count = 0;
for (Map.Entry<Node, Map<Node, EdgeInfo>> entry : edgesMap.entrySet()) {
count += entry.getValue().size();
}
return count;
}finally {
lock.readLock().unlock();
}
}
/**
* get the start node of DAG
*
* @return
*/
public Collection<Node> getBeginNode() {
lock.readLock().lock();
try{
return CollectionUtils.subtract(nodesMap.keySet(), reverseEdgesMap.keySet());
}finally {
lock.readLock().unlock();
}
}
/**
* get the end node of DAG
*
* @return
*/
public Collection<Node> getEndNode() {
lock.readLock().lock();
try{
return CollectionUtils.subtract(nodesMap.keySet(), edgesMap.keySet());
}finally {
lock.readLock().unlock();
}
}
/**
* Gets all previous nodes of the node
*
* @param node node id to be calculated
* @return
*/
public Set<Node> getPreviousNodes(Node node) {
lock.readLock().lock();
try{
return getNeighborNodes(node, reverseEdgesMap);
}finally {
lock.readLock().unlock();
}
}
/**
* Get all subsequent nodes of the node
*
* @param node node id to be calculated
* @return
*/
public Set<Node> getSubsequentNodes(Node node) {
lock.readLock().lock();
try{
return getNeighborNodes(node, edgesMap);
}finally {
lock.readLock().unlock();
}
}
/**
* Gets the degree of entry of the node
*
* @param node node id
* @return
*/
public int getIndegree(Node node) {
lock.readLock().lock();
try{
return getPreviousNodes(node).size();
}finally {
lock.readLock().unlock();
}
}
/**
* whether the graph has a ring
*
* @return true if has cycle, else return false.
*/
public boolean hasCycle() {
lock.readLock().lock();
try{
return !topologicalSortImpl().getKey();
}finally {
lock.readLock().unlock();
}
}
/**
* Only DAG has a topological sort
* @return topologically sorted results, returns false if the DAG result is a ring result
* @throws Exception
*/
public List<Node> topologicalSort() throws Exception {
lock.readLock().lock();
try{
Map.Entry<Boolean, List<Node>> entry = topologicalSortImpl();
if (entry.getKey()) {
return entry.getValue();
}
throw new Exception("serious error: graph has cycle ! ");
}finally {
lock.readLock().unlock();
}
}
/**
* if tho node does not exist,add this node
*
* @param node node
* @param nodeInfo node information
*/
private void addNodeIfAbsent(Node node, NodeInfo nodeInfo) {
if (!containsNode(node)) {
addNode(node, nodeInfo);
}
}
/**
* add edge
*
* @param fromNode node of origin
* @param toNode node of destination
* @param edge edge description
* @param edges edge set
*/
private void addEdge(Node fromNode, Node toNode, EdgeInfo edge, Map<Node, Map<Node, EdgeInfo>> edges) {
edges.putIfAbsent(fromNode, new HashMap<>());
Map<Node, EdgeInfo> toNodeEdges = edges.get(fromNode);
toNodeEdges.put(toNode, edge);
}
/**
* Whether an edge can be successfully added(fromNode -> toNode)
* need to determine whether the DAG has cycle
*
* @param fromNode node of origin
* @param toNode node of destination
* @param createNode whether to create a node
* @return
*/
private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
if (fromNode.equals(toNode)) {
logger.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
return false;
}
if (!createNode) {
if (!containsNode(fromNode) || !containsNode(toNode)){
logger.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
return false;
}
}
// Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the DAG has cycle!
int verticesCount = getNodesCount();
Queue<Node> queue = new LinkedList<>();
queue.add(toNode);
// if DAG doesn't find fromNode, it's not has cycle!
while (!queue.isEmpty() && (--verticesCount > 0)) {
Node key = queue.poll();
for (Node subsequentNode : getSubsequentNodes(key)) {
if (subsequentNode.equals(fromNode)) {
return false;
}
queue.add(subsequentNode);
}
}
return true;
}
/**
* Get all neighbor nodes of the node
*
* @param node Node id to be calculated
* @param edges neighbor edge information
* @return
*/
private Set<Node> getNeighborNodes(Node node, final Map<Node, Map<Node, EdgeInfo>> edges) {
final Map<Node, EdgeInfo> neighborEdges = edges.get(node);
if (neighborEdges == null) {
return Collections.EMPTY_MAP.keySet();
}
return neighborEdges.keySet();
}
/**
* Determine whether there are ring and topological sorting results
*
* Directed acyclic graph (DAG) has topological ordering
* Breadth First Search:
* 1、Traversal of all the vertices in the graph, the degree of entry is 0 vertex into the queue
* 2、Poll a vertex in the queue to update its adjacency (minus 1) and queue the adjacency if it is 0 after minus 1
* 3、Do step 2 until the queue is empty
* If you cannot traverse all the nodes, it means that the current graph is not a directed acyclic graph.
* There is no topological sort.
*
*
* @return key Returns the state
* if success (acyclic) is true, failure (acyclic) is looped,
* and value (possibly one of the topological sort results)
*/
private Map.Entry<Boolean, List<Node>> topologicalSortImpl() {
// node queue with degree of entry 0
Queue<Node> zeroIndegreeNodeQueue = new LinkedList<>();
// save result
List<Node> topoResultList = new ArrayList<>();
// save the node whose degree is not 0
Map<Node, Integer> notZeroIndegreeNodeMap = new HashMap<>();
// Scan all the vertices and push vertexs with an entry degree of 0 to queue
for (Map.Entry<Node, NodeInfo> vertices : nodesMap.entrySet()) {
Node node = vertices.getKey();
int inDegree = getIndegree(node);
if (inDegree == 0) {
zeroIndegreeNodeQueue.add(node);
topoResultList.add(node);
} else {
notZeroIndegreeNodeMap.put(node, inDegree);
}
}
/**
* After scanning, there is no node with 0 degree of entry,
* indicating that there is a ring, and return directly
*/
if(zeroIndegreeNodeQueue.isEmpty()){
return new AbstractMap.SimpleEntry(false, topoResultList);
}
// The topology algorithm is used to delete nodes with 0 degree of entry and its associated edges
while (!zeroIndegreeNodeQueue.isEmpty()) {
Node v = zeroIndegreeNodeQueue.poll();
// Get the neighbor node
Set<Node> subsequentNodes = getSubsequentNodes(v);
for (Node subsequentNode : subsequentNodes) {
Integer degree = notZeroIndegreeNodeMap.get(subsequentNode);
if(--degree == 0){
topoResultList.add(subsequentNode);
zeroIndegreeNodeQueue.add(subsequentNode);
notZeroIndegreeNodeMap.remove(subsequentNode);
}else{
notZeroIndegreeNodeMap.put(subsequentNode, degree);
}
}
}
// if notZeroIndegreeNodeMap is empty,there is no ring!
AbstractMap.SimpleEntry resultMap = new AbstractMap.SimpleEntry(notZeroIndegreeNodeMap.size() == 0 , topoResultList);
return resultMap;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
/**
* data source base class
*/
public abstract class BaseDataSource {
/**
* user name
*/
private String user;
/**
* user password
*/
private String password;
/**
* data source address
*/
private String address;
/**
* database name
*/
private String database;
/**
* other connection parameters for the data source
*/
private String other;
/**
* test whether the data source can be connected successfully
* @throws Exception
*/
public abstract void isConnectable() throws Exception;
/**
* gets the JDBC url for the data source connection
* @return
*/
public abstract String getJdbcUrl();
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getOther() {
return other;
}
public void setOther(String other) {
this.other = other;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.utils.JSONUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* produce datasource in this custom defined datasource factory.
*/
public class DataSourceFactory {
private static final Logger logger = LoggerFactory.getLogger(DataSourceFactory.class);
public static BaseDataSource getDatasource(DbType dbType, String parameter) {
try {
switch (dbType) {
case MYSQL:
return JSONUtils.parseObject(parameter, MySQLDataSource.class);
case POSTGRESQL:
return JSONUtils.parseObject(parameter, PostgreDataSource.class);
case HIVE:
return JSONUtils.parseObject(parameter, HiveDataSource.class);
case SPARK:
return JSONUtils.parseObject(parameter, SparkDataSource.class);
default:
return null;
}
} catch (Exception e) {
logger.error("Get datasource object error", e);
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* data source of hive
*/
public class HiveDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(HiveDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ";" + getOther();
}
return jdbcUrl;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Postgre datasource try conn close conn error", e);
throw e;
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* data source of mySQL
*/
public class MySQLDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(MySQLDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
*/
@Override
public String getJdbcUrl() {
String address = getAddress();
if (address.lastIndexOf("/") != (address.length() - 1)) {
address += "/";
}
String jdbcUrl = address + getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += "?" + getOther();
}
return jdbcUrl;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("com.mysql.jdbc.Driver");
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Mysql datasource try conn close conn error", e);
throw e;
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* data source of postgreSQL
*/
public class PostgreDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(PostgreDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += "?" + getOther();
}
return jdbcUrl;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.postgresql.Driver");
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Postgre datasource try conn close conn error", e);
throw e;
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.job.db;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* data source of spark
*/
public class SparkDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(SparkDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ";" + getOther();
}
return jdbcUrl;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Spark datasource try conn close conn error", e);
throw e;
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.model;
import java.util.Date;
/**
* date interval class
*/
public class DateInterval {
private Date startTime;
private Date endTime;
public DateInterval(Date beginTime, Date endTime){
this.startTime = beginTime;
this.endTime = endTime;
}
@Override
public boolean equals(Object obj) {
try{
DateInterval dateInterval = (DateInterval) obj;
return startTime.equals(dateInterval.getStartTime()) &&
endTime.equals(dateInterval.getEndTime());
}catch (Exception e){
return false;
}
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.model;
import cn.escheduler.common.enums.DependResult;
/**
* dependent item
*/
public class DependentItem {
private int definitionId;
private String depTasks;
private String cycle;
private String dateValue;
private DependResult dependResult;
public String getKey(){
return String.format("%d-%s-%s-%s",
getDefinitionId(),
getDepTasks(),
getCycle(),
getDateValue());
}
public int getDefinitionId() {
return definitionId;
}
public void setDefinitionId(int definitionId) {
this.definitionId = definitionId;
}
public String getDepTasks() {
return depTasks;
}
public void setDepTasks(String depTasks) {
this.depTasks = depTasks;
}
public String getCycle() {
return cycle;
}
public void setCycle(String cycle) {
this.cycle = cycle;
}
public String getDateValue() {
return dateValue;
}
public void setDateValue(String dateValue) {
this.dateValue = dateValue;
}
public DependResult getDependResult() {
return dependResult;
}
public void setDependResult(DependResult dependResult) {
this.dependResult = dependResult;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.model;
import cn.escheduler.common.enums.DependentRelation;
import java.util.List;
public class DependentTaskModel {
private List<DependentItem> dependItemList;
private DependentRelation relation;
public List<DependentItem> getDependItemList() {
return dependItemList;
}
public void setDependItemList(List<DependentItem> dependItemList) {
this.dependItemList = dependItemList;
}
public DependentRelation getRelation() {
return relation;
}
public void setRelation(DependentRelation relation) {
this.relation = relation;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.model;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.Priority;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
import cn.escheduler.common.task.TaskTimeoutParameter;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.JSONUtils;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class TaskNode {
/**
* task node id
*/
private String id;
/**
* task node name
*/
private String name;
/**
* task node description
*/
private String desc;
/**
* task node type
*/
private String type;
/**
* the run flag has two states, NORMAL or FORBIDDEN
*/
private String runFlag;
/**
* the front field
*/
private String loc;
/**
* maximum number of retries
*/
private int maxRetryTimes;
/**
* Unit of retry interval: points
*/
private int retryInterval;
/**
* params information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String params;
/**
* inner dependency information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String preTasks;
/**
* users store additional information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String extras;
/**
* node dependency list
*/
private List<String> depList;
/**
* outer dependency information
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String dependence;
/**
* task instance priority
*/
private Priority taskInstancePriority;
/**
* task time out
*/
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String timeout;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getPreTasks() {
return preTasks;
}
public void setPreTasks(String preTasks) throws IOException {
this.preTasks = preTasks;
this.depList = JSONUtils.toList(preTasks, String.class);
}
public String getExtras() {
return extras;
}
public void setExtras(String extras) {
this.extras = extras;
}
public List<String> getDepList() {
return depList;
}
public void setDepList(List<String> depList) throws JsonProcessingException {
this.depList = depList;
this.preTasks = JSONUtils.toJson(depList);
}
public String getLoc() {
return loc;
}
public void setLoc(String loc) {
this.loc = loc;
}
public String getRunFlag(){
return runFlag;
}
public void setRunFlag(String runFlag) {
this.runFlag = runFlag;
}
public Boolean isForbidden(){
return (StringUtils.isNotEmpty(this.runFlag) &&
this.runFlag.equals(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskNode taskNode = (TaskNode) o;
return Objects.equals(name, taskNode.name) &&
Objects.equals(desc, taskNode.desc) &&
Objects.equals(type, taskNode.type) &&
Objects.equals(params, taskNode.params) &&
Objects.equals(preTasks, taskNode.preTasks) &&
Objects.equals(extras, taskNode.extras) &&
Objects.equals(runFlag, taskNode.runFlag) &&
Objects.equals(dependence, taskNode.dependence) &&
CollectionUtils.equalLists(depList, taskNode.depList);
}
@Override
public int hashCode() {
return Objects.hash(name, desc, type, params, preTasks, extras, depList, runFlag);
}
public String getDependence() {
return dependence;
}
public void setDependence(String dependence) {
this.dependence = dependence;
}
public int getMaxRetryTimes() {
return maxRetryTimes;
}
public void setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
}
public int getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}
public Priority getTaskInstancePriority() {
return taskInstancePriority;
}
public void setTaskInstancePriority(Priority taskInstancePriority) {
this.taskInstancePriority = taskInstancePriority;
}
public String getTimeout() {
return timeout;
}
public void setTimeout(String timeout) {
this.timeout = timeout;
}
/**
* get task time out parameter
* @return
*/
public TaskTimeoutParameter getTaskTimeoutParameter() {
if(StringUtils.isNotEmpty(this.getTimeout())){
String formatStr = String.format("%s,%s", TaskTimeoutStrategy.WARN.name(), TaskTimeoutStrategy.FAILED.name());
String timeout = this.getTimeout().replace(formatStr,TaskTimeoutStrategy.WARNFAILED.name());
return JSONObject.parseObject(timeout,TaskTimeoutParameter.class);
}
return new TaskTimeoutParameter(false);
}
@Override
public String toString() {
return "TaskNode{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", desc='" + desc + '\'' +
", type='" + type + '\'' +
", runFlag='" + runFlag + '\'' +
", loc='" + loc + '\'' +
", maxRetryTimes=" + maxRetryTimes +
", retryInterval=" + retryInterval +
", params='" + params + '\'' +
", preTasks='" + preTasks + '\'' +
", extras='" + extras + '\'' +
", depList=" + depList +
", dependence='" + dependence + '\'' +
", taskInstancePriority=" + taskInstancePriority +
", timeout='" + timeout + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.model;
public class TaskNodeRelation {
/**
* task start node name
*/
private String startNode;
/**
* task end node name
*/
private String endNode;
public TaskNodeRelation() {
}
public TaskNodeRelation(String startNode, String endNode) {
this.startNode = startNode;
this.endNode = endNode;
}
public String getStartNode() {
return startNode;
}
public void setStartNode(String startNode) {
this.startNode = startNode;
}
public String getEndNode() {
return endNode;
}
public void setEndNode(String endNode) {
this.endNode = endNode;
}
public boolean equals(TaskNodeRelation e){
return (e.getStartNode() == this.startNode && e.getEndNode() == this.endNode);
}
@Override
public String toString() {
return "TaskNodeRelation{" +
"startNode='" + startNode + '\'' +
", endNode='" + endNode + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.process;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import java.util.List;
public class ProcessDag {
/**
* DAG edge list
**/
private List<TaskNodeRelation> edges;
/**
* DAG node list
*/
private List<TaskNode> nodes;
/**
* getter method
*
* @return the edges
* @see ProcessDag#edges
*/
public List<TaskNodeRelation> getEdges() {
return edges;
}
/**
* setter method
*
* @param edges the edges to set
* @see ProcessDag#edges
*/
public void setEdges(List<TaskNodeRelation> edges) {
this.edges = edges;
}
/**
* getter method
*
* @return the nodes
* @see ProcessDag#nodes
*/
public List<TaskNode> getNodes() {
return nodes;
}
/**
* setter method
*
* @param nodes the nodes to set
* @see ProcessDag#nodes
*/
public void setNodes(List<TaskNode> nodes) {
this.nodes = nodes;
}
@Override
public String toString() {
return "ProcessDag{" +
"edges=" + edges +
", nodes=" + nodes +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.process;
import cn.escheduler.common.enums.DataType;
import cn.escheduler.common.enums.Direct;
import java.util.Objects;
public class Property {
/**
* key
*/
private String prop;
/**
* input/output
*/
private Direct direct;
/**
* data type
*/
private DataType type;
/**
* value
*/
private String value;
public Property() {
}
public Property(String prop,Direct direct,DataType type,String value) {
this.prop = prop;
this.direct = direct;
this.type = type;
this.value = value;
}
/**
* getter method
*
* @return the prop
* @see Property#prop
*/
public String getProp() {
return prop;
}
/**
* setter method
*
* @param prop the prop to set
* @see Property#prop
*/
public void setProp(String prop) {
this.prop = prop;
}
/**
* getter method
*
* @return the value
* @see Property#value
*/
public String getValue() {
return value;
}
/**
* setter method
*
* @param value the value to set
* @see Property#value
*/
public void setValue(String value) {
this.value = value;
}
public Direct getDirect() {
return direct;
}
public void setDirect(Direct direct) {
this.direct = direct;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Property property = (Property) o;
return Objects.equals(prop, property.prop) &&
Objects.equals(value, property.value);
}
@Override
public int hashCode() {
return Objects.hash(prop, value);
}
@Override
public String toString() {
return "Property{" +
"prop='" + prop + '\'' +
", direct=" + direct +
", type=" + type +
", value='" + value + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.process;
/**
* resource info
*/
public class ResourceInfo {
/**
* res the name of the resource that was uploaded
*/
private String res;
public String getRes() {
return res;
}
public void setRes(String res) {
this.res = res;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.queue;
import java.util.List;
import java.util.Set;
public interface ITaskQueue {
/**
* take out all the elements
*
* this method has deprecated
* use checkTaskExists instead
*
* @param key
* @return
*/
@Deprecated
List<String> getAllTasks(String key);
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @return true if exists in the queue
*/
boolean checkTaskExists(String key, String task);
/**
* add an element to the queue
*
* @param key queue name
* @param value
*/
void add(String key, String value);
/**
* an element pops out of the queue
*
* @param key queue name
* @return
*/
String poll(String key);
/**
* add an element to the set
*
* @param key
* @param value
*/
void sadd(String key, String value);
/**
* delete the value corresponding to the key in the set
*
* @param key
* @param value
*/
void srem(String key, String value);
/**
* gets all the elements of the set based on the key
*
* @param key
* @return
*/
Set<String> smembers(String key);
/**
* clear the task queue for use by junit tests only
*/
void delete();
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.queue;
import cn.escheduler.common.utils.CommonUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task queue factory
*/
public class TaskQueueFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueFactory.class);
private TaskQueueFactory(){
}
/**
* get instance (singleton)
*
* @return instance
*/
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
// queueImplValue = StringUtils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids ");
// return TaskQueueRedisImpl.getInstance();
// } else {
logger.info("task queue impl use zookeeper ");
return TaskQueueZkImpl.getInstance();
// }
}else{
logger.error("property escheduler.queue.impl can't be blank ");
System.exit(-1);
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implemention
*/
public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
private static TaskQueueZkImpl instance;
private TaskQueueZkImpl(){
init();
}
public static TaskQueueZkImpl getInstance(){
if (null == instance) {
synchronized (TaskQueueZkImpl.class) {
if(null == instance) {
instance = new TaskQueueZkImpl();
}
}
}
return instance;
}
/**
* get all tasks from tasks queue
* @param key task queue name
* @return
*/
@Deprecated
@Override
public List<String> getAllTasks(String key) {
try {
List<String> list = getZkClient().getChildren().forPath(getTasksPath(key));
return list;
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return new ArrayList<String>();
}
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @return true if exists in the queue
*/
@Override
public boolean checkTaskExists(String key, String task) {
String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;
try {
Stat stat = zkClient.checkExists().forPath(taskPath);
if(null == stat){
logger.info("check task:{} not exist in task queue",task);
return false;
}else{
logger.info("check task {} exists in task queue ",task);
return true;
}
} catch (Exception e) {
logger.info(String.format("task {} check exists in task queue exception ", task), e);
}
return false;
}
/**
* add task to tasks queue
*
* @param key task queue name
* @param value ${priority}_${processInstanceId}_${taskId}
*/
@Override
public void add(String key, String value) {
try {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
}
/**
* An element pops out of the queue <p>
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* @param key task queue name
* @return the task id to be executed
*/
@Override
public String poll(String key) {
try{
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
List<String> list = zk.getChildren().forPath(getTasksPath(key));
if(list != null && list.size() > 0){
int size = list.size();
String formatTargetTask = null;
String targetTaskKey = null;
for (int i = 0; i < size; i++) {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
if(taskDetailArrs.length == 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(i > 0){
int result = formatTask.compareTo(formatTargetTask);
if(result < 0){
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
}
}else{
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
}
}else{
logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
}
}
if(formatTargetTask != null){
String taskIdPath = tasksQueuePath + targetTaskKey;
logger.info("consume task {}", taskIdPath);
String[] vals = targetTaskKey.split(Constants.UNDERLINE);
try{
zk.delete().forPath(taskIdPath);
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_remove" + Constants.SINGLE_SLASH + targetTaskKey;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(targetTaskKey));
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, task detail: %s exception" ,targetTaskKey, vals[vals.length - 1]) ,e);
}
logger.info("consume task: {},there still have {} tasks need to be executed", targetTaskKey, size - 1);
return vals[vals.length - 1];
}else{
logger.error("should not go here, task queue poll error, please check!");
}
}
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
return null;
}
/**
* In order to be compatible with redis implementation
*
* To be compatible with the redis implementation, add an element to the set
* @param key The key is the kill/cancel queue path name
* @param value host-taskId The name of the zookeeper node
*/
@Override
public void sadd(String key,String value) {
try {
if(value != null && value.trim().length() > 0){
String path = getTasksPath(key) + Constants.SINGLE_SLASH;
CuratorFramework zk = getZkClient();
Stat stat = zk.checkExists().forPath(path + value);
if(null == stat){
String result = zk.create().withMode(CreateMode.PERSISTENT).forPath(path + value,Bytes.toBytes(value));
logger.info("add task:{} to tasks set result:{} ",value,result);
}else{
logger.info("task {} exists in tasks set ",value);
}
}else{
logger.warn("add host-taskId:{} to tasks set is empty ",value);
}
} catch (Exception e) {
logger.error("add task to tasks set exception",e);
}
}
/**
* delete the value corresponding to the key in the set
* @param key The key is the kill/cancel queue path name
* @param value host-taskId-taskType The name of the zookeeper node
*/
@Override
public void srem(String key, String value) {
try{
String path = getTasksPath(key) + Constants.SINGLE_SLASH;
CuratorFramework zk = getZkClient();
Stat stat = zk.checkExists().forPath(path + value);
if(null != stat){
zk.delete().forPath(path + value);
logger.info("delete task:{} from tasks set ",value);
}else{
logger.info("delete task:{} from tasks set fail, there is no this task",value);
}
}catch(Exception e){
logger.error(String.format("delete task:" + value + " exception"),e);
}
}
/**
* Gets all the elements of the set based on the key
* @param key The key is the kill/cancel queue path name
* @return
*/
@Override
public Set<String> smembers(String key) {
Set<String> tasksSet = new HashSet<>();
try {
List<String> list = getZkClient().getChildren().forPath(getTasksPath(key));
for (String task : list) {
tasksSet.add(task);
}
return tasksSet;
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return tasksSet;
}
/**
* Init the task queue of zookeeper node
*/
private void init(){
try {
String tasksQueuePath = getTasksPath(Constants.SCHEDULER_TASKS_QUEUE);
String tasksCancelPath = getTasksPath(Constants.SCHEDULER_TASKS_KILL);
for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
if(zkClient.checkExists().forPath(taskQueuePath) == null){
// create a persistent parent node
zkClient.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(taskQueuePath);
logger.info("create tasks queue parent node success : {} ",taskQueuePath);
}
}
} catch (Exception e) {
logger.error("create zk node failure",e);
}
}
/**
* Clear the task queue of zookeeper node
*/
@Override
public void delete(){
try {
String tasksQueuePath = getTasksPath(Constants.SCHEDULER_TASKS_QUEUE);
String tasksCancelPath = getTasksPath(Constants.SCHEDULER_TASKS_KILL);
for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
if(zkClient.checkExists().forPath(taskQueuePath) != null){
List<String> list = zkClient.getChildren().forPath(taskQueuePath);
for (String task : list) {
zkClient.delete().forPath(taskQueuePath + Constants.SINGLE_SLASH + task);
logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task);
}
}
}
} catch (Exception e) {
logger.error("delete all tasks in tasks queue failure",e);
}
}
/**
* get zookeeper client of CuratorFramework
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
* Get the task queue path
* @param key task queue name
* @return
*/
public String getTasksPath(String key){
return conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + key;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A base class for running a Unix command.
*
* <code>AbstractShell</code> can be used to run unix commands like <code>du</code> or
* <code>df</code>. It also offers facilities to gate commands by
* time-intervals.
*/
public abstract class AbstractShell {
private static final Logger logger = LoggerFactory.getLogger(AbstractShell.class);
/**
* Time after which the executing script would be timedout
*/
protected long timeOutInterval = 0L;
/**
* If or not script timed out
*/
private AtomicBoolean timedOut;
/**
* refresh interval in msec
*/
private long interval;
/**
* last time the command was performed
*/
private long lastTime;
/**
* env for the command execution
*/
private Map<String, String> environment;
private File dir;
/**
* sub process used to execute the command
*/
private Process process;
private int exitCode;
/**
* If or not script finished executing
*/
private volatile AtomicBoolean completed;
public AbstractShell() {
this(0L);
}
/**
* @param interval the minimum duration to wait before re-executing the
* command.
*/
public AbstractShell(long interval ) {
this.interval = interval;
this.lastTime = (interval<0) ? 0 : -interval;
}
/**
* set the environment for the command
* @param env Mapping of environment variables
*/
protected void setEnvironment(Map<String, String> env) {
this.environment = env;
}
/**
* set the working directory
* @param dir The directory where the command would be executed
*/
protected void setWorkingDirectory(File dir) {
this.dir = dir;
}
/**
* check to see if a command needs to be executed and execute if needed
*/
protected void run() throws IOException {
if (lastTime + interval > System.currentTimeMillis()) {
return;
}
// reset for next run
exitCode = 0;
runCommand();
}
/**
* Run a command actual work
*/
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask = null;
timedOut = new AtomicBoolean(false);
completed = new AtomicBoolean(false);
if (environment != null) {
builder.environment().putAll(this.environment);
}
if (dir != null) {
builder.directory(this.dir);
}
process = builder.start();
ProcessContainer.putProcess(process);
if (timeOutInterval > 0) {
timeOutTimer = new Timer();
timeoutTimerTask = new ShellTimeoutTimerTask(
this);
//One time scheduling.
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process
.getErrorStream()));
BufferedReader inReader =
new BufferedReader(new InputStreamReader(process
.getInputStream()));
final StringBuffer errMsg = new StringBuffer();
// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch(IOException ioe) {
logger.warn("Error reading the error stream", ioe);
}
}
};
try {
errThread.start();
} catch (IllegalStateException ise) { }
try {
// parse the output
parseExecResult(inReader);
exitCode = process.waitFor();
try {
// make sure that the error thread exits
errThread.join();
} catch (InterruptedException ie) {
logger.warn("Interrupted while reading the error stream", ie);
}
completed.set(true);
//the timeout thread handling
//taken care in finally block
if (exitCode != 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
if ((timeOutTimer!=null) && !timedOut.get()) {
timeOutTimer.cancel();
}
// close the input stream
try {
inReader.close();
} catch (IOException ioe) {
logger.warn("Error while closing the input stream", ioe);
}
if (!completed.get()) {
errThread.interrupt();
}
try {
errReader.close();
} catch (IOException ioe) {
logger.warn("Error while closing the error stream", ioe);
}
ProcessContainer.removeProcess(process);
process.destroy();
lastTime = System.currentTimeMillis();
}
}
/**
* return an array containing the command name & its parameters
* */
protected abstract String[] getExecString();
/**
* Parse the execution result
* */
protected abstract void parseExecResult(BufferedReader lines)
throws IOException;
/**
* get the current sub-process executing the given command
* @return process executing the command
*/
public Process getProcess() {
return process;
}
/** get the exit code
* @return the exit code of the process
*/
public int getExitCode() {
return exitCode;
}
/**
* Set if the command has timed out.
*
*/
private void setTimedOut() {
this.timedOut.set(true);
}
/**
* Timer which is used to timeout scripts spawned off by shell.
*/
private static class ShellTimeoutTimerTask extends TimerTask {
private AbstractShell shell;
public ShellTimeoutTimerTask(AbstractShell shell) {
this.shell = shell;
}
@Override
public void run() {
Process p = shell.getProcess();
try {
p.exitValue();
} catch (Exception e) {
//Process has not terminated.
//So check if it has completed
//if not just destroy it.
if (p != null && !shell.completed.get()) {
shell.setTimedOut();
p.destroy();
}
}
}
}
/**
* This is an IOException with exit code added.
*/
public static class ExitCodeException extends IOException {
int exitCode;
public ExitCodeException(int exitCode, String message) {
super(message);
this.exitCode = exitCode;
}
public int getExitCode() {
return exitCode;
}
}
/**
* process manage container
*
*/
public static class ProcessContainer extends ConcurrentHashMap<Integer, Process>{
private static final ProcessContainer container = new ProcessContainer();
private ProcessContainer(){
super();
}
public static final ProcessContainer getInstance(){
return container;
}
public static void putProcess(Process process){
getInstance().put(process.hashCode(), process);
}
public static int processSize(){
return getInstance().size();
}
public static void removeProcess(Process process){
getInstance().remove(process.hashCode());
}
public static void destroyAllProcess(){
Set<Entry<Integer, Process>> set = getInstance().entrySet();
for (Entry<Integer, Process> entry : set) {
try{
entry.getValue().destroy();
} catch (Exception e) {
e.printStackTrace();
}
}
logger.info("close " + set.size() + " executing process tasks");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.shell;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
* shell command executor.
*
* <code>ShellExecutor</code> should be used in cases where the output
* of the command needs no explicit parsing and where the command, working
* directory and the environment remains unchanged. The output of the command
* is stored as-is and is expected to be small.
*/
public class ShellExecutor extends AbstractShell {
private String[] command;
private StringBuffer output;
public ShellExecutor(String... execString) {
this(execString, null);
}
public ShellExecutor(String[] execString, File dir) {
this(execString, dir, null);
}
public ShellExecutor(String[] execString, File dir,
Map<String, String> env) {
this(execString, dir, env , 0L);
}
/**
* Create a new instance of the ShellExecutor to execute a command.
*
* @param execString The command to execute with arguments
* @param dir If not-null, specifies the directory which should be set
* as the current working directory for the command.
* If null, the current working directory is not modified.
* @param env If not-null, environment of the command will include the
* key-value pairs specified in the map. If null, the current
* environment is not modified.
* @param timeout Specifies the time in milliseconds, after which the
* command will be killed and the status marked as timedout.
* If 0, the command will not be timed out.
*/
public ShellExecutor(String[] execString, File dir,
Map<String, String> env, long timeout) {
command = execString.clone();
if (dir != null) {
setWorkingDirectory(dir);
}
if (env != null) {
setEnvironment(env);
}
timeOutInterval = timeout;
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
* @param cmd shell command to execute.
* @return the output of the executed command.
*/
public static String execCommand(String... cmd) throws IOException {
return execCommand(null, cmd, 0L);
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @param timeout time in milliseconds after which script should be marked timeout
* @return the output of the executed command.o
*/
public static String execCommand(Map<String, String> env, String[] cmd,
long timeout) throws IOException {
ShellExecutor exec = new ShellExecutor(cmd, null, env,
timeout);
exec.execute();
return exec.getOutput();
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @return the output of the executed command.
*/
public static String execCommand(Map<String,String> env, String ... cmd)
throws IOException {
return execCommand(env, cmd, 0L);
}
/**
* Execute the shell command
*
*/
public void execute() throws IOException {
this.run();
}
@Override
protected String[] getExecString() {
return command;
}
@Override
protected void parseExecResult(BufferedReader lines) throws IOException {
output = new StringBuffer();
char[] buf = new char[1024];
int nRead;
String line = "";
while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
line = new String(buf,0,nRead);
}
output.append(line);
}
/**
*
* Get the output of the shell command
*/
public String getOutput() {
return (output == null) ? "" : output.toString();
}
/**
* Returns the commands of this instance.
* Arguments with spaces in are presented with quotes round; other
* arguments are presented raw
*
* @return a string representation of the object
*/
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
String[] args = getExecString();
for (String s : args) {
if (s.indexOf(' ') >= 0) {
builder.append('"').append(s).append('"');
} else {
builder.append(s);
}
builder.append(' ');
}
return builder.toString();
}
}
# ha or single namenode,If namenode ha needs to copy core-site.xml and hdfs-site.xml to the conf directory
fs.defaultFS=hdfs://mycluster:8020
#resourcemanager ha note this need ips , this empty if single
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
\ No newline at end of file
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册