提交 3ed9d5e9 编写于 作者: T terrymanu

remove EtcdWatcher, because only can trigger single listener right now

上级 dfb14db3
......@@ -15,13 +15,11 @@ import etcdserverpb.Rpc.WatchRequest;
import etcdserverpb.WatchGrpc;
import etcdserverpb.WatchGrpc.WatchStub;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import io.shardingjdbc.orchestration.reg.base.CoordinatorRegistryCenter;
import io.shardingjdbc.orchestration.reg.base.EventListener;
import io.shardingjdbc.orchestration.reg.etcd.internal.channel.EtcdChannelFactory;
import io.shardingjdbc.orchestration.reg.etcd.internal.retry.EtcdRetryEngine;
import io.shardingjdbc.orchestration.reg.etcd.internal.watcher.EtcdWatchStreamObserver;
import io.shardingjdbc.orchestration.reg.etcd.internal.watcher.EtcdWatcher;
import io.shardingjdbc.orchestration.reg.exception.RegException;
import mvccpb.Kv.KeyValue;
......@@ -157,20 +155,15 @@ public final class EtcdRegistryCenter implements CoordinatorRegistryCenter {
public void watch(final String key, final EventListener eventListener) {
String fullPath = getFullPathWithNamespace(key);
WatchCreateRequest createWatchRequest = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8(fullPath)).setRangeEnd(getRangeEnd(fullPath)).build();
final WatchRequest watchRequest = WatchRequest.newBuilder().setCreateRequest(createWatchRequest).build();
Optional<EtcdWatcher> watcher = etcdRetryEngine.execute(new Callable<EtcdWatcher>() {
final WatchRequest request = WatchRequest.newBuilder().setCreateRequest(createWatchRequest).build();
etcdRetryEngine.execute(new Callable<Void>() {
@Override
public EtcdWatcher call() throws Exception {
EtcdWatcher etcdWatcher = new EtcdWatcher();
StreamObserver<WatchRequest> requestStream = watchStub.watch(new EtcdWatchStreamObserver(etcdWatcher));
requestStream.onNext(watchRequest);
return etcdWatcher;
public Void call() throws Exception {
watchStub.watch(new EtcdWatchStreamObserver(eventListener)).onNext(request);
return null;
}
});
if (watcher.isPresent()) {
watcher.get().addEventListener(eventListener);
}
}
private String getFullPathWithNamespace(final String path) {
......
......@@ -4,9 +4,10 @@ import etcdserverpb.Rpc;
import etcdserverpb.Rpc.WatchResponse;
import io.grpc.stub.StreamObserver;
import io.shardingjdbc.orchestration.reg.base.DataChangedEvent;
import io.shardingjdbc.orchestration.reg.base.EventListener;
import io.shardingjdbc.orchestration.reg.exception.RegException;
import lombok.RequiredArgsConstructor;
import mvccpb.Kv;
import mvccpb.Kv.Event;
/**
* Watch stream observer.
......@@ -16,19 +17,19 @@ import mvccpb.Kv;
@RequiredArgsConstructor
public final class EtcdWatchStreamObserver implements StreamObserver<WatchResponse> {
private final EtcdWatcher etcdWatcher;
private final EventListener eventListener;
@Override
public void onNext(final Rpc.WatchResponse response) {
if (response.getCanceled() || response.getCreated()) {
return;
}
for (Kv.Event event : response.getEventsList()) {
etcdWatcher.notify(new DataChangedEvent(getEventType(event), event.getKv().getKey().toStringUtf8(), event.getKv().getValue().toStringUtf8()));
for (Event event : response.getEventsList()) {
eventListener.onChange(new DataChangedEvent(getEventType(event), event.getKv().getKey().toStringUtf8(), event.getKv().getValue().toStringUtf8()));
}
}
private DataChangedEvent.Type getEventType(final Kv.Event event) {
private DataChangedEvent.Type getEventType(final Event event) {
switch (event.getType()) {
case PUT:
return DataChangedEvent.Type.UPDATED;
......
package io.shardingjdbc.orchestration.reg.etcd.internal.watcher;
import io.shardingjdbc.orchestration.reg.base.DataChangedEvent;
import io.shardingjdbc.orchestration.reg.base.EventListener;
import java.util.ArrayList;
import java.util.List;
/**
* Etcd event watcher.
*
* @author junxiong
*/
public final class EtcdWatcher {
private final List<EventListener> listeners = new ArrayList<>();
/**
* Add watcher listener.
*
* @param eventListener WatcherListener
*/
public void addEventListener(final EventListener eventListener) {
listeners.add(eventListener);
}
/**
* Notify listener when event received.
*
* @param event event
*/
public void notify(final DataChangedEvent event) {
for (EventListener listener : listeners) {
listener.onChange(event);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册