提交 d68c0883 编写于 作者: Y Yanick.xia 提交者: wu-sheng

Fix more resonable error (#3619)

* Fix #3402

* Fix unecessary blankline

* Optimize rebuild logic

* Fix checkstyle fail

* Add concurrency test

* Fix text mistakes

* Optimize refresh logic && new test for return clients never change by refresh logic

* Make unit test more clearness

* Add more unit test && fix a bug

* Rename variable && format by skywalking style
上级 61705958
......@@ -52,27 +52,24 @@ public class RemoteSenderService implements Service {
public void send(String nextWorkName, StreamData streamData, Selector selector) {
RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
RemoteClient remoteClient = null;
List<RemoteClient> clientList = clientManager.getRemoteClient();
if (clientList.size() == 0) {
logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
return;
}
RemoteClient remoteClient;
switch (selector) {
case HashCode:
remoteClient = hashCodeSelector.select(clientList, streamData);
remoteClient.push(nextWorkName, streamData);
break;
case Rolling:
remoteClient = rollingSelector.select(clientList, streamData);
remoteClient.push(nextWorkName, streamData);
break;
case ForeverFirst:
remoteClient = foreverFirstSelector.select(clientList, streamData);
remoteClient.push(nextWorkName, streamData);
break;
}
remoteClient.push(nextWorkName, streamData);
}
}
......@@ -18,9 +18,10 @@
package org.apache.skywalking.oap.server.core.remote.client;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
......@@ -29,6 +30,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
......@@ -53,27 +58,24 @@ public class RemoteClientManager implements Service {
private final ModuleDefineHolder moduleDefineHolder;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
private volatile List<RemoteClient> usingClients;
private GaugeMetrics gauge;
private int remoteTimeout;
/**
* Initial the manager for all remote communication clients.
*
* @param moduleDefineHolder for looking up other modules
* @param remoteTimeout for cluster internal communication, in second unit.
* @param remoteTimeout for cluster internal communication, in second unit.
*/
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
this.moduleDefineHolder = moduleDefineHolder;
this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>();
this.usingClients = clientsA;
this.usingClients = ImmutableList.of();
this.remoteTimeout = remoteTimeout;
}
public void start() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 5, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
}
/**
......@@ -128,7 +130,7 @@ public class RemoteClientManager implements Service {
private void printRemoteClientList() {
if (logger.isDebugEnabled()) {
StringBuilder addresses = new StringBuilder();
getRemoteClient().forEach(client -> addresses.append(client.getAddress().toString()).append(","));
this.usingClients.forEach(client -> addresses.append(client.getAddress().toString()).append(","));
logger.debug("Remote client list: {}", addresses);
}
}
......@@ -157,77 +159,58 @@ public class RemoteClientManager implements Service {
return usingClients;
}
private List<RemoteClient> getFreeClients() {
if (usingClients.equals(clientsA)) {
return clientsB;
} else {
return clientsA;
}
}
private void switchCurrentClients() {
if (usingClients.equals(clientsA)) {
usingClients = clientsB;
} else {
usingClients = clientsA;
}
}
/**
* Compare clients between exist clients and remote instance collection. Move the clients into new client collection
* which are alive to avoid create a new channel. Shutdown the clients which could not find in cluster config.
*
* <p>
* Create a gRPC client for remote instance except for self-instance.
*
* @param remoteInstances Remote instance collection by query cluster config.
*/
private synchronized void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
getFreeClients().clear();
private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
.collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));
Map<Address, RemoteClient> remoteClients = new HashMap<>();
getRemoteClient().forEach(client -> remoteClients.put(client.getAddress(), client));
final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
.collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));
Map<Address, Action> tempRemoteClients = new HashMap<>();
getRemoteClient().forEach(client -> tempRemoteClients.put(client.getAddress(), Action.Close));
final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());
remoteInstances.forEach(remoteInstance -> {
if (tempRemoteClients.containsKey(remoteInstance.getAddress())) {
tempRemoteClients.put(remoteInstance.getAddress(), Action.Leave);
} else {
tempRemoteClients.put(remoteInstance.getAddress(), Action.Create);
}
});
unChangeAddresses.stream()
.filter(remoteClientCollection::containsKey)
.forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress).setAction(Action.Unchanged));
tempRemoteClients.forEach((address, action) -> {
switch (action) {
case Leave:
if (remoteClients.containsKey(address)) {
getFreeClients().add(remoteClients.get(address));
}
// make the latestRemoteClients including the new clients only
unChangeAddresses.forEach(latestRemoteClients::remove);
remoteClientCollection.putAll(latestRemoteClients);
final List<RemoteClient> newRemoteClients = new LinkedList<>();
remoteClientCollection.forEach((address, clientAction) -> {
switch (clientAction.getAction()) {
case Unchanged:
newRemoteClients.add(clientAction.getRemoteClient());
break;
case Create:
if (address.isSelf()) {
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
newRemoteClients.add(client);
} else {
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
client.connect();
getFreeClients().add(client);
newRemoteClients.add(client);
}
break;
}
});
Collections.sort(getFreeClients());
switchCurrentClients();
tempRemoteClients.forEach((address, action) -> {
if (Action.Close.equals(action) && remoteClients.containsKey(address)) {
remoteClients.get(address).close();
}
});
//for stable ordering for rolling selector
Collections.sort(newRemoteClients);
this.usingClients = ImmutableList.copyOf(newRemoteClients);
getFreeClients().clear();
remoteClientCollection.values()
.stream()
.filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
.forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
}
private boolean compare(List<RemoteInstance> remoteInstances) {
......@@ -244,6 +227,15 @@ public class RemoteClientManager implements Service {
}
enum Action {
Close, Leave, Create
Close, Unchanged, Create
}
@Getter
@AllArgsConstructor
static private class RemoteClientAction {
private RemoteClient remoteClient;
@Setter
private Action action;
}
}
......@@ -18,8 +18,7 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
......@@ -30,19 +29,31 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.internal.verification.AtLeast;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
*/
@Slf4j
public class RemoteClientManagerTestCase {
@Test
public void refresh() {
private RemoteClientManager clientManager;
private ClusterNodesQuery clusterNodesQuery;
@Before
public void setup() {
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting clusterModuleDefine = new ModuleDefineTesting();
moduleManager.put(ClusterModule.NAME, clusterModuleDefine);
......@@ -50,7 +61,7 @@ public class RemoteClientManagerTestCase {
ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, coreModuleDefine);
ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
this.clusterNodesQuery = mock(ClusterNodesQuery.class);
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery);
......@@ -80,8 +91,11 @@ public class RemoteClientManagerTestCase {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
RemoteClientManager clientManager = new RemoteClientManager(moduleManager, 10);
this.clientManager = spy(new RemoteClientManager(moduleManager, 10));
}
@Test
public void refresh() {
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh();
......@@ -125,4 +139,99 @@ public class RemoteClientManagerTestCase {
instances.add(new RemoteInstance(new Address("host4", 100, false)));
return instances;
}
@Test
public void testConcurrenceGetRemoteClientAndRefresh() throws Exception {
this.refresh(); //guarantee has any client in clientManager
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
log.debug("begin concurrency test");
});
final ExecutorService executorService = Executors.newFixedThreadPool(3);
final Future<?> refreshFuture = executorService.submit(() -> {
try {
cyclicBarrier.await();
this.refresh();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
executorService.submit(() -> {
try {
int i = 0;
cyclicBarrier.await();
while (!refreshFuture.isDone()) {
Assert.assertFalse(this.clientManager.getRemoteClient().isEmpty());
log.debug("thread {} invoke {} times", Thread.currentThread().getName(), i++);
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
try {
int i = 0;
cyclicBarrier.await();
while (!refreshFuture.isDone()) {
Assert.assertFalse(this.clientManager.getRemoteClient().isEmpty());
log.debug("thread {} invoke {} times", Thread.currentThread().getName(), i++);
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
verify(this.clientManager, new AtLeast(2)).getRemoteClient();
}
@Test
public void testGetRemoteClientAndNeverChange() {
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
this.clientManager.refresh();
final List<RemoteClient> gotGroupOneInstances = this.clientManager.getRemoteClient();
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupTwoInstances());
this.clientManager.refresh();
final List<RemoteClient> gotGroupTwoInstances = this.clientManager.getRemoteClient();
Assert.assertEquals(gotGroupOneInstances.size(), groupOneInstances().size());
Assert.assertEquals(gotGroupTwoInstances.size(), groupTwoInstances().size());
Assert.assertNotEquals(gotGroupOneInstances.size(), gotGroupTwoInstances.size());
}
@Test
public void testCompare() {
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh();
List<RemoteClient> groupOneRemoteClients = clientManager.getRemoteClient();
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh();
List<RemoteClient> newGroupOneRemoteClients = clientManager.getRemoteClient();
Assert.assertArrayEquals(groupOneRemoteClients.toArray(), newGroupOneRemoteClients.toArray());
}
@Test
public void testUnChangeRefresh() {
final List<RemoteInstance> groupOneInstances = groupOneInstances();
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances);
clientManager.refresh();
List<RemoteClient> groupOneRemoteClients = clientManager.getRemoteClient();
groupOneInstances.add(new RemoteInstance(new Address("host4", 100, false)));
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances);
clientManager.refresh();
List<RemoteClient> newGroupOneRemoteClients = clientManager.getRemoteClient();
Assert.assertEquals(groupOneRemoteClients.get(0).getAddress(), newGroupOneRemoteClients.get(0).getAddress());
Assert.assertEquals(newGroupOneRemoteClients.get(3).getAddress().getHost(), "host4");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册