未验证 提交 a6afcac8 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Logically revert #6642 and partial #7153 to reduce unnecessary thread and...

Logically revert #6642 and partial #7153 to reduce unnecessary thread and concurrency process (#7318)

 The key logic behinds all these is, metrics persistence is fully asynchronous.

* The core/maxSyncOperationNum setting(added in 8.5.0) is removed due to metrics persistence is fully asynchronous.
* The core/syncThreads setting(added in 8.5.0) is removed due to metrics persistence is fully asynchronous.
* Optimization: Concurrency mode of execution stage for metrics is removed(added in 8.5.0). The only concurrency of prepare stage is meaningful and kept.
* Remove the outside preparedRequest list initialization, worker instance could always build a suitable size list in the first place (Reduce Array.copy and GC load a little).
上级 9f3ff517
......@@ -99,6 +99,10 @@ Release Notes.
in low traffic(traffic < bulkActions in the whole period), there is a possible case, 2 period bulks are included in
one index refresh rebuild operation, which could cause version conflicts. And this case can't be fixed
through `core/persistentPeriod` as the bulk fresh is not controlled by the persistent timer anymore.
* The `core/maxSyncOperationNum` setting(added in 8.5.0) is removed due to metrics persistence is fully asynchronous.
* The `core/syncThreads` setting(added in 8.5.0) is removed due to metrics persistence is fully asynchronous.
* Optimization: Concurrency mode of execution stage for metrics is removed(added in 8.5.0). Only concurrency of prepare
stage is meaningful and kept.
#### UI
......
......@@ -44,9 +44,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | maxSizeOfNetworkAddressAlias|Max size of network address detected in the be monitored system.| - | 1_000_000|
| - | - | maxPageSizeOfQueryProfileSnapshot|The max size in every OAP query for snapshot analysis| - | 500 |
| - | - | maxSizeOfAnalyzeProfileSnapshot|The max number of snapshots analyzed by OAP| - | 12000 |
| - | - | syncThreads|The number of threads used to synchronously refresh the metrics data to the storage.| SW_CORE_SYNC_THREADS | 2 |
| - | - | prepareThreads|The number of threads used to prepare metrics data to the storage.| SW_CORE_PREPARE_THREADS | 2 |
| - | - | maxSyncOperationNum|The maximum number of processes supported for each synchronous storage operation. When the number of the flush data is greater than this value, it will be assigned to multiple cores for execution.| SW_CORE_MAX_SYNC_OPERATION_NUM | 50000 |
| - | - | enableEndpointNameGroupingByOpenapi |Turn it on then automatically grouping endpoint by the given OpenAPI definitions.| SW_CORE_ENABLE_ENDPOINT_NAME_GROUPING_BY_OPAENAPI | true |
|cluster|standalone| - | standalone is not suitable for one node running, no available configuration.| - | - |
| - | zookeeper|nameSpace|The namespace, represented by root path, isolates the configurations in the zookeeper.|SW_NAMESPACE| `/`, root path|
......
......@@ -113,12 +113,8 @@ core:
searchableLogsTags: ${SW_SEARCHABLE_LOGS_TAG_KEYS:level}
# Define the set of alarm tag keys, which should be searchable through the GraphQL.
searchableAlarmTags: ${SW_SEARCHABLE_ALARM_TAG_KEYS:level}
# The number of threads used to synchronously refresh the metrics data to the storage.
syncThreads: ${SW_CORE_SYNC_THREADS:2}
# The number of threads used to prepare metrics data to the storage.
prepareThreads: ${SW_CORE_PREPARE_THREADS:2}
# The maximum number of processes supported for each synchronous storage operation. When the number of the flush data is greater than this value, it will be assigned to multiple cores for execution.
maxSyncOperationNum: ${SW_CORE_MAX_SYNC_OPERATION_NUM:50000}
# Turn it on then automatically grouping endpoint by the given OpenAPI definitions.
enableEndpointNameGroupingByOpenapi: ${SW_CORE_ENABLE_ENDPOINT_NAME_GROUPING_BY_OPAENAPI:true}
storage:
......
......@@ -152,14 +152,6 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter
@Getter
private String searchableAlarmTags = "";
/**
* The number of threads used to synchronously refresh the metrics data to the storage.
*
* @since 8.5.0
*/
@Setter
@Getter
private int syncThreads = 2;
/**
* The number of threads used to prepare metrics data to the storage.
......@@ -170,14 +162,6 @@ public class CoreModuleConfig extends ModuleConfig {
@Getter
private int prepareThreads = 2;
/**
* The maximum number of processes supported for each synchronous storage operation. When the number of the flush
* data is greater than this value, it will be assigned to multiple cores for execution.
*/
@Getter
@Setter
private int maxSyncOperationNum = 50000;
@Getter
@Setter
private boolean enableEndpointNameGroupingByOpenapi = true;
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
......@@ -148,22 +149,23 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
}
@Override
public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
public List<PrepareRequest> prepareBatch(Collection<Metrics> lastCollection) {
if (persistentCounter++ % persistentMod != 0) {
return;
return Collections.EMPTY_LIST;
}
long start = System.currentTimeMillis();
if (lastCollection.size() == 0) {
return;
return Collections.EMPTY_LIST;
}
/*
* Hard coded the max size. This is only the batch size of one metrics, too large number is meaningless.
* Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache.
*/
int maxBatchGetSize = 2000;
final int batchSize = Math.min(maxBatchGetSize, lastCollection.size());
List<Metrics> metricsList = new ArrayList<>();
List<PrepareRequest> prepareRequests = new ArrayList<>(lastCollection.size());
for (Metrics data : lastCollection) {
transWorker.ifPresent(metricsTransWorker -> metricsTransWorker.in(data));
......@@ -184,6 +186,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
System.currentTimeMillis() - start, prepareRequests.size()
);
}
return prepareRequests;
}
private void flushDataToStorage(List<Metrics> metricsList,
......
......@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* MetricsTransWorker is transferring the metrics for downsampling. All streaming process metrics are in the minute
* MetricsTransWorker is transferring the metrics for down sampling. All streaming process metrics are in the minute
* precision, but at the storage layer, in order to enhance the query performance, metrics could be saved in minute,
* hour, day and month, including some of them through CoreModuleConfig#downsampling.
*/
......
......@@ -63,12 +63,11 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr
* implementations.
*
* @param lastCollection the source of transformation, they are in memory object format.
* @param prepareRequests data in the formats for the final persistence operations.
*/
public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
public abstract List<PrepareRequest> prepareBatch(Collection<INPUT> lastCollection);
public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
public List<PrepareRequest> buildBatchRequests() {
final List<INPUT> dataList = getCache().read();
prepareBatch(dataList, prepareRequests);
return prepareBatch(dataList);
}
}
......@@ -18,7 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
......@@ -61,18 +63,19 @@ public class TopNWorker extends PersistenceWorker<TopN> {
* Force overriding the parent buildBatchRequests. Use its own report period.
*/
@Override
public void buildBatchRequests(final List<PrepareRequest> prepareRequests) {
public List<PrepareRequest> buildBatchRequests() {
long now = System.currentTimeMillis();
if (now - lastReportTimestamp <= reportPeriod) {
// Only do report in its own report period.
return;
return Collections.EMPTY_LIST;
}
lastReportTimestamp = now;
super.buildBatchRequests(prepareRequests);
return super.buildBatchRequests();
}
@Override
public void prepareBatch(Collection<TopN> lastCollection, List<PrepareRequest> prepareRequests) {
public List<PrepareRequest> prepareBatch(Collection<TopN> lastCollection) {
List<PrepareRequest> prepareRequests = new ArrayList<>(lastCollection.size());
lastCollection.forEach(record -> {
try {
prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
......@@ -80,6 +83,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
log.error(t.getMessage(), t);
}
});
return prepareRequests;
}
/**
......
......@@ -23,8 +23,8 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
/**
* IBatchDAO provides two modes of data persistence supported by most databases, including pure insert and
* batch hybrid insert/update.
* IBatchDAO provides two modes of data persistence supported by most databases, including pure insert and batch hybrid
* insert/update.
*/
public interface IBatchDAO extends DAO {
/**
......@@ -39,6 +39,8 @@ public interface IBatchDAO extends DAO {
* Push data collection into the database in async mode. This method is driven by streaming process. This method
* doesn't request the data queryable immediately after the method finished.
*
* The method requires thread safe. The OAP core would call this concurrently.
*
* @param prepareRequests data to insert or update. No delete happens in streaming mode.
*/
void flush(List<PrepareRequest> prepareRequests);
......
......@@ -20,15 +20,11 @@ package org.apache.skywalking.oap.server.core.storage;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
......@@ -49,18 +45,13 @@ public enum PersistenceTimer {
INSTANCE;
@VisibleForTesting
boolean isStarted = false;
private final Boolean debug;
private CounterMetrics errorCounter;
private HistogramMetrics prepareLatency;
private HistogramMetrics executeLatency;
private HistogramMetrics allLatency;
private int syncOperationThreadsNum;
private int maxSyncOperationNum;
private ExecutorService executorService;
private ExecutorService prepareExecutorService;
PersistenceTimer() {
this.debug = System.getProperty("debug") != null;
}
public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
......@@ -87,9 +78,6 @@ public enum PersistenceTimer {
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
syncOperationThreadsNum = moduleConfig.getSyncThreads();
maxSyncOperationNum = moduleConfig.getMaxSyncOperationNum();
executorService = Executors.newFixedThreadPool(syncOperationThreadsNum);
prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads());
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
......@@ -104,167 +92,59 @@ public enum PersistenceTimer {
}
private void extractDataAndSave(IBatchDAO batchDAO) {
if (log.isDebugEnabled()) {
log.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
HistogramMetrics.Timer allTimer = allLatency.createTimer();
// Use `stop` as a control signal to make fail-fast in the persistence process.
AtomicBoolean stop = new AtomicBoolean(false);
DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new DefaultBlockingBatchQueue(
this.maxSyncOperationNum);
try {
try (HistogramMetrics.Timer allTimer = allLatency.createTimer()) {
List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
// CountDownLatch makes sure all prepare threads done eventually.
CountDownLatch prepareStageCountDownLatch = new CountDownLatch(persistenceWorkers.size());
CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
persistenceWorkers.forEach(worker -> {
prepareExecutorService.submit(() -> {
if (stop.get()) {
prepareStageCountDownLatch.countDown();
return;
}
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List<PrepareRequest> innerPrepareRequests = null;
try {
if (log.isDebugEnabled()) {
log.debug("extract {} worker data and save", worker.getClass().getName());
}
List<PrepareRequest> innerPrepareRequests = new ArrayList<>(5000);
worker.buildBatchRequests(innerPrepareRequests);
// Push the prepared requests into DefaultBlockingBatchQueue,
// the executorService consumes from it when it reaches the size of batch.
prepareQueue.offer(innerPrepareRequests);
worker.endOfRound();
} finally {
timer.finish();
prepareStageCountDownLatch.countDown();
}
});
});
// Prepare stage
try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
if (log.isDebugEnabled()) {
log.debug("extract {} worker data and save", worker.getClass().getName());
}
innerPrepareRequests = worker.buildBatchRequests();
List<Future<?>> batchFutures = new ArrayList<>();
for (int i = 0; i < syncOperationThreadsNum; i++) {
Future<?> batchFuture = executorService.submit(() -> {
// consume the metrics
while (!stop.get()) {
List<PrepareRequest> partition = prepareQueue.poll();
if (partition.isEmpty()) {
break;
worker.endOfRound();
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
if (CollectionUtils.isNotEmpty(partition)) {
batchDAO.flush(partition);
// Execution stage
try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
batchDAO.flush(innerPrepareRequests);
}
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executeLatencyTimer.finish();
}
} finally {
countDownLatch.countDown();
}
return null;
});
batchFutures.add(batchFuture);
}
// Wait for prepare stage is done.
prepareStageCountDownLatch.await();
prepareQueue.noFurtherAppending();
// Wait for batch stage is done.
for (Future<?> result : batchFutures) {
result.get();
}
});
countDownLatch.await();
} catch (Throwable e) {
errorCounter.inc();
log.error(e.getMessage(), e);
} finally {
if (log.isDebugEnabled()) {
log.debug("Persistence data save finish");
}
stop.set(true);
allTimer.finish();
}
if (debug) {
log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
static class DefaultBlockingBatchQueue<E> implements BlockingBatchQueue<E> {
@Getter
private final int maxBatchSize;
private final List<E> elementData;
@Getter
private boolean inAppendingMode = true;
public DefaultBlockingBatchQueue(final int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
// Use the maxBatchSize * 3 as the initial queue size to avoid ArrayList#grow
this.elementData = new ArrayList<>(maxBatchSize * 3);
}
@Override
public void offer(List<E> elements) {
synchronized (elementData) {
if (!inAppendingMode) {
throw new IllegalStateException();
}
elementData.addAll(elements);
if (elementData.size() >= maxBatchSize) {
elementData.notifyAll();
}
}
}
@Override
public List<E> poll() throws InterruptedException {
synchronized (elementData) {
while (this.elementData.size() < maxBatchSize && inAppendingMode) {
elementData.wait(1000);
}
if (CollectionUtils.isEmpty(elementData)) {
return Collections.EMPTY_LIST;
}
List<E> sublist = this.elementData.subList(
0, Math.min(maxBatchSize, this.elementData.size()));
List<E> partition = new ArrayList<>(sublist);
sublist.clear();
return partition;
}
}
@Override
public void noFurtherAppending() {
synchronized (elementData) {
inAppendingMode = false;
elementData.notifyAll();
}
}
@Override
public void furtherAppending() {
synchronized (elementData) {
inAppendingMode = true;
elementData.notifyAll();
}
}
@Override
public int size() {
synchronized (elementData) {
return elementData.size();
}
}
log.debug("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@Fork(2)
public class BlockingBatchQueueBenchmark {
@State(Scope.Benchmark)
public static class MyState {
int count = 10_000_000;
PersistenceTimer.DefaultBlockingBatchQueue blockingBatchQueueWithSynchronized = new PersistenceTimer.DefaultBlockingBatchQueue(
50000);
BlockingBatchQueueWithLinkedBlockingQueue blockingBatchQueueWithLinkedBlockingQueue = new BlockingBatchQueueWithLinkedBlockingQueue(
50000);
BlockingBatchQueueWithReentrantLock blockingBatchQueueWithReentrantLock = new BlockingBatchQueueWithReentrantLock(
50000);
List<Integer> willAdd = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
int producerCount = 10;
int consumerCount = 2;
int producerLength = count / producerCount / 1000;
ExecutorService producer;
ExecutorService consumer;
@Setup(Level.Invocation)
public void before() {
producer = Executors.newFixedThreadPool(producerCount);
consumer = Executors.newFixedThreadPool(consumerCount);
}
@TearDown(Level.Invocation)
public void after() {
producer.shutdown();
consumer.shutdown();
}
}
@Benchmark
public void testSynchronized(MyState myState) throws InterruptedException, ExecutionException {
testProductAndConsume(myState, myState.blockingBatchQueueWithSynchronized);
}
@Benchmark
public void testReentrantLock(MyState myState) throws InterruptedException, ExecutionException {
testProductAndConsume(myState, myState.blockingBatchQueueWithReentrantLock);
}
@Benchmark
public void testLinkedBlockingQueue(MyState myState) throws InterruptedException, ExecutionException {
testProductAndConsume(myState, myState.blockingBatchQueueWithLinkedBlockingQueue);
}
private void testProductAndConsume(final MyState myState,
BlockingBatchQueue queue) throws InterruptedException, ExecutionException {
queue.furtherAppending();
CountDownLatch latch = new CountDownLatch(myState.producerCount);
for (int i = 0; i < myState.producerCount; i++) {
myState.producer.submit(() -> {
for (int j = 0; j < myState.producerLength; j++) {
queue.offer(myState.willAdd);
}
latch.countDown();
return null;
});
}
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < myState.consumerCount; i++) {
Future<?> submit = myState.consumer.submit(() -> {
while (!queue.poll().isEmpty()) {
}
return null;
});
futures.add(submit);
}
latch.await();
queue.noFurtherAppending();
for (Future<?> future : futures) {
future.get();
}
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(BlockingBatchQueueBenchmark.class.getSimpleName())
.forks(2)
.build();
new Runner(opt).run();
}
/**
* # JMH version: 1.21
* # VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11
* # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java
* # VM options: -javaagent:/Users/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7442.40/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=50386:/Users/alvin/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7442.40/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8
* # Warmup: 5 iterations, 10 s each
* # Measurement: 5 iterations, 10 s each
* # Timeout: 10 min per iteration
* # Threads: 1 thread, will synchronize iterations
* # Benchmark mode: Throughput, ops/time
*
* Benchmark Mode Cnt Score Error Units
* BlockingBatchQueueBenchmark.testLinkedBlockingQueue thrpt 10 0.317 ± 0.032 ops/s
* BlockingBatchQueueBenchmark.testReentrantLock thrpt 10 16.018 ± 1.553 ops/s
* BlockingBatchQueueBenchmark.testSynchronized thrpt 10 16.769 ± 0.533 ops/s
*
*/
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
......@@ -39,7 +40,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
......@@ -53,7 +53,6 @@ public class PersistenceTimerTest {
int count = 101;
int workCount = 10;
CoreModuleConfig moduleConfig = new CoreModuleConfig();
moduleConfig.setMaxSyncOperationNum(5);
moduleConfig.setPersistentPeriod(Integer.MAX_VALUE);
IBatchDAO iBatchDAO = new IBatchDAO() {
@Override
......@@ -88,24 +87,24 @@ public class PersistenceTimerTest {
private MetricsPersistentWorker genWorkers(int num, int count) {
MetricsPersistentWorker persistenceWorker = mock(MetricsPersistentWorker.class);
doAnswer(invocation -> {
List argument = invocation.getArgument(0, List.class);
List<MockStorageData> results = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
argument.add(new MockStorageData(num + " " + UUID.randomUUID()));
results.add(new MockStorageData(num + " " + UUID.randomUUID()));
}
return Void.class;
}).when(persistenceWorker).buildBatchRequests(anyList());
return results;
}).when(persistenceWorker).buildBatchRequests();
return persistenceWorker;
}
private TopNWorker genTopNWorkers(int num, int count) {
TopNWorker persistenceWorker = mock(TopNWorker.class);
doAnswer(invocation -> {
List argument = invocation.getArgument(0, List.class);
List<MockStorageData> results = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
argument.add(new MockStorageData(num + " " + UUID.randomUUID()));
results.add(new MockStorageData(num + " " + UUID.randomUUID()));
}
return Void.class;
}).when(persistenceWorker).buildBatchRequests(anyList());
return results;
}).when(persistenceWorker).buildBatchRequests();
return persistenceWorker;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册