提交 f9bee8c4 编写于 作者: wu-sheng's avatar wu-sheng

Refactor codes to a new style. All the refactors are based on the new WorkerSelector.

上级 aca66aec
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.actor.router.WorkerRouter;
import com.a.eye.skywalking.collector.actor.router.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import java.util.List;
/**
* @author pengys5
*/
public abstract class AbstractWorker extends UntypedActor {
public abstract class AbstractWorker<T> extends UntypedActor {
final String workerRole;
......@@ -38,8 +41,9 @@ public abstract class AbstractWorker extends UntypedActor {
}
}
public void tell(String workerRole, WorkerRouter router, Object message) throws Throwable {
router.find(workerRole).tell(message, getSelf());
public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Throwable {
List<ActorRef> avaibleWorks = WorkersRefCenter.INSTANCE.avaibleWorks(targetWorkerProvider.roleName());
selector.select(targetWorkerProvider.roleName(), avaibleWorks, message).tell(message, getSelf());
}
void register(Member member) {
......
......@@ -9,16 +9,11 @@ import com.a.eye.skywalking.api.util.StringUtil;
*/
public abstract class AbstractWorkerProvider {
public abstract String workerRole();
public abstract Class workerClass();
public abstract int workerNum();
public void createWorker(ActorSystem system) {
if (StringUtil.isEmpty(workerRole())) {
throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerRole()");
}
if (workerClass() == null) {
throw new IllegalArgumentException("cannot createWorker() with nothing obtained from workerClass()");
}
......@@ -27,7 +22,11 @@ public abstract class AbstractWorkerProvider {
}
for (int i = 1; i <= workerNum(); i++) {
system.actorOf(Props.create(workerClass(), workerRole()), workerRole() + "_" + i);
system.actorOf(Props.create(workerClass(), roleName()), roleName() + "_" + i);
}
}
protected String roleName(){
return workerClass().getSimpleName();
}
}
package com.a.eye.skywalking.collector.actor.router;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import java.util.List;
import java.util.Random;
/**
* @author pengys5
*/
public class RandomRouter implements WorkerRouter {
@Override
public ActorRef find(String workerRole) {
int workerNum = WorkersRefCenter.INSTANCE.sizeOf(workerRole);
Random random = new Random(workerNum);
return WorkersRefCenter.INSTANCE.find(workerRole, random.nextInt());
}
}
package com.a.eye.skywalking.collector.actor.router;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import java.util.List;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link ActorRef} nearly random, by round-robin.
*
* @author wusheng
*/
public enum RollingSelector implements WorkerSelector<Object> {
INSTANCE;
/**
* A simple round variable.
*/
private int index = 0;
/**
* Use round-robin to select {@link ActorRef}.
*
* @param members given {@link ActorRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link ActorRef}
*/
@Override
public ActorRef select(List<ActorRef> members, Object message) {
int size = members.size();
index++;
int selectIndex = Math.abs(index) % size;
return members.get(selectIndex);
}
}
package com.a.eye.skywalking.collector.actor.router;
import akka.actor.ActorRef;
import java.util.List;
/**
* @author wusheng
*/
public interface WorkerRouter {
ActorRef find(String workerRole);
}
package com.a.eye.skywalking.collector.actor.router;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import java.util.List;
/**
* The <code>WorkerSelector</code> should be implemented
* by any class whose instances are intended to provide select a {@link ActorRef} from a {@link ActorRef} list.
* <p></p>
* Actually, the <code>ActorRef</code> is designed to provide a routing ability in the collector cluster.
*
* @author wusheng
*/
public interface WorkerSelector<T> {
/**
* select a {@link ActorRef} from a {@link ActorRef} list.
*
* @param members given {@link ActorRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link ActorRef}
*/
ActorRef select(List<ActorRef> members, T message);
}
package com.a.eye.skywalking.collector.cluster;
/**
* The <code>NoAvailableWorkerException</code> represents no available member,
* when a {@link WorkerSelector} try to select.
*
* Most likely, in the cluster, these is no active worker of the particular role.
*
* @author wusheng
*/
public class NoAvailableWorkerException extends Exception {
public NoAvailableWorkerException(String message){
super(message);
}
}
......@@ -37,11 +37,19 @@ public enum WorkersRefCenter {
actorToRole.remove(newRef);
}
public ActorRef find(String workerRole, int sequence) {
return roleToActor.get(workerRole).get(sequence);
}
public int sizeOf(String workerRole) {
return roleToActor.get(workerRole).size();
/**
* Get a copy all available {@link ActorRef} list, by the given worker role.
* @param workerRole the given role
* @return available {@link ActorRef} list
* @throws NoAvailableWorkerException , when no available worker.
*/
public List<ActorRef> avaibleWorks(String workerRole) throws NoAvailableWorkerException {
List<ActorRef> refs = roleToActor.get(workerRole);
if(refs == null || refs.size() == 0){
throw new NoAvailableWorkerException("role=" + workerRole + ", no available worker.");
}
List<ActorRef> availableList = new ArrayList<>(refs.size());
availableList.addAll(refs);
return Collections.unmodifiableList(availableList);
}
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册