diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/StreamManager.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java similarity index 95% rename from dagger-core/src/main/java/com/gotocompany/dagger/core/StreamManager.java rename to dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java index 948898b7d..12c966be7 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java @@ -40,10 +40,7 @@ import static com.gotocompany.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY; import static org.apache.flink.table.api.Expressions.$; -/** - * The Stream manager. - */ -public class StreamManager { +public class DaggerSqlJobBuilder implements JobBuilder { private final Configuration configuration; private final StreamExecutionEnvironment executionEnvironment; @@ -55,11 +52,11 @@ public class StreamManager { private final DaggerContext daggerContext; /** - * Instantiates a new Stream manager. + * Instantiates dagger sql job-builder. 
* - * @param daggerContext the daggerContext in form of param + * @param daggerContext the daggerContext in form of param */ - public StreamManager(DaggerContext daggerContext) { + public DaggerSqlJobBuilder(DaggerContext daggerContext) { this.daggerContext = daggerContext; this.configuration = daggerContext.getConfiguration(); this.executionEnvironment = daggerContext.getExecutionEnvironment(); @@ -71,7 +68,8 @@ public StreamManager(DaggerContext daggerContext) { * * @return the stream manager */ - public StreamManager registerConfigs() { + @Override + public JobBuilder registerConfigs() { stencilClientOrchestrator = new StencilClientOrchestrator(configuration); org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration(); daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration); @@ -96,7 +94,8 @@ public StreamManager registerConfigs() { * * @return the stream manager */ - public StreamManager registerSourceWithPreProcessors() { + @Override + public JobBuilder registerSourceWithPreProcessors() { long watermarkDelay = configuration.getLong(Constants.FLINK_WATERMARK_DELAY_MS_KEY, Constants.FLINK_WATERMARK_DELAY_MS_DEFAULT); Boolean enablePerPartitionWatermark = configuration.getBoolean(Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT); StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter) @@ -144,7 +143,8 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) { * * @return the stream manager */ - public StreamManager registerFunctions() throws IOException { + @Override + public JobBuilder registerFunctions() throws IOException { if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) { PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); PythonUdfManager pythonUdfManager = new 
PythonUdfManager(tableEnvironment, pythonUdfConfig, configuration); @@ -178,7 +178,8 @@ private UdfFactory getUdfFactory(String udfFactoryClassName) throws ClassNotFoun * * @return the stream manager */ - public StreamManager registerOutputStream() { + @Override + public JobBuilder registerOutputStream() { Table table = tableEnvironment.sqlQuery(configuration.getString(Constants.FLINK_SQL_QUERY_KEY, Constants.FLINK_SQL_QUERY_DEFAULT)); StreamInfo streamInfo = createStreamInfo(table); streamInfo = addPostProcessor(streamInfo); @@ -191,6 +192,7 @@ public StreamManager registerOutputStream() { * * @throws Exception the exception */ + @Override public void execute() throws Exception { executionEnvironment.execute(configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT)); } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/ExampleStreamApiJobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/ExampleStreamApiJobBuilder.java new file mode 100644 index 000000000..811843602 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/ExampleStreamApiJobBuilder.java @@ -0,0 +1,156 @@ +package com.gotocompany.dagger.core; + +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.common.core.DaggerContext; +import com.gotocompany.dagger.common.core.StencilClientOrchestrator; +import com.gotocompany.dagger.common.core.StreamInfo; +import com.gotocompany.dagger.common.watermark.LastColumnWatermark; +import com.gotocompany.dagger.common.watermark.StreamWatermarkAssigner; +import com.gotocompany.dagger.common.watermark.WatermarkStrategyDefinition; +import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter; +import com.gotocompany.dagger.core.processors.PreProcessorFactory; +import com.gotocompany.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; +import com.gotocompany.dagger.core.processors.types.Preprocessor; +import 
com.gotocompany.dagger.core.sink.SinkOrchestrator; +import com.gotocompany.dagger.core.source.StreamsFactory; +import com.gotocompany.dagger.core.utils.Constants; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ExampleStreamApiJobBuilder implements JobBuilder { + +// static final String KEY_PATH = "meta.customer.id"; + + private final String inputStreamName1 = "data_streams_0"; + private final String inputStreamName2 = "data_streams_1"; + private final Map dataStreams = new HashMap<>(); + + private final DaggerContext daggerContext; + private final Configuration configuration; + private final StreamExecutionEnvironment executionEnvironment; + private StencilClientOrchestrator stencilClientOrchestrator; + private DaggerStatsDReporter daggerStatsDReporter; + private final MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter(); + + public ExampleStreamApiJobBuilder(DaggerContext daggerContext) { + this.daggerContext = daggerContext; + this.configuration = daggerContext.getConfiguration(); + this.executionEnvironment = daggerContext.getExecutionEnvironment(); + } + + @Override + public JobBuilder registerConfigs() { + stencilClientOrchestrator = new StencilClientOrchestrator(configuration); + org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration(); + daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration); + + 
executionEnvironment.setMaxParallelism(configuration.getInteger(Constants.FLINK_PARALLELISM_MAX_KEY, Constants.FLINK_PARALLELISM_MAX_DEFAULT)); + executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE); + executionEnvironment.enableCheckpointing(configuration.getLong(Constants.FLINK_CHECKPOINT_INTERVAL_MS_KEY, Constants.FLINK_CHECKPOINT_INTERVAL_MS_DEFAULT)); + executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + + // goes on... + executionEnvironment.getConfig().setGlobalJobParameters(configuration.getParam()); + return this; + } + + @Override + public JobBuilder registerSourceWithPreProcessors() { + long watermarkDelay = configuration.getLong(Constants.FLINK_WATERMARK_DELAY_MS_KEY, Constants.FLINK_WATERMARK_DELAY_MS_DEFAULT); + Boolean enablePerPartitionWatermark = configuration.getBoolean(Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT); + + StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter) + .forEach(stream -> { + String tableName = stream.getStreamName(); + + WatermarkStrategyDefinition watermarkStrategyDefinition = new LastColumnWatermark(); + + DataStream dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay)); + StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark()); + + DataStream dataStream1 = streamWatermarkAssigner + .assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark); + + + // just some legacy objects to adopt preprocessors + TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType()); + StreamInfo streamInfo = new StreamInfo(dataStream1, tableSchema.getFieldNames()); + streamInfo = addPreProcessor(streamInfo, tableName); + + if (tableName.equals(inputStreamName1)) { + dataStreams.put(inputStreamName1, 
streamInfo); + } + if (tableName.equals(inputStreamName2)) { + dataStreams.put(inputStreamName2, streamInfo); + } + }); + return this; + } + + @Override + public JobBuilder registerFunctions() throws IOException { + return this; + } + + @Override + public JobBuilder registerOutputStream() { + // NOTE - GET THE DATASTREAM REFERENCE + StreamInfo streamInfo = dataStreams.get(inputStreamName1); + Preconditions.checkNotNull(streamInfo, "Expected page log stream to be registered with name %s", inputStreamName1); + + DataStream inputStream = streamInfo.getDataStream(); + + SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter); + sinkOrchestrator.addSubscriber(telemetryExporter); + + SingleOutputStreamOperator outputStream = + inputStream + + // NOTE - USE THE FLINK STREAM APIS HERE AND SINK THE OUTPUT + +// .keyBy( +// new KeySelector() { +// private KeyExtractor keyExtractor; +// +// @Override +// public Integer getKey(Row row) { +// if (keyExtractor == null) { +// keyExtractor = new KeyExtractor(row, KEY_PATH); +// } +// int userId = keyExtractor.extract(row); +// return userId % DAU_PARALLELISM; +// } +// }) +// .process(new ShardedDistinctUserCounter()) +// .keyBy(r -> 0) // move all the output to one operator to calculate aggregation of all +// .process(new UserCounterAggregator()); + .keyBy(r -> 0) + .max("someField"); + + outputStream.sinkTo(sinkOrchestrator.getSink(configuration, new String[]{"uniq_users"}, stencilClientOrchestrator, daggerStatsDReporter)); + return this; + } + + @Override + public void execute() throws Exception { + executionEnvironment.execute(configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT)); + } + + private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) { + List preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, tableName, telemetryExporter); + for (Preprocessor preprocessor : preProcessors) { + streamInfo = preprocessor.process(streamInfo); + } + 
return streamInfo; + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/JobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/JobBuilder.java new file mode 100644 index 000000000..a2aa07a08 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/JobBuilder.java @@ -0,0 +1,29 @@ +package com.gotocompany.dagger.core; + +import java.io.IOException; + +/** + * An interface derived from the publicly exposed methods of {@code DaggerSqlJobBuilder}, + * previously referred to as StreamManager. + *

+ * The {@code KafkaProtoSQLProcessor}, which serves as the program entry point, + * instantiates the job builder class named by the {@code JOB_BUILDER_FQCN} configuration value. + * Ensure that the job builder class is bundled with the program during any + * subsequent build stages. If it is not, the system falls back to the + * {@code DEFAULT_JOB_BUILDER_FQCN} class, i.e., {@code com.gotocompany.dagger.core.DaggerSqlJobBuilder}. + *

+ * Additionally, the job builder class is expected to provide a public constructor + * that accepts a single parameter of type {@code DaggerContext}.
*/ @@ -25,8 +29,9 @@ public static void main(String[] args) throws ProgramInvocationException { Configuration configuration = provider.get(); TimeZone.setDefault(TimeZone.getTimeZone("UTC")); DaggerContext daggerContext = DaggerContext.init(configuration); - StreamManager streamManager = new StreamManager(daggerContext); - streamManager + + JobBuilder jobBuilder = getJobBuilderInstance(daggerContext); + jobBuilder .registerConfigs() .registerSourceWithPreProcessors() .registerFunctions() @@ -37,4 +42,18 @@ public static void main(String[] args) throws ProgramInvocationException { throw new ProgramInvocationException(e); } } + + private static JobBuilder getJobBuilderInstance(DaggerContext daggerContext) { + String className = daggerContext.getConfiguration().getString(JOB_BUILDER_FQCN_KEY, Constants.DEFAULT_JOB_BUILDER_FQCN); + try { + Class builderClazz = Class.forName(className); + Constructor builderClazzConstructor = builderClazz.getConstructor(DaggerContext.class); + return (JobBuilder) builderClazzConstructor.newInstance(daggerContext); + } catch (Exception e) { + Exception wrapperException = new Exception("Unable to instantiate job builder class: <" + className + "> \n" + + "Instantiating default job builder com.gotocompany.dagger.core.DaggerSqlJobBuilder", e); + wrapperException.printStackTrace(); + return new DaggerSqlJobBuilder(daggerContext); + } + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/StreamManagerTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/DaggerSqlJobBuilderTest.java similarity index 89% rename from dagger-core/src/test/java/com/gotocompany/dagger/core/StreamManagerTest.java rename to dagger-core/src/test/java/com/gotocompany/dagger/core/DaggerSqlJobBuilderTest.java index e63154dee..5f9a748ea 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/StreamManagerTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/DaggerSqlJobBuilderTest.java @@ -38,9 +38,9 @@ 
@PrepareForTest(TableSchema.class) @RunWith(PowerMockRunner.class) -public class StreamManagerTest extends DaggerContextTestBase { +public class DaggerSqlJobBuilderTest extends DaggerContextTestBase { - private StreamManager streamManager; + private DaggerSqlJobBuilder daggerSqlJobBuilder; private String jsonArray = "[\n" + " {\n" @@ -117,12 +117,12 @@ public void setup() { when(schema.getFieldNames()).thenReturn(new String[0]); PowerMockito.mockStatic(TableSchema.class); when(TableSchema.fromTypeInfo(typeInformation)).thenReturn(schema); - streamManager = new StreamManager(daggerContext); + daggerSqlJobBuilder = new DaggerSqlJobBuilder(daggerContext); } @Test public void shouldRegisterRequiredConfigsOnExecutionEnvironment() { - streamManager.registerConfigs(); + daggerSqlJobBuilder.registerConfigs(); verify(streamExecutionEnvironment, Mockito.times(1)).setParallelism(1); verify(streamExecutionEnvironment, Mockito.times(1)).enableCheckpointing(30000); @@ -140,32 +140,32 @@ public void shouldRegisterSourceWithPreprocessorsWithWaterMarks() { when(source.assignTimestampsAndWatermarks(any(WatermarkStrategy.class))).thenReturn(singleOutputStream); when(singleOutputStream.getType()).thenReturn(typeInformation); - StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{})); - streamManagerStub.registerConfigs(); - streamManagerStub.registerSourceWithPreProcessors(); + DaggerSqlJobBuilderStub daggerSqlJobBuilderStub = new DaggerSqlJobBuilderStub(daggerContext, new StreamInfo(dataStream, new String[]{})); + daggerSqlJobBuilderStub.registerConfigs(); + daggerSqlJobBuilderStub.registerSourceWithPreProcessors(); verify(streamTableEnvironment, Mockito.times(1)).fromDataStream(any(), new ApiExpression[]{}); } @Test public void shouldCreateOutputStream() { - StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{})); - streamManagerStub.registerOutputStream(); + 
DaggerSqlJobBuilderStub daggerSqlJobBuilderStub = new DaggerSqlJobBuilderStub(daggerContext, new StreamInfo(dataStream, new String[]{})); + daggerSqlJobBuilderStub.registerOutputStream(); verify(streamTableEnvironment, Mockito.times(1)).sqlQuery(""); } @Test public void shouldExecuteJob() throws Exception { - streamManager.execute(); + daggerSqlJobBuilder.execute(); verify(streamExecutionEnvironment, Mockito.times(1)).execute("SQL Flink job"); } - final class StreamManagerStub extends StreamManager { + final class DaggerSqlJobBuilderStub extends DaggerSqlJobBuilder { private final StreamInfo streamInfo; - private StreamManagerStub(DaggerContext daggerContext, StreamInfo streamInfo) { + private DaggerSqlJobBuilderStub(DaggerContext daggerContext, StreamInfo streamInfo) { super(daggerContext); this.streamInfo = streamInfo; } diff --git a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java index f71865d96..0befc0b42 100644 --- a/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java +++ b/dagger-functions/src/main/java/com/gotocompany/dagger/functions/common/Constants.java @@ -42,4 +42,7 @@ public class Constants { public static final String COS_REGION = "COS_REGION"; public static final String DEFAULT_COS_REGION = "ap-jakarta"; public static final String ENABLE_TKE_OIDC_PROVIDER = "ENABLE_TKE_OIDC_PROVIDER"; + + public static final String JOB_BUILDER_FQCN_KEY = "JOB_BUILDER_FQCN"; + public static final String DEFAULT_JOB_BUILDER_FQCN = "com.gotocompany.dagger.core.DaggerSqlJobBuilder"; } diff --git a/docs/docs/concepts/architecture.md b/docs/docs/concepts/architecture.md index f32e1ea77..73ca685e4 100644 --- a/docs/docs/concepts/architecture.md +++ b/docs/docs/concepts/architecture.md @@ -20,7 +20,7 @@ files as provided are consumed in a single stream. 
_**Dagger Core**_ -- The core part of the dagger(StreamManager) has the following responsibilities. It works sort of as a controller for other components in the dagger. +- The core part of the dagger (DaggerSqlJobBuilder) has the following responsibilities. It acts as a controller for other components in the dagger. - Configuration management. - Table registration. - Configuring Deserialization and Serialization of data. diff --git a/version.txt b/version.txt index 24b197ac9..ac454c6a1 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.11.9 +0.12.0