提交 6a449f49 编写于 作者: P peng-yongsheng

Delete collector queue module.

上级 f40da63c
......@@ -36,5 +36,10 @@
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-datacarrier</artifactId>
<version>5.0.0-alpha</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -19,8 +19,7 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
/**
* @author peng-yongsheng
......@@ -29,12 +28,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT, OUTPUT, WORKER_TYP
public abstract int queueSize();
private final QueueCreatorService<INPUT> queueCreatorService;
public AbstractLocalAsyncWorkerProvider(ModuleManager moduleManager,
QueueCreatorService<INPUT> queueCreatorService) {
public AbstractLocalAsyncWorkerProvider(ModuleManager moduleManager) {
super(moduleManager);
this.queueCreatorService = queueCreatorService;
}
@Override
......@@ -43,8 +38,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT, OUTPUT, WORKER_TYP
workerCreateListener.addWorker(localAsyncWorker);
LocalAsyncWorkerRef<INPUT, OUTPUT> localAsyncWorkerRef = new LocalAsyncWorkerRef<>(localAsyncWorker);
QueueEventHandler<INPUT> queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorkerRef);
localAsyncWorkerRef.setQueueEventHandler(queueEventHandler);
DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, queueSize());
localAsyncWorkerRef.setQueueEventHandler(dataCarrier);
return localAsyncWorkerRef;
}
}
......@@ -18,32 +18,46 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.CollectorException;
import java.util.List;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.apache.skywalking.apm.collector.queue.base.QueueExecutor;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class LocalAsyncWorkerRef<INPUT, OUTPUT> extends WorkerRef<INPUT, OUTPUT> implements QueueExecutor<INPUT> {
public class LocalAsyncWorkerRef<INPUT, OUTPUT> extends WorkerRef<INPUT, OUTPUT> implements IConsumer<INPUT> {
private QueueEventHandler<INPUT> queueEventHandler;
private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerRef.class);
private DataCarrier<INPUT> dataCarrier;
LocalAsyncWorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
super(destinationHandler);
}
public void setQueueEventHandler(QueueEventHandler<INPUT> queueEventHandler) {
this.queueEventHandler = queueEventHandler;
public void setQueueEventHandler(DataCarrier<INPUT> dataCarrier) {
this.dataCarrier = dataCarrier;
}
@Override public void consume(List<INPUT> data) {
data.forEach(this::out);
}
@Override public void init() {
}
@Override public void onError(List<INPUT> data, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void execute(INPUT input) throws CollectorException {
out(input);
@Override public void onExit() {
}
@Override protected void in(INPUT input) {
queueEventHandler.tell(input);
dataCarrier.produce(input);
}
@Override protected void out(INPUT input) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
public class UsedRoleNameException extends Exception {
public UsedRoleNameException(String message) {
super(message);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
/**
* This exception is raised when worker fails to process job during "call" or "ask"
*
* @author peng-yongsheng
* @since v3.1-2017
*/
public class WorkerInvokeException extends WorkerException {
public WorkerInvokeException(String message) {
super(message);
}
public WorkerInvokeException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
public class WorkerNotFoundException extends WorkerException {
public WorkerNotFoundException(String message) {
super(message);
}
}
......@@ -17,7 +17,7 @@
*/
package org.apache.skywalking.apm.collector.queue.base;
package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-queue</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-queue-datacarrier-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>collector-queue-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.datacarrier;
import java.util.Properties;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.queue.QueueModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.queue.datacarrier.service.DataCarrierQueueCreatorService;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class QueueModuleDataCarrierProvider extends ModuleProvider {
@Override public String name() {
return "datacarrier";
}
@Override public Class<? extends Module> module() {
return QueueModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(QueueCreatorService.class, new DataCarrierQueueCreatorService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.datacarrier.service;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.apache.skywalking.apm.collector.queue.base.QueueExecutor;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class DataCarrierQueueCreatorService implements QueueCreatorService {
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
return null;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.queue.datacarrier.QueueModuleDataCarrierProvider
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-queue</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-queue-define</artifactId>
<packaging>jar</packaging>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class QueueModule extends Module {
public static final String NAME = "queue";
@Override public String name() {
return NAME;
}
@Override public Class[] services() {
return new Class[] {QueueCreatorService.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.base;
import java.util.concurrent.ThreadFactory;
/**
* @author peng-yongsheng
*/
public enum DaemonThreadFactory implements ThreadFactory {
INSTANCE;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public interface QueueCreator {
QueueEventHandler create(int queueSize, QueueExecutor executor);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public interface QueueEventHandler<MESSAGE> {
void tell(MESSAGE message);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.base;
import org.apache.skywalking.apm.collector.core.framework.Executor;
/**
* @author peng-yongsheng
*/
public interface QueueExecutor<MESSAGE> extends Executor<MESSAGE> {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.service;
import org.apache.skywalking.apm.collector.core.module.Service;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.apache.skywalking.apm.collector.queue.base.QueueExecutor;
/**
* @author peng-yongsheng
*/
public interface QueueCreatorService<MESSAGE> extends Service {
QueueEventHandler<MESSAGE> create(int queueSize, QueueExecutor<MESSAGE> executor);
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.queue.QueueModule
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-queue</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-queue-disruptor-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>collector-queue-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.disruptor;
import java.util.Properties;
import org.apache.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.queue.QueueModule;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class QueueModuleDisruptorProvider extends ModuleProvider {
@Override public String name() {
return "disruptor";
}
@Override public Class<? extends Module> module() {
return QueueModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(QueueCreatorService.class, new DisruptorQueueCreatorService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import org.apache.skywalking.apm.collector.core.CollectorException;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.collector.queue.base.QueueExecutor;
import org.apache.skywalking.apm.collector.queue.base.MessageHolder;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DisruptorEventHandler<MESSAGE extends EndOfBatchQueueMessage> implements EventHandler<MessageHolder<MESSAGE>>, QueueEventHandler<MESSAGE> {
private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class);
private RingBuffer<MessageHolder<MESSAGE>> ringBuffer;
private QueueExecutor<MESSAGE> executor;
DisruptorEventHandler(RingBuffer<MessageHolder<MESSAGE>> ringBuffer, QueueExecutor<MESSAGE> executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
/**
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder<MESSAGE> event, long sequence, boolean endOfBatch) throws CollectorException {
MESSAGE message = event.getMessage();
event.reset();
message.setEndOfBatch(endOfBatch);
executor.execute(message);
}
/**
* Push the message into disruptor ring buffer.
*
* @param message of the data to process.
*/
public void tell(MESSAGE message) {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
} finally {
ringBuffer.publish(sequence);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.skywalking.apm.collector.queue.base.DaemonThreadFactory;
import org.apache.skywalking.apm.collector.queue.base.MessageHolder;
import org.apache.skywalking.apm.collector.queue.base.QueueCreator;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.apache.skywalking.apm.collector.queue.base.QueueExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DisruptorQueueCreator implements QueueCreator {
private final Logger logger = LoggerFactory.getLogger(DisruptorQueueCreator.class);
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
// Specify the size of the ring buffer, must be power of 2.
if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) {
throw new IllegalArgumentException("queue size must be power of 2");
}
// Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor<>(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE);
disruptor.setDefaultExceptionHandler(new ExceptionHandler<MessageHolder>() {
@Override public void handleEventException(Throwable ex, long sequence, MessageHolder event) {
logger.error("Handle disruptor error event! message: {}.", event.getMessage(), ex);
}
@Override public void handleOnStartException(Throwable ex) {
logger.error("create disruptor queue failed!", ex);
}
@Override public void handleOnShutdownException(Throwable ex) {
logger.error("shutdown disruptor queue failed!", ex);
}
});
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);
// Connect the handler
disruptor.handleEventsWith(eventHandler);
// Start the Disruptor, starts all threads running
disruptor.start();
return eventHandler;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.EventFactory;
import org.apache.skywalking.apm.collector.queue.base.MessageHolder;
/**
* @author peng-yongsheng
*/
public class MessageHolderFactory implements EventFactory<MessageHolder> {
public static MessageHolderFactory INSTANCE = new MessageHolderFactory();
public MessageHolder newInstance() {
return new MessageHolder();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.queue.disruptor.service;
import org.apache.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.apache.skywalking.apm.collector.queue.base.QueueExecutor;
import org.apache.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator;
import org.apache.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class DisruptorQueueCreatorService implements QueueCreatorService {
private final DisruptorQueueCreator creator;
public DisruptorQueueCreatorService() {
this.creator = new DisruptorQueueCreator();
}
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
return creator.create(queueSize, executor);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.queue.disruptor.QueueModuleDisruptorProvider
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-queue</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-queue-define</module>
<module>collector-queue-datacarrier-provider</module>
<module>collector-queue-disruptor-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -42,7 +42,6 @@
<module>apm-collector-grpc-manager</module>
<module>apm-collector-jetty-manager</module>
<module>apm-collector-remote</module>
<module>apm-collector-queue</module>
<module>apm-collector-instrument</module>
<module>apm-collector-agent-stream</module>
<module>apm-collector-configuration</module>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册