未验证 提交 fc49801d 编写于 作者: K klion26 提交者: Till Rohrmann

[FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase

This closes #6305.
上级 37abf46f
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for asynchronous file backend.
*/
public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.FILE_ASYNC;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for asynchronous memory backend.
*/
public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.MEM_ASYNC;
}
}
......@@ -58,7 +58,7 @@ import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows.
*
* <p>This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
* <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
......
......@@ -58,15 +58,25 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -82,7 +92,8 @@ import static org.junit.Assert.fail;
* I/O heavy variants.
*/
@SuppressWarnings("serial")
public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLogger {
@RunWith(Parameterized.class)
public class EventTimeWindowCheckpointingITCase extends TestLogger {
private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
private static final int PARALLELISM = 4;
......@@ -99,11 +110,21 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
private AbstractStateBackend stateBackend;
@Parameterized.Parameter
public StateBackendEnum stateBackendEnum;
enum StateBackendEnum {
MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
}
protected abstract StateBackendEnum getStateBackend();
@Parameterized.Parameters(name = "statebackend type ={0}")
public static Collection<StateBackendEnum> parameter() {
return Arrays.asList(StateBackendEnum.values());
}
protected StateBackendEnum getStateBackend() {
return this.stateBackendEnum;
}
protected final MiniClusterResource getMiniClusterResource() {
return new MiniClusterResource(
......@@ -871,19 +892,40 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
}
protected int numElementsPerKey() {
return 300;
private int numElementsPerKey() {
switch (this.stateBackendEnum) {
case ROCKSDB_FULLY_ASYNC:
case ROCKSDB_INCREMENTAL:
case ROCKSDB_INCREMENTAL_ZK:
return 3000;
default:
return 300;
}
}
protected int windowSize() {
return 100;
private int windowSize() {
switch (this.stateBackendEnum) {
case ROCKSDB_FULLY_ASYNC:
case ROCKSDB_INCREMENTAL:
case ROCKSDB_INCREMENTAL_ZK:
return 1000;
default:
return 100;
}
}
protected int windowSlide() {
private int windowSlide() {
return 100;
}
protected int numKeys() {
return 20;
private int numKeys() {
switch (this.stateBackendEnum) {
case ROCKSDB_FULLY_ASYNC:
case ROCKSDB_INCREMENTAL:
case ROCKSDB_INCREMENTAL_ZK:
return 100;
default:
return 20;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for file backend.
*/
public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.FILE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for incremental RocksDB backend.
*/
public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
}
@Override
protected int numElementsPerKey() {
return 3000;
}
@Override
protected int windowSize() {
return 1000;
}
@Override
protected int windowSlide() {
return 100;
}
@Override
protected int numKeys() {
return 100;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for incremental RocksDB backend.
*/
public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.ROCKSDB_INCREMENTAL;
}
@Override
protected int numElementsPerKey() {
return 3000;
}
@Override
protected int windowSize() {
return 1000;
}
@Override
protected int windowSlide() {
return 100;
}
@Override
protected int numKeys() {
return 100;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
/**
* Tests file-based local recovery with the HeapBackend.
*/
public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
public LocalRecoveryHeapITCase() {
super(FILE_ASYNC, true);
}
}
......@@ -25,35 +25,46 @@ import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
/**
* This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
* This test delegates to instances of {@link EventTimeWindowCheckpointingITCase} that have been reconfigured
* to use local recovery.
*
* <p>TODO: This class must be refactored to properly extend {@link AbstractEventTimeWindowCheckpointingITCase}.
* <p>TODO: This class must be refactored to properly extend {@link EventTimeWindowCheckpointingITCase}.
*/
public abstract class AbstractLocalRecoveryITCase extends TestLogger {
@RunWith(Parameterized.class)
public class LocalRecoveryITCase extends TestLogger {
private final StateBackendEnum backendEnum;
private final boolean localRecoveryEnabled;
private final boolean localRecoveryEnabled = true;
@Rule
public TestName testName = new TestName();
AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
this.backendEnum = backendEnum;
this.localRecoveryEnabled = localRecoveryEnabled;
@Parameterized.Parameter
public StateBackendEnum backendEnum;
@Parameterized.Parameters(name = "statebackend type ={0}")
public static Collection<StateBackendEnum> parameter() {
return Arrays.asList(ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL_ZK, FILE_ASYNC);
}
@Test
public final void executeTest() throws Exception {
AbstractEventTimeWindowCheckpointingITCase.tempFolder.create();
AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
new AbstractEventTimeWindowCheckpointingITCase() {
EventTimeWindowCheckpointingITCase.tempFolder.create();
EventTimeWindowCheckpointingITCase windowChkITCase =
new EventTimeWindowCheckpointingITCase() {
@Override
protected StateBackendEnum getStateBackend() {
return backendEnum;
......@@ -74,7 +85,7 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger {
executeTest(windowChkITCase);
}
private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception {
private void executeTest(EventTimeWindowCheckpointingITCase delegate) throws Exception {
delegate.name = testName;
try {
delegate.setupTestCluster();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
/**
* Tests file-based local recovery with the RocksDB state-backend.
*/
public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase {
public LocalRecoveryRocksDBFullITCase() {
super(ROCKSDB_FULLY_ASYNC, true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
/**
* Tests file-based local recovery with the RocksDB state-backend and incremental checkpointing enabled.
*/
public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase {
public LocalRecoveryRocksDBIncrementalITCase() {
super(ROCKSDB_INCREMENTAL_ZK, true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for memory backend.
*/
public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.MEM;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.checkpointing;
/**
* Integration tests for fully synchronous RocksDB backend.
*/
public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
@Override
protected StateBackendEnum getStateBackend() {
return StateBackendEnum.ROCKSDB_FULLY_ASYNC;
}
@Override
protected int numElementsPerKey() {
return 3000;
}
@Override
protected int windowSize() {
return 1000;
}
@Override
protected int windowSlide() {
return 100;
}
@Override
protected int numKeys() {
return 100;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册