diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index ec032a61653f0..31a6019a50a6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -99,6 +99,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setProcessingTimeService; @@ -187,6 +188,9 @@ public InternalTimeServiceManager create( private long restoredCheckpointId = 0; + private Function, Output>> outputCreator = + MockOutput::new; + public AbstractStreamOperatorTestHarness( StreamOperator operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception { @@ -364,6 +368,11 @@ public void setStateBackend(StateBackend stateBackend) { } } + public void setOutputCreator( + Function, Output>> outputCreator) { + this.outputCreator = outputCreator; + } + public void setCheckpointStorage(CheckpointStorage storage) { if (stateBackend instanceof CheckpointStorage) { return; @@ -469,7 +478,7 @@ public void setup(TypeSerializer outputSerializer) { factory, mockTask, config, - new MockOutput(outputSerializer), + outputCreator.apply(outputSerializer), new OperatorEventDispatcherImpl( this.getClass().getClassLoader(), new NoOpTaskOperatorEventGateway())) @@ -482,7 +491,7 @@ public void setup(TypeSerializer outputSerializer) { (AbstractStreamOperator) operator, mockTask, config, - new MockOutput(outputSerializer)); + outputCreator.apply(outputSerializer)); } } setupCalled = true;