diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index c8e6fe9d7caf800022a467d91971fcdb50974cad..9b37e90b38451ad51ba42e00caeaa30bf82cc0e5 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -50,8 +50,8 @@ import org.redisson.client.protocol.decoder.MapCacheScanResult; import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ObjectMapDecoder; +import org.redisson.codec.BaseEventCodec; import org.redisson.codec.MapCacheEventCodec; -import org.redisson.codec.MapCacheEventCodec.OSType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.eviction.EvictionScheduler; @@ -1882,9 +1882,9 @@ public class RedissonMapCache extends RedissonMap implements RMapCac serverFuture.syncUninterruptibly(); String os = serverFuture.getNow().get("os"); if (os.contains("Windows")) { - osType = OSType.WINDOWS; + osType = BaseEventCodec.OSType.WINDOWS; } else if (os.contains("NONSTOP")) { - osType = OSType.HPNONSTOP; + osType = BaseEventCodec.OSType.HPNONSTOP; } } diff --git a/redisson/src/main/java/org/redisson/codec/BaseEventCodec.java b/redisson/src/main/java/org/redisson/codec/BaseEventCodec.java new file mode 100644 index 0000000000000000000000000000000000000000..ea9c9b8d20e7c6a49010f5b6be29448c12f8d72d --- /dev/null +++ b/redisson/src/main/java/org/redisson/codec/BaseEventCodec.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2013-2020 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.codec; + +import io.netty.buffer.ByteBuf; +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; + +import java.io.IOException; + +/** + * + * @author Nikita Koksharov + * + */ +public abstract class BaseEventCodec implements Codec { + + public enum OSType {WINDOWS, HPNONSTOP} + + protected final Codec codec; + protected final OSType osType; + + public BaseEventCodec(Codec codec, OSType osType) { + this.codec = codec; + this.osType = osType; + } + + protected Object decode(ByteBuf buf, State state, Decoder decoder) throws IOException { + int keyLen; + if (osType == OSType.WINDOWS) { + keyLen = buf.readIntLE(); + } else if (osType == OSType.HPNONSTOP) { + keyLen = (int) buf.readLong(); + } else { + keyLen = (int) buf.readLongLE(); + } + ByteBuf keyBuf = buf.readSlice(keyLen); + Object key = decoder.decode(keyBuf, state); + return key; + } + + @Override + public Decoder getMapValueDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapValueEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getMapKeyDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapKeyEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getValueEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public ClassLoader getClassLoader() { + return getClass().getClassLoader(); + } + + +} diff --git a/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java b/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java index c800fd663c899352a76662c2b6ea903c0b5930e5..a39c048ed6a80f7322cb3e7a9711a58547dccb34 100644 --- a/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java +++ b/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java @@ -15,29 +15,22 @@ */ package org.redisson.codec; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - +import io.netty.buffer.ByteBuf; import org.redisson.client.codec.Codec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.Encoder; -import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * * @author Nikita Koksharov * */ -public class MapCacheEventCodec implements Codec { +public class MapCacheEventCodec extends BaseEventCodec { - public enum OSType {WINDOWS, HPNONSTOP} - - private final Codec codec; - private final OSType osType; - private final Decoder decoder = new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { @@ -59,38 +52,19 @@ public class MapCacheEventCodec implements Codec { }; public MapCacheEventCodec(Codec codec, OSType osType) { - super(); - this.codec = codec; - this.osType = osType; + super(codec, osType); } public MapCacheEventCodec(ClassLoader classLoader, MapCacheEventCodec codec) { + super(newCodec(classLoader, codec), codec.osType); + } + + private static Codec newCodec(ClassLoader classLoader, MapCacheEventCodec codec) { try { - this.codec = codec.codec.getClass().getConstructor(ClassLoader.class, codec.codec.getClass()).newInstance(classLoader, codec.codec); + return codec.codec.getClass().getConstructor(ClassLoader.class, codec.codec.getClass()).newInstance(classLoader, codec.codec); } catch (Exception e) { throw new IllegalStateException(e); } - this.osType = codec.osType; - } - - @Override - public Decoder getMapValueDecoder() { - throw new UnsupportedOperationException(); - } - - @Override - public Encoder getMapValueEncoder() { - throw new UnsupportedOperationException(); - } - - @Override - public Decoder getMapKeyDecoder() { - throw new UnsupportedOperationException(); - } - - @Override - public Encoder getMapKeyEncoder() { - throw new UnsupportedOperationException(); } @Override @@ -98,57 +72,4 @@ public class MapCacheEventCodec implements Codec { return decoder; } - @Override - public Encoder getValueEncoder() { - throw new UnsupportedOperationException(); - } - - private Object decode(ByteBuf buf, State state, Decoder decoder) throws IOException { - int keyLen; - if (osType == OSType.WINDOWS) { - keyLen = buf.readIntLE(); - } else if (osType == OSType.HPNONSTOP) { - keyLen = (int) buf.readLong(); - } else { - keyLen = (int) buf.readLongLE(); - } - ByteBuf keyBuf = buf.readSlice(keyLen); - Object key = decoder.decode(keyBuf, state); - return key; - } - - @Override - public ClassLoader getClassLoader() { - return getClass().getClassLoader(); - } - - @Override - @SuppressWarnings("AvoidInlineConditionals") - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((codec == null) ? 0 : codec.hashCode()); - result = prime * result + ((osType == null) ? 0 : osType.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - MapCacheEventCodec other = (MapCacheEventCodec) obj; - if (codec == null) { - if (other.codec != null) - return false; - } else if (!codec.equals(other.codec)) - return false; - if (osType != other.osType) - return false; - return true; - } - } diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 243228805991c54c71f4d6d3b5f71ae099e22d13..64e3ee322b36e8fe8c7171aee715114f31a1ea25 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -15,66 +15,22 @@ */ package org.redisson.jcache; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; - -import javax.cache.Cache; -import javax.cache.CacheException; -import javax.cache.CacheManager; -import javax.cache.configuration.CacheEntryListenerConfiguration; -import javax.cache.configuration.Configuration; -import javax.cache.configuration.Factory; -import javax.cache.event.CacheEntryCreatedListener; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryEventFilter; -import javax.cache.event.CacheEntryExpiredListener; -import javax.cache.event.CacheEntryListener; -import javax.cache.event.CacheEntryRemovedListener; -import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.event.EventType; -import javax.cache.integration.CacheLoader; -import javax.cache.integration.CacheLoaderException; -import javax.cache.integration.CacheWriter; -import javax.cache.integration.CacheWriterException; -import javax.cache.integration.CompletionListener; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.EntryProcessorResult; - +import io.netty.buffer.ByteBuf; import org.redisson.Redisson; -import org.redisson.iterator.RedissonBaseMapIterator; import org.redisson.RedissonObject; import org.redisson.ScanResult; -import org.redisson.api.CacheAsync; -import org.redisson.api.CacheReactive; -import org.redisson.api.CacheRx; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RSemaphore; -import org.redisson.api.RTopic; +import org.redisson.api.*; import org.redisson.api.listener.MessageListener; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.codec.BaseEventCodec; import org.redisson.connection.decoder.MapGetAllDecoder; +import org.redisson.iterator.RedissonBaseMapIterator; import org.redisson.jcache.JMutableEntry.Action; import org.redisson.jcache.configuration.JCacheConfiguration; import org.redisson.misc.Hash; @@ -85,7 +41,25 @@ import org.redisson.reactive.ReactiveProxyBuilder; import org.redisson.rx.CommandRxService; import org.redisson.rx.RxProxyBuilder; -import io.netty.buffer.ByteBuf; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.CacheManager; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Configuration; +import javax.cache.configuration.Factory; +import javax.cache.event.*; +import javax.cache.integration.*; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; /** * JCache implementation @@ -3036,7 +3010,20 @@ public class JCache extends RedissonObject implements Cache, CacheAs registerCacheEntryListener(cacheEntryListenerConfiguration, true); } + private JCacheEventCodec.OSType osType; + private void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration, boolean addToConfig) { + if (osType == null) { + RFuture> serverFuture = commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER); + serverFuture.syncUninterruptibly(); + String os = serverFuture.getNow().get("os"); + if (os.contains("Windows")) { + osType = BaseEventCodec.OSType.WINDOWS; + } else if (os.contains("NONSTOP")) { + osType = BaseEventCodec.OSType.HPNONSTOP; + } + } + Factory> factory = cacheEntryListenerConfiguration.getCacheEntryListenerFactory(); final CacheEntryListener listener = factory.create(); @@ -3063,7 +3050,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs channelName = getRemovedSyncChannelName(); } - RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync)); + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { @@ -3086,7 +3073,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs channelName = getCreatedSyncChannelName(); } - RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync)); + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { @@ -3109,7 +3096,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs channelName = getUpdatedSyncChannelName(); } - RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync)); + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { @@ -3129,7 +3116,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (CacheEntryExpiredListener.class.isAssignableFrom(listener.getClass())) { String channelName = getExpiredChannelName(); - RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, false)); + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, false)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { diff --git a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java index 792a5812795e7d4da083c8356ed9175a87e2ded5..544fab8d3bfdddcbd4c91c6b7cbd8f72788fe72c 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java +++ b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java @@ -15,50 +15,34 @@ */ package org.redisson.jcache; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - +import io.netty.buffer.ByteBuf; import org.redisson.client.codec.Codec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.Encoder; +import org.redisson.codec.BaseEventCodec; -import io.netty.buffer.ByteBuf; -import io.netty.util.internal.PlatformDependent; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * * @author Nikita Koksharov * */ -public class JCacheEventCodec implements Codec { +public class JCacheEventCodec extends BaseEventCodec { - private final Codec codec; private final boolean sync; private final Decoder decoder = new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { - List result = new ArrayList(); - int keyLen; - if (PlatformDependent.isWindows()) { - keyLen = buf.readIntLE(); - } else { - keyLen = (int) buf.readLongLE(); - } - ByteBuf keyBuf = buf.readSlice(keyLen); - Object key = codec.getMapKeyDecoder().decode(keyBuf, state); + List result = new ArrayList<>(); + + Object key = JCacheEventCodec.this.decode(buf, state, codec.getMapKeyDecoder()); result.add(key); - int valueLen; - if (PlatformDependent.isWindows()) { - valueLen = buf.readIntLE(); - } else { - valueLen = (int) buf.readLongLE(); - } - ByteBuf valueBuf = buf.readSlice(valueLen); - Object value = codec.getMapValueDecoder().decode(valueBuf, state); + Object value = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); result.add(value); if (sync) { @@ -70,45 +54,14 @@ public class JCacheEventCodec implements Codec { } }; - public JCacheEventCodec(Codec codec, boolean sync) { - super(); - this.codec = codec; + public JCacheEventCodec(Codec codec, OSType osType, boolean sync) { + super(codec, osType); this.sync = sync; } - @Override - public Decoder getMapValueDecoder() { - throw new UnsupportedOperationException(); - } - - @Override - public Encoder getMapValueEncoder() { - throw new UnsupportedOperationException(); - } - - @Override - public Decoder getMapKeyDecoder() { - throw new UnsupportedOperationException(); - } - - @Override - public Encoder getMapKeyEncoder() { - throw new UnsupportedOperationException(); - } - @Override public Decoder getValueDecoder() { return decoder; } - @Override - public Encoder getValueEncoder() { - throw new UnsupportedOperationException(); - } - - @Override - public ClassLoader getClassLoader() { - return getClass().getClassLoader(); - } - }