提交 63eef704 编写于 作者: P peng-yongsheng

add queue component

上级 4a77986b
......@@ -14,8 +14,8 @@
<modules>
<module>client-component</module>
<module>server-component</module>
<module>queue-component</module>
<module>stream-component</module>
<module>queue-component</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
......@@ -12,4 +30,11 @@
<artifactId>queue-component</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
import java.util.concurrent.ThreadFactory;
/**
* @author peng-yongsheng
*/
public enum DaemonThreadFactory implements ThreadFactory {
INSTANCE;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
/**
* @author peng-yongsheng
*/
public class EndOfBatchCommand {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
/**
* @author peng-yongsheng
*/
public class MessageHolder {
private Object message;
public Object getMessage() {
return message;
}
public void setMessage(Object message) {
this.message = message;
}
public void reset() {
message = null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
/**
* @author peng-yongsheng
*/
public interface QueueCreator {
QueueEventHandler create(int queueSize, QueueExecutor executor);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
/**
* @author peng-yongsheng
*/
public interface QueueEventHandler {
void tell(Object message);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
/**
* @author peng-yongsheng
*/
public interface QueueExecutor {
void execute(Object message);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import org.skywalking.apm.collector.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.queue.MessageHolder;
import org.skywalking.apm.collector.queue.QueueEventHandler;
import org.skywalking.apm.collector.queue.QueueExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DisruptorEventHandler implements EventHandler<MessageHolder>, QueueEventHandler {
private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class);
private RingBuffer<MessageHolder> ringBuffer;
private QueueExecutor executor;
DisruptorEventHandler(RingBuffer<MessageHolder> ringBuffer, QueueExecutor executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
/**
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
Object message = event.getMessage();
event.reset();
executor.execute(message);
if (endOfBatch) {
executor.execute(new EndOfBatchCommand());
}
}
/**
* Push the message into disruptor ring buffer.
*
* @param message of the data to process.
*/
public void tell(Object message) {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
} finally {
ringBuffer.publish(sequence);
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.skywalking.apm.collector.queue.DaemonThreadFactory;
import org.skywalking.apm.collector.queue.MessageHolder;
import org.skywalking.apm.collector.queue.QueueCreator;
import org.skywalking.apm.collector.queue.QueueEventHandler;
import org.skywalking.apm.collector.queue.QueueExecutor;
/**
* @author peng-yongsheng
*/
public class DisruptorQueueCreator implements QueueCreator {
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
// Specify the size of the ring buffer, must be power of 2.
if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) {
throw new IllegalArgumentException("queue size must be power of 2");
}
// Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE);
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);
// Connect the handler
disruptor.handleEventsWith(eventHandler);
// Start the Disruptor, starts all threads running
disruptor.start();
return eventHandler;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor;
import com.lmax.disruptor.EventFactory;
import org.skywalking.apm.collector.queue.MessageHolder;
/**
* @author peng-yongsheng
*/
public class MessageHolderFactory implements EventFactory<MessageHolder> {
public static MessageHolderFactory INSTANCE = new MessageHolderFactory();
public MessageHolder newInstance() {
return new MessageHolder();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* The <code>AbstractLocalAsyncWorker</code> implementations represent workers,
* which receive local asynchronous message.
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker extends AbstractWorker<LocalAsyncWorkerRef> implements QueueExecutor {
private LocalAsyncWorkerRef workerRef;
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use
* to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
* The asynchronous worker always use to persistence data into db, this is the end of the streaming,
* so usually no need to create the next worker instance at the time of this worker instance create.
*
* @throws ProviderNotFoundException When worker provider not found, it will be throw this exception.
*/
@Override
public void preStart() throws ProviderNotFoundException {
}
@Override protected final LocalAsyncWorkerRef getSelf() {
return workerRef;
}
@Override protected final void putSelfRef(LocalAsyncWorkerRef workerRef) {
this.workerRef = workerRef;
}
/**
* Receive message
*
* @param message The persistence data or metric data.
* @throws WorkerException The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws WorkerException {
onWork(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.queue.QueueCreator;
import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
/**
* @author peng-yongsheng
*/
public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAsyncWorker & QueueExecutor> extends AbstractWorkerProvider<T> {
public abstract int queueSize();
@Override final public WorkerRef create() throws ProviderNotFoundException {
T localAsyncWorker = workerInstance(getClusterContext());
localAsyncWorker.preStart();
if (localAsyncWorker instanceof PersistenceWorker) {
PersistenceWorkerContainer.INSTANCE.addWorker((PersistenceWorker)localAsyncWorker);
}
QueueCreator queueCreator = ((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(QueueModuleGroupDefine.GROUP_NAME)).getQueueCreator();
QueueEventHandler queueEventHandler = queueCreator.create(queueSize(), localAsyncWorker);
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
getClusterContext().put(workerRef);
localAsyncWorker.putSelfRef(workerRef);
return workerRef;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* The <code>AbstractRemoteWorker</code> implementations represent workers,
* which receive remote messages.
* <p>
* Usually, the implementations are doing persistent, or aggregate works.
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorker extends AbstractWorker<RemoteWorkerRef> {
private RemoteWorkerRef workerRef;
/**
* Construct an <code>AbstractRemoteWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
*/
protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
* This method use for message producer to call for send message.
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws WorkerInvokeException {
try {
onWork(message);
} catch (WorkerException e) {
throw new WorkerInvokeException(e.getMessage(), e.getCause());
}
}
@Override protected final RemoteWorkerRef getSelf() {
return workerRef;
}
@Override protected final void putSelfRef(RemoteWorkerRef workerRef) {
this.workerRef = workerRef;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
/**
* The <code>AbstractRemoteWorkerProvider</code> implementations represent providers,
* which create instance of cluster workers whose implemented {@link AbstractRemoteWorker}.
* <p>
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorker> extends AbstractWorkerProvider<T> {
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
* @return The created worker reference. See {@link RemoteWorkerRef}
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef create() {
T clusterWorker = workerInstance(getClusterContext());
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), clusterWorker);
getClusterContext().put(workerRef);
return workerRef;
}
public final RemoteWorkerRef create(GRPCClient client) {
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), client);
getClusterContext().put(workerRef);
return workerRef;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.framework.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker<S extends WorkerRef> implements Executor {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private final Role role;
private final ClusterWorkerContext clusterContext;
public AbstractWorker(Role role, ClusterWorkerContext clusterContext) {
this.role = role;
this.clusterContext = clusterContext;
}
@Override public final void execute(Object message) {
try {
onWork(message);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
}
/**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws WorkerException Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws WorkerException;
public abstract void preStart() throws ProviderNotFoundException;
final public ClusterWorkerContext getClusterContext() {
return clusterContext;
}
final public Role getRole() {
return role;
}
protected abstract S getSelf();
protected abstract void putSelfRef(S workerRef);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorkerProvider<T extends AbstractWorker> implements Provider {
private ClusterWorkerContext clusterContext;
public abstract Role role();
public abstract T workerInstance(ClusterWorkerContext clusterContext);
final public void setClusterContext(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
final protected ClusterWorkerContext getClusterContext() {
return clusterContext;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.ArrayList;
import java.util.List;
/**
* @author peng-yongsheng
*/
public class ClusterWorkerContext extends WorkerContext {
private List<AbstractRemoteWorkerProvider> providers = new ArrayList<>();
public List<AbstractRemoteWorkerProvider> getProviders() {
return providers;
}
@Override
public void putProvider(AbstractRemoteWorkerProvider provider) {
providers.add(provider);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
public enum ClusterWorkerRefCounter {
INSTANCE;
private Map<String, AtomicInteger> counter = new ConcurrentHashMap<>();
public int incrementAndGet(Role role) {
if (!counter.containsKey(role.roleName())) {
AtomicInteger atomic = new AtomicInteger(0);
counter.putIfAbsent(role.roleName(), atomic);
}
return counter.get(role.roleName()).incrementAndGet();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* @author peng-yongsheng
*/
public interface Context extends LookUp {
void putProvider(AbstractRemoteWorkerProvider provider);
WorkerRefs lookup(Role role) throws WorkerNotFoundException;
RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException;
void put(WorkerRef workerRef);
void remove(WorkerRef workerRef);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class LocalAsyncWorkerProviderDefineLoader implements Loader<List<AbstractLocalAsyncWorkerProvider>> {
private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerProviderDefineLoader.class);
@Override public List<AbstractLocalAsyncWorkerProvider> load() throws DefineException {
List<AbstractLocalAsyncWorkerProvider> providers = new ArrayList<>();
LocalWorkerProviderDefinitionFile definitionFile = new LocalWorkerProviderDefinitionFile();
logger.info("local async worker provider definition file name: {}", definitionFile.fileName());
DefinitionLoader<AbstractLocalAsyncWorkerProvider> definitionLoader = DefinitionLoader.load(AbstractLocalAsyncWorkerProvider.class, definitionFile);
for (AbstractLocalAsyncWorkerProvider provider : definitionLoader) {
logger.info("loaded local async worker provider definition class: {}", provider.getClass().getName());
providers.add(provider);
}
return providers;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.queue.QueueEventHandler;
/**
* @author peng-yongsheng
*/
public class LocalAsyncWorkerRef extends WorkerRef {
private QueueEventHandler queueEventHandler;
public LocalAsyncWorkerRef(Role role, QueueEventHandler queueEventHandler) {
super(role);
this.queueEventHandler = queueEventHandler;
}
@Override
public void tell(Object message) throws WorkerInvokeException {
queueEventHandler.tell(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class LocalWorkerProviderDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "local_worker_provider.define";
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* @author peng-yongsheng
*/
public interface LookUp {
WorkerRefs lookup(Role role) throws WorkerNotFoundException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* @author peng-yongsheng
*/
public interface Provider {
WorkerRef create() throws ProviderNotFoundException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
public class ProviderNotFoundException extends Exception {
public ProviderNotFoundException(String message) {
super(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class RemoteWorkerProviderDefineLoader implements Loader<List<AbstractRemoteWorkerProvider>> {
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerProviderDefineLoader.class);
@Override public List<AbstractRemoteWorkerProvider> load() throws DefineException {
List<AbstractRemoteWorkerProvider> providers = new ArrayList<>();
RemoteWorkerProviderDefinitionFile definitionFile = new RemoteWorkerProviderDefinitionFile();
logger.info("remote worker provider definition file name: {}", definitionFile.fileName());
DefinitionLoader<AbstractRemoteWorkerProvider> definitionLoader = DefinitionLoader.load(AbstractRemoteWorkerProvider.class, definitionFile);
for (AbstractRemoteWorkerProvider provider : definitionLoader) {
logger.info("loaded remote worker provider definition class: {}", provider.getClass().getName());
providers.add(provider);
}
return providers;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author peng-yongsheng
*/
public class RemoteWorkerProviderDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "remote_worker_provider.define";
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class RemoteWorkerRef extends WorkerRef {
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
private final Boolean acrossJVM;
private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub;
private StreamObserver<RemoteMessage> streamObserver;
private final AbstractRemoteWorker remoteWorker;
private final String address;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
super(role);
this.remoteWorker = remoteWorker;
this.acrossJVM = false;
this.stub = null;
this.address = Const.EMPTY_STRING;
}
public RemoteWorkerRef(Role role, GRPCClient client) {
super(role);
this.remoteWorker = null;
this.acrossJVM = true;
this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
this.address = client.toString();
createStreamObserver();
}
@Override
public void tell(Object message) throws WorkerInvokeException {
if (acrossJVM) {
try {
RemoteData remoteData = getRole().dataDefine().serialize(message);
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData);
streamObserver.onNext(builder.build());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
} else {
remoteWorker.allocateJob(message);
}
}
public Boolean isAcrossJVM() {
return acrossJVM;
}
private void createStreamObserver() {
StreamStatus status = new StreamStatus(false);
streamObserver = stub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
status.finished();
}
});
}
class StreamStatus {
private volatile boolean status;
public StreamStatus(boolean status) {
this.status = status;
}
public boolean isFinish() {
return status;
}
public void finished() {
this.status = true;
}
/**
* @param maxTimeout max wait time, milliseconds.
*/
public void wait4Finish(long maxTimeout) {
long time = 0;
while (!status) {
if (time > maxTimeout) {
break;
}
try2Sleep(5);
time += 5;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
}
@Override public String toString() {
StringBuilder toString = new StringBuilder();
toString.append("acrossJVM: ").append(acrossJVM);
toString.append(", address: ").append(address);
return toString.toString();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author peng-yongsheng
*/
public interface Role {
String roleName();
WorkerSelector workerSelector();
DataDefine dataDefine();
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
public class UsedRoleNameException extends Exception {
public UsedRoleNameException(String message) {
super(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class WorkerContext implements Context {
private final Logger logger = LoggerFactory.getLogger(WorkerContext.class);
private Map<String, RemoteWorkerRef> remoteWorkerRefs;
private Map<String, List<WorkerRef>> roleWorkers;
private Map<String, Role> roles;
WorkerContext() {
this.roleWorkers = new HashMap<>();
this.roles = new HashMap<>();
this.remoteWorkerRefs = new HashMap<>();
}
private Map<String, List<WorkerRef>> getRoleWorkers() {
return this.roleWorkers;
}
@Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
if (getRoleWorkers().containsKey(role.roleName())) {
return new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
} else {
throw new WorkerNotFoundException("role=" + role.roleName() + ", no available worker.");
}
}
@Override final public RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException {
if (remoteWorkerRefs.containsKey(roleName)) {
return remoteWorkerRefs.get(roleName);
} else {
throw new WorkerNotFoundException("role=" + roleName + ", no available worker.");
}
}
public final void putRole(Role role) {
roles.put(role.roleName(), role);
}
public final Role getRole(String roleName) {
return roles.get(roleName);
}
@Override final public void put(WorkerRef workerRef) {
logger.debug("put worker reference into context, role name: {}", workerRef.getRole().roleName());
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<>());
}
getRoleWorkers().get(workerRef.getRole().roleName()).add(workerRef);
if (workerRef instanceof RemoteWorkerRef) {
RemoteWorkerRef remoteWorkerRef = (RemoteWorkerRef)workerRef;
if (!remoteWorkerRef.isAcrossJVM()) {
remoteWorkerRefs.put(workerRef.getRole().roleName(), remoteWorkerRef);
}
}
}
@Override final public void remove(WorkerRef workerRef) {
getRoleWorkers().remove(workerRef.getRole().roleName());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* Defines a general exception a worker can throw when it
* encounters difficulty.
*
* @author peng-yongsheng
* @since v3.1-2017
*/
public class WorkerException extends Exception {
public WorkerException(String message) {
super(message);
}
public WorkerException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* This exception is raised when worker fails to process job during "call" or "ask"
*
* @author peng-yongsheng
* @since v3.1-2017
*/
public class WorkerInvokeException extends WorkerException {
public WorkerInvokeException(String message) {
super(message);
}
public WorkerInvokeException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
public class WorkerNotFoundException extends WorkerException {
public WorkerNotFoundException(String message) {
super(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
/**
* @author peng-yongsheng
*/
public abstract class WorkerRef {
private Role role;
public WorkerRef(Role role) {
this.role = role;
}
final public Role getRole() {
return role;
}
public abstract void tell(Object message) throws WorkerInvokeException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class WorkerRefs<T extends WorkerRef> {
private final Logger logger = LoggerFactory.getLogger(WorkerRefs.class);
private List<T> workerRefs;
private WorkerSelector workerSelector;
private Role role;
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
}
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector, Role role) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
this.role = role;
}
public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
workerRefs.forEach(workerRef -> {
if (workerRef instanceof RemoteWorkerRef) {
logger.debug("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString());
}
});
workerSelector.select(workerRefs, message).tell(message);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ForeverFirstSelector implements WorkerSelector<WorkerRef> {
private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
@Override public WorkerRef select(List<WorkerRef> members, Object message) {
logger.debug("member size: {}", members.size());
return members.get(0);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
import java.util.List;
import org.skywalking.apm.collector.core.stream.AbstractHashMessage;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef}
* by message {@link AbstractHashMessage} key's hashcode, so it can use to send the same hashcode message to same {@link
* WorkerRef}. Usually, use to database operate which avoid dirty data.
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public class HashCodeSelector implements WorkerSelector<WorkerRef> {
/**
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
if (message instanceof AbstractHashMessage) {
AbstractHashMessage hashMessage = (AbstractHashMessage)message;
int size = members.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex);
} else {
throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} nearly random, by round-robin.
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public class RollingSelector implements WorkerSelector<WorkerRef> {
private int index = 0;
/**
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
int size = members.size();
index++;
int selectIndex = Math.abs(index) % size;
return members.get(selectIndex);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
/**
* The <code>WorkerSelector</code> should be implemented by any class whose instances
* are intended to provide select a {@link WorkerRef} from a {@link WorkerRef} list.
* <p>
* Actually, the <code>WorkerRef</code> is designed to provide a routing ability in the collector cluster
*
* @author peng-yongsheng
* @since v3.0-2017
*/
public interface WorkerSelector<T extends WorkerRef> {
/**
* select a {@link WorkerRef} from a {@link WorkerRef} list.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
T select(List<T> members, Object message);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册