@@ -40,10 +40,7 @@
import static com.gotocompany.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY;
import static org.apache.flink.table.api.Expressions.$;

-/**
- * The Stream manager.
- */
-public class StreamManager {
+public class DaggerSqlJobBuilder implements JobBuilder {

private final Configuration configuration;
private final StreamExecutionEnvironment executionEnvironment;
@@ -55,11 +52,11 @@ public class StreamManager {
private final DaggerContext daggerContext;

/**
- * Instantiates a new Stream manager.
+ * Instantiates a new Dagger SQL job builder.
*
- * @param daggerContext the daggerContext in form of param
+ * @param daggerContext the daggerContext in form of param
*/
-public StreamManager(DaggerContext daggerContext) {
+public DaggerSqlJobBuilder(DaggerContext daggerContext) {
this.daggerContext = daggerContext;
this.configuration = daggerContext.getConfiguration();
this.executionEnvironment = daggerContext.getExecutionEnvironment();
@@ -71,7 +68,8 @@ public StreamManager(DaggerContext daggerContext) {
*
* @return the stream manager
*/
-public StreamManager registerConfigs() {
+@Override
+public JobBuilder registerConfigs() {
stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);
@@ -96,7 +94,8 @@ public StreamManager registerConfigs() {
*
* @return the stream manager
*/
-public StreamManager registerSourceWithPreProcessors() {
+@Override
+public JobBuilder registerSourceWithPreProcessors() {
long watermarkDelay = configuration.getLong(Constants.FLINK_WATERMARK_DELAY_MS_KEY, Constants.FLINK_WATERMARK_DELAY_MS_DEFAULT);
Boolean enablePerPartitionWatermark = configuration.getBoolean(Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT);
StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter)
@@ -144,7 +143,8 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) {
*
* @return the stream manager
*/
-public StreamManager registerFunctions() throws IOException {
+@Override
+public JobBuilder registerFunctions() throws IOException {
if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) {
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);
PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig, configuration);
@@ -178,7 +178,8 @@ private UdfFactory getUdfFactory(String udfFactoryClassName) throws ClassNotFoun
*
* @return the stream manager
*/
-public StreamManager registerOutputStream() {
+@Override
+public JobBuilder registerOutputStream() {
Table table = tableEnvironment.sqlQuery(configuration.getString(Constants.FLINK_SQL_QUERY_KEY, Constants.FLINK_SQL_QUERY_DEFAULT));
StreamInfo streamInfo = createStreamInfo(table);
streamInfo = addPostProcessor(streamInfo);
@@ -191,6 +192,7 @@ public StreamManager registerOutputStream() {
*
* @throws Exception the exception
*/
+@Override
public void execute() throws Exception {
executionEnvironment.execute(configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT));
}
@@ -0,0 +1,156 @@
package com.gotocompany.dagger.core;

import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.common.core.DaggerContext;
import com.gotocompany.dagger.common.core.StencilClientOrchestrator;
import com.gotocompany.dagger.common.core.StreamInfo;
import com.gotocompany.dagger.common.watermark.LastColumnWatermark;
import com.gotocompany.dagger.common.watermark.StreamWatermarkAssigner;
import com.gotocompany.dagger.common.watermark.WatermarkStrategyDefinition;
import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import com.gotocompany.dagger.core.processors.PreProcessorFactory;
import com.gotocompany.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter;
import com.gotocompany.dagger.core.processors.types.Preprocessor;
import com.gotocompany.dagger.core.sink.SinkOrchestrator;
import com.gotocompany.dagger.core.source.StreamsFactory;
import com.gotocompany.dagger.core.utils.Constants;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExampleStreamApiJobBuilder implements JobBuilder {

// static final String KEY_PATH = "meta.customer.id";

private final String inputStreamName1 = "data_streams_0";
private final String inputStreamName2 = "data_streams_1";
private final Map<String, StreamInfo> dataStreams = new HashMap<>();

private final DaggerContext daggerContext;
private final Configuration configuration;
private final StreamExecutionEnvironment executionEnvironment;
private StencilClientOrchestrator stencilClientOrchestrator;
private DaggerStatsDReporter daggerStatsDReporter;
private final MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter();

public ExampleStreamApiJobBuilder(DaggerContext daggerContext) {
this.daggerContext = daggerContext;
this.configuration = daggerContext.getConfiguration();
this.executionEnvironment = daggerContext.getExecutionEnvironment();
}

@Override
public JobBuilder registerConfigs() {
stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);

executionEnvironment.setMaxParallelism(configuration.getInteger(Constants.FLINK_PARALLELISM_MAX_KEY, Constants.FLINK_PARALLELISM_MAX_DEFAULT));
executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
executionEnvironment.enableCheckpointing(configuration.getLong(Constants.FLINK_CHECKPOINT_INTERVAL_MS_KEY, Constants.FLINK_CHECKPOINT_INTERVAL_MS_DEFAULT));
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// goes on...
executionEnvironment.getConfig().setGlobalJobParameters(configuration.getParam());
return this;
}

@Override
public JobBuilder registerSourceWithPreProcessors() {
long watermarkDelay = configuration.getLong(Constants.FLINK_WATERMARK_DELAY_MS_KEY, Constants.FLINK_WATERMARK_DELAY_MS_DEFAULT);
Boolean enablePerPartitionWatermark = configuration.getBoolean(Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, Constants.FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT);

StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter)
.forEach(stream -> {
String tableName = stream.getStreamName();

WatermarkStrategyDefinition watermarkStrategyDefinition = new LastColumnWatermark();

DataStream<Row> dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay));
StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark());

DataStream<Row> dataStream1 = streamWatermarkAssigner
.assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark);


// just some legacy objects to adapt the preprocessors
TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
StreamInfo streamInfo = new StreamInfo(dataStream1, tableSchema.getFieldNames());
streamInfo = addPreProcessor(streamInfo, tableName);

if (tableName.equals(inputStreamName1)) {
dataStreams.put(inputStreamName1, streamInfo);
}
if (tableName.equals(inputStreamName2)) {
dataStreams.put(inputStreamName2, streamInfo);
}
});
return this;
}

@Override
public JobBuilder registerFunctions() throws IOException {
return this;
}

@Override
public JobBuilder registerOutputStream() {
// NOTE - GET THE DATASTREAM REFERENCE
StreamInfo streamInfo = dataStreams.get(inputStreamName1);
Preconditions.checkNotNull(streamInfo, "Expected page log stream to be registered with name %s", inputStreamName1);

DataStream<Row> inputStream = streamInfo.getDataStream();

SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter);
sinkOrchestrator.addSubscriber(telemetryExporter);

SingleOutputStreamOperator<Row> outputStream =
inputStream

// NOTE - USE THE FLINK STREAM APIS HERE AND SINK THE OUTPUT

// .keyBy(
// new KeySelector<Row, Integer>() {
// private KeyExtractor keyExtractor;
//
// @Override
// public Integer getKey(Row row) {
// if (keyExtractor == null) {
// keyExtractor = new KeyExtractor(row, KEY_PATH);
// }
// int userId = keyExtractor.extract(row);
// return userId % DAU_PARALLELISM;
// }
// })
// .process(new ShardedDistinctUserCounter())
// .keyBy(r -> 0) // move all the output to one operator to calculate aggregation of all
// .process(new UserCounterAggregator());
.keyBy(r -> 0)
.max("someField");

outputStream.sinkTo(sinkOrchestrator.getSink(configuration, new String[]{"uniq_users"}, stencilClientOrchestrator, daggerStatsDReporter));
return this;
}

@Override
public void execute() throws Exception {
executionEnvironment.execute(configuration.getString(Constants.FLINK_JOB_ID_KEY, Constants.FLINK_JOB_ID_DEFAULT));
}

private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) {
List<Preprocessor> preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, tableName, telemetryExporter);
for (Preprocessor preprocessor : preProcessors) {
streamInfo = preprocessor.process(streamInfo);
}
return streamInfo;
}
}
@@ -0,0 +1,29 @@
package com.gotocompany.dagger.core;

import java.io.IOException;

/**
* An interface derived from the publicly exposed methods of {@code DaggerSqlJobBuilder},
* previously known as StreamManager.
* <p>
* {@code KafkaProtoSQLProcessor}, the program entry point, instantiates the
* implementation named by the {@code JOB_BUILDER_FQCN} configuration value.
* Ensure that the job builder class is bundled with the program during any
* subsequent build stage; if it cannot be loaded, the system falls back to the
* {@code DEFAULT_JOB_BUILDER_FQCN} class, i.e., {@code com.gotocompany.dagger.core.DaggerSqlJobBuilder}.
* <p>
* Additionally, the job builder class must provide a constructor
* that accepts a single parameter of type {@code DaggerContext}.
*/
public interface JobBuilder {

JobBuilder registerConfigs();

JobBuilder registerSourceWithPreProcessors();

JobBuilder registerFunctions() throws IOException;

JobBuilder registerOutputStream();

void execute() throws Exception;
}
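
A minimal sketch of a custom implementation is shown below. The package and class name are hypothetical; the only hard requirements are the five methods above plus a public single-argument DaggerContext constructor, which KafkaProtoSQLProcessor invokes reflectively.

package com.example.dagger;

import com.gotocompany.dagger.common.core.DaggerContext;
import com.gotocompany.dagger.core.JobBuilder;

import java.io.IOException;

// Hypothetical no-op builder: every register step returns this, so the
// fluent chain in KafkaProtoSQLProcessor#main composes unchanged.
public class NoOpJobBuilder implements JobBuilder {

    private final DaggerContext daggerContext;

    // Required: the entry point looks up exactly this constructor shape.
    public NoOpJobBuilder(DaggerContext daggerContext) {
        this.daggerContext = daggerContext;
    }

    @Override
    public JobBuilder registerConfigs() {
        return this;
    }

    @Override
    public JobBuilder registerSourceWithPreProcessors() {
        return this;
    }

    @Override
    public JobBuilder registerFunctions() throws IOException {
        return this;
    }

    @Override
    public JobBuilder registerOutputStream() {
        return this;
    }

    @Override
    public void execute() throws Exception {
        // Flink rejects an empty topology, so a real builder must register
        // at least one source and sink before this call.
        daggerContext.getExecutionEnvironment().execute("no-op-job");
    }
}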
@@ -3,11 +3,15 @@
import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.core.config.ConfigurationProvider;
import com.gotocompany.dagger.core.config.ConfigurationProviderFactory;
+import com.gotocompany.dagger.functions.common.Constants;
import org.apache.flink.client.program.ProgramInvocationException;
import com.gotocompany.dagger.common.core.DaggerContext;

+import java.lang.reflect.Constructor;
import java.util.TimeZone;

+import static com.gotocompany.dagger.functions.common.Constants.JOB_BUILDER_FQCN_KEY;

/**
* Main class to run Dagger.
*/
@@ -25,8 +29,9 @@ public static void main(String[] args) throws ProgramInvocationException {
Configuration configuration = provider.get();
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
DaggerContext daggerContext = DaggerContext.init(configuration);
-StreamManager streamManager = new StreamManager(daggerContext);
-streamManager
+
+JobBuilder jobBuilder = getJobBuilderInstance(daggerContext);
+jobBuilder
.registerConfigs()
.registerSourceWithPreProcessors()
.registerFunctions()
@@ -37,4 +42,18 @@ public static void main(String[] args) throws ProgramInvocationException {
throw new ProgramInvocationException(e);
}
}

+private static JobBuilder getJobBuilderInstance(DaggerContext daggerContext) {
+String className = daggerContext.getConfiguration().getString(JOB_BUILDER_FQCN_KEY, Constants.DEFAULT_JOB_BUILDER_FQCN);
+try {
+Class<?> builderClazz = Class.forName(className);
+Constructor<?> builderClazzConstructor = builderClazz.getConstructor(DaggerContext.class);
+return (JobBuilder) builderClazzConstructor.newInstance(daggerContext);
+} catch (Exception e) {
+Exception wrapperException = new Exception("Unable to instantiate job builder class: <" + className + "> \n"
++ "Instantiating default job builder com.gotocompany.dagger.core.DaggerSqlJobBuilder", e);
+wrapperException.printStackTrace();
+return new DaggerSqlJobBuilder(daggerContext);
+}
+}
}
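
Opting into a custom builder is then purely a configuration change. A hypothetical example, assuming the usual Dagger properties-style configuration and that the class is bundled on the job's classpath:

JOB_BUILDER_FQCN=com.gotocompany.dagger.core.ExampleStreamApiJobBuilder

If the key is unset, or the class cannot be instantiated, getJobBuilderInstance above prints the failure and falls back to DaggerSqlJobBuilder.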
@@ -38,9 +38,9 @@

@PrepareForTest(TableSchema.class)
@RunWith(PowerMockRunner.class)
-public class StreamManagerTest extends DaggerContextTestBase {
+public class DaggerSqlJobBuilderTest extends DaggerContextTestBase {

-private StreamManager streamManager;
+private DaggerSqlJobBuilder daggerSqlJobBuilder;

private String jsonArray = "[\n"
+ " {\n"
@@ -117,12 +117,12 @@ public void setup() {
when(schema.getFieldNames()).thenReturn(new String[0]);
PowerMockito.mockStatic(TableSchema.class);
when(TableSchema.fromTypeInfo(typeInformation)).thenReturn(schema);
-streamManager = new StreamManager(daggerContext);
+daggerSqlJobBuilder = new DaggerSqlJobBuilder(daggerContext);
}

@Test
public void shouldRegisterRequiredConfigsOnExecutionEnvironment() {
-streamManager.registerConfigs();
+daggerSqlJobBuilder.registerConfigs();

verify(streamExecutionEnvironment, Mockito.times(1)).setParallelism(1);
verify(streamExecutionEnvironment, Mockito.times(1)).enableCheckpointing(30000);
@@ -140,32 +140,32 @@ public void shouldRegisterSourceWithPreprocessorsWithWaterMarks() {
when(source.assignTimestampsAndWatermarks(any(WatermarkStrategy.class))).thenReturn(singleOutputStream);
when(singleOutputStream.getType()).thenReturn(typeInformation);

-StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{}));
-streamManagerStub.registerConfigs();
-streamManagerStub.registerSourceWithPreProcessors();
+DaggerSqlJobBuilderStub daggerSqlJobBuilderStub = new DaggerSqlJobBuilderStub(daggerContext, new StreamInfo(dataStream, new String[]{}));
+daggerSqlJobBuilderStub.registerConfigs();
+daggerSqlJobBuilderStub.registerSourceWithPreProcessors();

verify(streamTableEnvironment, Mockito.times(1)).fromDataStream(any(), new ApiExpression[]{});
}

@Test
public void shouldCreateOutputStream() {
-StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{}));
-streamManagerStub.registerOutputStream();
+DaggerSqlJobBuilderStub daggerSqlJobBuilderStub = new DaggerSqlJobBuilderStub(daggerContext, new StreamInfo(dataStream, new String[]{}));
+daggerSqlJobBuilderStub.registerOutputStream();
verify(streamTableEnvironment, Mockito.times(1)).sqlQuery("");
}

@Test
public void shouldExecuteJob() throws Exception {
-streamManager.execute();
+daggerSqlJobBuilder.execute();

verify(streamExecutionEnvironment, Mockito.times(1)).execute("SQL Flink job");
}

-final class StreamManagerStub extends StreamManager {
+final class DaggerSqlJobBuilderStub extends DaggerSqlJobBuilder {

private final StreamInfo streamInfo;

-private StreamManagerStub(DaggerContext daggerContext, StreamInfo streamInfo) {
+private DaggerSqlJobBuilderStub(DaggerContext daggerContext, StreamInfo streamInfo) {
super(daggerContext);
this.streamInfo = streamInfo;
}
@@ -42,4 +42,7 @@ public class Constants {
public static final String COS_REGION = "COS_REGION";
public static final String DEFAULT_COS_REGION = "ap-jakarta";
public static final String ENABLE_TKE_OIDC_PROVIDER = "ENABLE_TKE_OIDC_PROVIDER";

+public static final String JOB_BUILDER_FQCN_KEY = "JOB_BUILDER_FQCN";
+public static final String DEFAULT_JOB_BUILDER_FQCN = "com.gotocompany.dagger.core.DaggerSqlJobBuilder";
}
2 changes: 1 addition & 1 deletion docs/docs/concepts/architecture.md
@@ -20,7 +20,7 @@ files as provided are consumed in a single stream.

_**Dagger Core**_

-- The core part of the dagger(StreamManager) has the following responsibilities. It works sort of as a controller for other components in the dagger.
+- The core part of the dagger (DaggerSqlJobBuilder) has the following responsibilities. It works, in effect, as a controller for the other components in the dagger.
- Configuration management.
- Table registration.
- Configuring Deserialization and Serialization of data.
2 changes: 1 addition & 1 deletion version.txt
@@ -1 +1 @@
-0.11.9
+0.12.0