提交 7ab37377 编写于 作者: N Nikita Koksharov

Fixed - exception in org.redisson.jcache.JCacheEventCodec #2588

上级 20023ad0
......@@ -50,8 +50,8 @@ import org.redisson.client.protocol.decoder.MapCacheScanResult;
import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.codec.BaseEventCodec;
import org.redisson.codec.MapCacheEventCodec;
import org.redisson.codec.MapCacheEventCodec.OSType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.eviction.EvictionScheduler;
......@@ -1882,9 +1882,9 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
serverFuture.syncUninterruptibly();
String os = serverFuture.getNow().get("os");
if (os.contains("Windows")) {
osType = OSType.WINDOWS;
osType = BaseEventCodec.OSType.WINDOWS;
} else if (os.contains("NONSTOP")) {
osType = OSType.HPNONSTOP;
osType = BaseEventCodec.OSType.HPNONSTOP;
}
}
......
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import io.netty.buffer.ByteBuf;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
/**
*
* @author Nikita Koksharov
*
*/
public abstract class BaseEventCodec implements Codec {
public enum OSType {WINDOWS, HPNONSTOP}
protected final Codec codec;
protected final OSType osType;
public BaseEventCodec(Codec codec, OSType osType) {
this.codec = codec;
this.osType = osType;
}
protected Object decode(ByteBuf buf, State state, Decoder<?> decoder) throws IOException {
int keyLen;
if (osType == OSType.WINDOWS) {
keyLen = buf.readIntLE();
} else if (osType == OSType.HPNONSTOP) {
keyLen = (int) buf.readLong();
} else {
keyLen = (int) buf.readLongLE();
}
ByteBuf keyBuf = buf.readSlice(keyLen);
Object key = decoder.decode(keyBuf, state);
return key;
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public ClassLoader getClassLoader() {
return getClass().getClassLoader();
}
}
......@@ -15,29 +15,22 @@
*/
package org.redisson.codec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class MapCacheEventCodec implements Codec {
public class MapCacheEventCodec extends BaseEventCodec {
public enum OSType {WINDOWS, HPNONSTOP}
private final Codec codec;
private final OSType osType;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
......@@ -59,38 +52,19 @@ public class MapCacheEventCodec implements Codec {
};
public MapCacheEventCodec(Codec codec, OSType osType) {
super();
this.codec = codec;
this.osType = osType;
super(codec, osType);
}
public MapCacheEventCodec(ClassLoader classLoader, MapCacheEventCodec codec) {
super(newCodec(classLoader, codec), codec.osType);
}
private static Codec newCodec(ClassLoader classLoader, MapCacheEventCodec codec) {
try {
this.codec = codec.codec.getClass().getConstructor(ClassLoader.class, codec.codec.getClass()).newInstance(classLoader, codec.codec);
return codec.codec.getClass().getConstructor(ClassLoader.class, codec.codec.getClass()).newInstance(classLoader, codec.codec);
} catch (Exception e) {
throw new IllegalStateException(e);
}
this.osType = codec.osType;
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
@Override
......@@ -98,57 +72,4 @@ public class MapCacheEventCodec implements Codec {
return decoder;
}
@Override
public Encoder getValueEncoder() {
throw new UnsupportedOperationException();
}
private Object decode(ByteBuf buf, State state, Decoder<?> decoder) throws IOException {
int keyLen;
if (osType == OSType.WINDOWS) {
keyLen = buf.readIntLE();
} else if (osType == OSType.HPNONSTOP) {
keyLen = (int) buf.readLong();
} else {
keyLen = (int) buf.readLongLE();
}
ByteBuf keyBuf = buf.readSlice(keyLen);
Object key = decoder.decode(keyBuf, state);
return key;
}
@Override
public ClassLoader getClassLoader() {
return getClass().getClassLoader();
}
@Override
@SuppressWarnings("AvoidInlineConditionals")
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((codec == null) ? 0 : codec.hashCode());
result = prime * result + ((osType == null) ? 0 : osType.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MapCacheEventCodec other = (MapCacheEventCodec) obj;
if (codec == null) {
if (other.codec != null)
return false;
} else if (!codec.equals(other.codec))
return false;
if (osType != other.osType)
return false;
return true;
}
}
......@@ -15,66 +15,22 @@
*/
package org.redisson.jcache;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriter;
import javax.cache.integration.CacheWriterException;
import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import io.netty.buffer.ByteBuf;
import org.redisson.Redisson;
import org.redisson.iterator.RedissonBaseMapIterator;
import org.redisson.RedissonObject;
import org.redisson.ScanResult;
import org.redisson.api.CacheAsync;
import org.redisson.api.CacheReactive;
import org.redisson.api.CacheRx;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.codec.BaseEventCodec;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.RedissonBaseMapIterator;
import org.redisson.jcache.JMutableEntry.Action;
import org.redisson.jcache.configuration.JCacheConfiguration;
import org.redisson.misc.Hash;
......@@ -85,7 +41,25 @@ import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RxProxyBuilder;
import io.netty.buffer.ByteBuf;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.configuration.Factory;
import javax.cache.event.*;
import javax.cache.integration.*;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/**
* JCache implementation
......@@ -3036,7 +3010,20 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
registerCacheEntryListener(cacheEntryListenerConfiguration, true);
}
private JCacheEventCodec.OSType osType;
private void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration, boolean addToConfig) {
if (osType == null) {
RFuture<Map<String, String>> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
serverFuture.syncUninterruptibly();
String os = serverFuture.getNow().get("os");
if (os.contains("Windows")) {
osType = BaseEventCodec.OSType.WINDOWS;
} else if (os.contains("NONSTOP")) {
osType = BaseEventCodec.OSType.HPNONSTOP;
}
}
Factory<CacheEntryListener<? super K, ? super V>> factory = cacheEntryListenerConfiguration.getCacheEntryListenerFactory();
final CacheEntryListener<? super K, ? super V> listener = factory.create();
......@@ -3063,7 +3050,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
channelName = getRemovedSyncChannelName();
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync));
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
......@@ -3086,7 +3073,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
channelName = getCreatedSyncChannelName();
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync));
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
......@@ -3109,7 +3096,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
channelName = getUpdatedSyncChannelName();
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync));
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
......@@ -3129,7 +3116,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
if (CacheEntryExpiredListener.class.isAssignableFrom(listener.getClass())) {
String channelName = getExpiredChannelName();
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, false));
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, false));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
......
......@@ -15,50 +15,34 @@
*/
package org.redisson.jcache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.codec.BaseEventCodec;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class JCacheEventCodec implements Codec {
public class JCacheEventCodec extends BaseEventCodec {
private final Codec codec;
private final boolean sync;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
List<Object> result = new ArrayList<Object>();
int keyLen;
if (PlatformDependent.isWindows()) {
keyLen = buf.readIntLE();
} else {
keyLen = (int) buf.readLongLE();
}
ByteBuf keyBuf = buf.readSlice(keyLen);
Object key = codec.getMapKeyDecoder().decode(keyBuf, state);
List<Object> result = new ArrayList<>();
Object key = JCacheEventCodec.this.decode(buf, state, codec.getMapKeyDecoder());
result.add(key);
int valueLen;
if (PlatformDependent.isWindows()) {
valueLen = buf.readIntLE();
} else {
valueLen = (int) buf.readLongLE();
}
ByteBuf valueBuf = buf.readSlice(valueLen);
Object value = codec.getMapValueDecoder().decode(valueBuf, state);
Object value = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(value);
if (sync) {
......@@ -70,45 +54,14 @@ public class JCacheEventCodec implements Codec {
}
};
public JCacheEventCodec(Codec codec, boolean sync) {
super();
this.codec = codec;
public JCacheEventCodec(Codec codec, OSType osType, boolean sync) {
super(codec, osType);
this.sync = sync;
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public ClassLoader getClassLoader() {
return getClass().getClassLoader();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册