提交 8b0a2e84 编写于 作者: R Rajan 提交者: rdhabalia

Avoid using zk-client thread to process zk-data result (#361)

* Avoid using zk-client thread to process zk-data result

* add ThreadPool-name and stop executors on shutdown

* shutdown zkCache executor if zkCache has initialized it
上级 e0d2ecf0
......@@ -22,7 +22,6 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
......@@ -61,18 +60,24 @@ public class MockZooKeeper extends ZooKeeper {
private KeeperException.Code failReturnCode;
private Watcher sessionWatcher;
private long sessionId = 0L;
private int readOpDelayMs;
public static MockZooKeeper newInstance() {
return newInstance(null);
}
public static MockZooKeeper newInstance(ExecutorService executor) {
return newInstance(executor, -1);
}
public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) {
try {
ReflectionFactory rf = ReflectionFactory.getReflectionFactory();
Constructor objDef = Object.class.getDeclaredConstructor(new Class[0]);
Constructor intConstr = rf.newConstructorForSerialization(MockZooKeeper.class, objDef);
MockZooKeeper zk = MockZooKeeper.class.cast(intConstr.newInstance());
zk.init(executor);
zk.readOpDelayMs = readOpDelayMs;
return zk;
} catch (RuntimeException e) {
throw e;
......@@ -192,7 +197,6 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public synchronized byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
checkProgrammedFail();
Pair<String, Integer> value = tree.get(path);
if (value == null) {
throw new KeeperException.NoNodeException(path);
......@@ -210,6 +214,7 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
if (getProgrammedFailStatus()) {
cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
return;
......@@ -236,6 +241,7 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
synchronized (MockZooKeeper.this) {
if (getProgrammedFailStatus()) {
cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
......@@ -719,5 +725,15 @@ public class MockZooKeeper extends ZooKeeper {
return "MockZookeeper";
}
private void checkReadOpDelay() {
if (readOpDelayMs > 0) {
try {
Thread.sleep(readOpDelayMs);
} catch (InterruptedException e) {
// Ok
}
}
}
private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class);
}
......@@ -25,4 +25,5 @@ import org.apache.zookeeper.ZooKeeper;
*/
public interface BookKeeperClientFactory {
BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException;
void close();
}
......@@ -31,6 +31,9 @@ import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private ZooKeeperCache rackawarePolicyZkCache;
private ZooKeeperCache clientIsolationZkCache;
@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
ClientConfiguration bkConf = new ClientConfiguration();
......@@ -61,8 +64,9 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(zkClient) {
});
this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {
};
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache);
}
if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
......@@ -70,8 +74,9 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(zkClient) {
});
this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
};
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache);
}
}
......@@ -81,4 +86,13 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
throw new IOException(e);
}
}
public void close() {
if (this.rackawarePolicyZkCache != null) {
this.rackawarePolicyZkCache.stop();
}
if (this.clientIsolationZkCache != null) {
this.clientIsolationZkCache.stop();
}
}
}
......@@ -89,6 +89,7 @@ public class PulsarService implements AutoCloseable {
private WebSocketService webSocketService = null;
private ConfigurationCacheService configurationCacheService = null;
private LocalZooKeeperCacheService localZkCacheService = null;
private BookKeeperClientFactory bkClientFactory;
private ZooKeeperCache localZkCache;
private GlobalZooKeeperCache globalZkCache;
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
......@@ -163,6 +164,11 @@ public class PulsarService implements AutoCloseable {
this.managedLedgerClientFactory = null;
}
if (bkClientFactory != null) {
this.bkClientFactory.close();
this.bkClientFactory = null;
}
if (this.leaderElectionService != null) {
this.leaderElectionService.stop();
this.leaderElectionService = null;
......@@ -235,8 +241,8 @@ public class PulsarService implements AutoCloseable {
// Initialize and start service to access configuration repository.
this.startZkCacheService();
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(),
getBookKeeperClientFactory());
this.bkClientFactory = getBookKeeperClientFactory();
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
this.brokerService = new BrokerService(this);
......@@ -375,7 +381,7 @@ public class PulsarService implements AutoCloseable {
LOG.info("starting configuration cache service");
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.executor);
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
getOrderedExecutor(), this.executor);
......
......@@ -216,5 +216,10 @@ public abstract class MockedPulsarServiceBaseTest {
// Always return the same instance (so that we don't loose the mock BK content on broker restart
return mockBookKeeper;
}
@Override
public void close() {
// no-op
}
};
}
......@@ -46,7 +46,7 @@ public class ResourceQuotaCacheTest {
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
......
......@@ -68,7 +68,7 @@ public class OwnershipCacheTest {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
......
......@@ -65,7 +65,8 @@ public class ZookeeperCacheLoader implements Closeable {
log.error("Shutting down ZK sessions: {}", exitCode);
});
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor,
null/* cache uses ForkJoinPool if provided scheduler is null to load data-async */);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
......
......@@ -50,7 +50,7 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
String globalZkConnect, OrderedSafeExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor) {
super(null, orderedExecutor);
super(null, orderedExecutor, scheduledExecutor);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
......
......@@ -15,6 +15,8 @@
*/
package com.yahoo.pulsar.zookeeper;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
......@@ -32,8 +34,9 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class);
public LocalZooKeeperCache(final ZooKeeper zk, final OrderedSafeExecutor executor) {
super(zk, executor);
public LocalZooKeeperCache(final ZooKeeper zk, final OrderedSafeExecutor executor,
ScheduledExecutorService scheduledExecutor) {
super(zk, executor, scheduledExecutor);
}
@Override
......
......@@ -17,14 +17,17 @@ package com.yahoo.pulsar.zookeeper;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
......@@ -46,6 +49,8 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Per ZK client ZooKeeper cache supporting ZNode data and children list caches. A cache entry is identified, accessed
* and invalidated by the ZNode path. For the data cache, ZNode data parsing is done at request time with the given
......@@ -73,13 +78,16 @@ public abstract class ZooKeeperCache implements Watcher {
protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final Cache<String, Set<String>> childrenCache;
protected final Cache<String, Boolean> existsCache;
protected final OrderedSafeExecutor executor;
private final OrderedSafeExecutor executor;
private final ScheduledExecutorService scheduledExecutor;
private boolean shouldShutdownExecutor = false;
public static final int cacheTimeOutInSec = 30;
protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor) {
public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, ScheduledExecutorService scheduledExecutor) {
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.zkSession.set(zkSession);
this.dataCache = Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
......@@ -90,7 +98,9 @@ public abstract class ZooKeeperCache implements Watcher {
}
public ZooKeeperCache(ZooKeeper zkSession) {
this(zkSession, new OrderedSafeExecutor(1, "zk-cache-executor"));
this(zkSession, new OrderedSafeExecutor(1, "zk-cache-executor"),
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("zk-cache-callback-executor")));
this.shouldShutdownExecutor = true;
}
public ZooKeeper getZooKeeper() {
......@@ -266,18 +276,20 @@ public abstract class ZooKeeperCache implements Watcher {
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();
this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
Executor exec = scheduledExecutor != null ? scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
zkFuture.complete(new AbstractMap.SimpleImmutableEntry<Object, Stat>(obj, stat));
// avoid using the zk-client thread to process the result
exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
zkFuture.completeExceptionally(e);
exec.execute(() -> zkFuture.completeExceptionally(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is not "exceptional" condition
zkFuture.complete(null);
exec.execute(() -> zkFuture.complete(null));
} else {
zkFuture.completeExceptionally(KeeperException.create(rc));
exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc)));
}
}, null);
......@@ -364,4 +376,11 @@ public abstract class ZooKeeperCache implements Watcher {
}
}
}
public void stop() {
if (shouldShutdownExecutor) {
this.executor.shutdown();
this.scheduledExecutor.shutdown();
}
}
}
......@@ -77,7 +77,7 @@ public class ZkBookieRackAffinityMappingTest {
// Case1: ZKCache is given
ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf1 = new ClientConfiguration();
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
......@@ -104,7 +104,7 @@ public class ZkBookieRackAffinityMappingTest {
public void testNoBookieInfo() throws Exception {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
......@@ -158,7 +158,7 @@ public class ZkBookieRackAffinityMappingTest {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
......
......@@ -106,7 +106,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
......@@ -176,7 +176,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
public void testNoBookieInfo() throws Exception {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
......@@ -296,7 +296,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
isolationPolicy.initialize(bkClientConf);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
......
......@@ -23,9 +23,14 @@ import static org.testng.AssertJUnit.assertNull;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
......@@ -44,6 +49,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
@Test
public class ZookeeperCacheTest {
private MockZooKeeper zkClient;
......@@ -60,7 +67,7 @@ public class ZookeeperCacheTest {
@Test
void testSimpleCache() throws Exception {
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executors in unit test */);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null, null /* no executors in unit test */);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
......@@ -101,7 +108,7 @@ public class ZookeeperCacheTest {
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
zkClient.create("/test", new byte[0], null, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, null);
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
......@@ -154,7 +161,7 @@ public class ZookeeperCacheTest {
// Check existence after creation of the node
zkClient.create("/test", new byte[0], null, null);
Thread.sleep(20);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */, null);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
......@@ -171,7 +178,7 @@ public class ZookeeperCacheTest {
zkClient.create("/test/c1", new byte[0], null, null);
zkClient.create("/test/c2", new byte[0], null, null);
Thread.sleep(20);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */, null);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
......@@ -301,4 +308,46 @@ public class ZookeeperCacheTest {
// Update shouldn't happen after the last check
assertEquals(notificationCount.get(), 1);
}
/**
* Verifies that blocking call on zkCache-callback will not introduce deadlock because zkCache completes
* future-result with different thread than zookeeper-client thread.
*
* @throws Exception
*/
@Test(timeOut = 2000)
void testZkCallbackThreadStuck() throws Exception {
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executors in unit test */, null);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
return new String(content);
}
};
String value = "test";
String key = "/" + UUID.randomUUID().toString().substring(0, 8);
String key1 = "/" + UUID.randomUUID().toString().substring(0, 8);
String key2 = "/" + UUID.randomUUID().toString().substring(0, 8);
zkClient.create(key, value.getBytes(), null, null);
zkClient.create(key1, value.getBytes(), null, null);
zkClient.create(key2, value.getBytes(), null, null);
CountDownLatch latch = new CountDownLatch(1);
zkCache.getAsync(key).thenAccept(val -> {
try {
zkCache.get(key1);
} catch (Exception e) {
fail("failed to get " + key2, e);
}
latch.countDown();
});
latch.await();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册