diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc408f2..fdfc613 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,26 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
 and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html)
 
+## [0.1.79] - 2024-04-25
+Send email notifications when EMR cluster spin-up is initiated at the start of a datapull job, covering both success and failure scenarios.
+### Changed
+- api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
+- api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
+- api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
+### Added
+- api/src/main/java/com/homeaway/datapullclient/config/SESProperties.java
+- api/src/main/java/com/homeaway/datapullclient/config/SMTPProperties.java
+- api/src/main/java/com/homeaway/datapullclient/utils/EmailNotification.java
+
+## [0.1.78] - 2024-04-22
+Pin the Docker image pushed to ECR to the linux/amd64 platform.
+### Changed
+- api/terraform/datapull_task/ecs_deploy.sh
+
+## [0.1.77] - 2024-04-10
+Fix the null subnetId issue by checking StringUtils.isNotBlank(dataPullProperties.getApplicationSubnet3())
+### Changed
+- api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
 
 ## [0.1.76] - 2024-01-08
 Fixing the issue when the key is not provided by the user, and reverting changes w.r.t. insert into Hive table for partitioned tables.
diff --git a/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java b/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
index 46cc44e..67e31d3 100644
--- a/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
+++ b/api/src/main/java/com/homeaway/datapullclient/config/DataPullClientConfig.java
@@ -24,10 +24,11 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.homeaway.datapullclient.process.DataPullTask;
+import com.homeaway.datapullclient.utils.EmailNotification;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.*;
 import org.springframework.core.env.Environment;
@@ -71,6 +72,16 @@ public EMRProperties getEmrProperties(){
         return new EMRProperties();
     }
 
+    @Bean
+    public SMTPProperties getSMTPProperties(){
+        return new SMTPProperties();
+    }
+
+    @Bean
+    public SESProperties getSESProperties(){
+        return new SESProperties();
+    }
+
     @Bean
     public AmazonElasticMapReduce getEMRClient(){
         AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
@@ -79,4 +92,5 @@ public AmazonElasticMapReduce getEMRClient(){
                 .withCredentials(credentialsProvider)
                 .build());
     }
+
 }
\ No newline at end of file
diff --git a/api/src/main/java/com/homeaway/datapullclient/config/SESProperties.java b/api/src/main/java/com/homeaway/datapullclient/config/SESProperties.java
new file mode 100644
index 0000000..13efe03
--- /dev/null
+++ b/api/src/main/java/com/homeaway/datapullclient/config/SESProperties.java
@@ -0,0 +1,29 @@
+
+package com.homeaway.datapullclient.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConfigurationProperties(prefix="datapull.email.ses")
+@EnableConfigurationProperties
+@Data
+
+public class SESProperties {
+
+    @Value("${region:}")
+    private String region;
+
+    @Value("${email:}")
+    private String email;
+
+    @Value("${access_key:}")
+    private String accessKey;
+
+    @Value("${secret_key:}")
+    private String secretKey;
+
+}
\ No newline at end of file
diff --git a/api/src/main/java/com/homeaway/datapullclient/config/SMTPProperties.java b/api/src/main/java/com/homeaway/datapullclient/config/SMTPProperties.java
new file mode 100644
index 0000000..14b7c60
--- /dev/null
+++ b/api/src/main/java/com/homeaway/datapullclient/config/SMTPProperties.java
@@ -0,0 +1,29 @@
+package com.homeaway.datapullclient.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConfigurationProperties(prefix="datapull.email.smtp")
+@EnableConfigurationProperties
+@Data
+
+public class SMTPProperties {
+
+    @Value("${emailaddress:}")
+    private String emailaddress;
+
+    @Value("${smtpserveraddress:}")
+    private String smtpserveraddress;
+
+    @Value("${port:}")
+    private String smtpport;
+
+    @Value("${starttls:}")
+    private String smtpstarttls;
+
+
+}
diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
index 1c290d0..bbda877 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullRequestProcessor.java
@@ -34,6 +34,7 @@
 import com.homeaway.datapullclient.input.Source;
 import com.homeaway.datapullclient.service.DataPullClientService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.everit.json.schema.Schema;
 import org.everit.json.schema.ValidationException;
 import org.everit.json.schema.loader.SchemaLoader;
@@ -84,6 +85,9 @@ public class DataPullRequestProcessor implements DataPullClientService {
 
     private final ThreadPoolTaskScheduler scheduler;
 
+    HashMap<String, String> successEmails = new HashMap<>();
+
+    HashMap<String, String> failureEmails = new HashMap<>();
 
     public DataPullRequestProcessor(){
         scheduler = new ThreadPoolTaskScheduler();
         scheduler.setPoolSize(POOL_SIZE);
@@ -123,6 +127,10 @@ public void runSimpleDataPull(String awsenv, String pipelinename) {
     }
 
     private void runDataPull(String json, boolean isStart, boolean validateJson) throws ProcessingException {
+
+        String userEmail;
+        String failureEmail;
+
         String originalInputJson = json;
         json = extractUserJsonFromS3IfProvided(json, isStart);
@@ -138,6 +146,26 @@
         log.info("Running datapull for json : " + json + " cron expression = " + isStart + "env =" + env);
         final ObjectNode node = new ObjectMapper().readValue(json, ObjectNode.class);
+
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        JsonNode jsonNode = objectMapper.readTree(json);
+        userEmail = null != jsonNode.get("useremailaddress") ? jsonNode.get("useremailaddress").asText() : "";
+
+        String taskId = jsonNode.get("cluster").get("awsenv").asText().concat("-emr-").concat(jsonNode.get("cluster").get("pipelinename").asText()).concat("-pipeline");
+        successEmails.put(taskId, userEmail);
+
+        JsonNode failureEmailNode = jsonNode.get("failureemailaddress");
+        if (StringUtils.isNotEmpty(userEmail)) {
+            failureEmail = (failureEmailNode != null) ? userEmail.concat(",").concat(failureEmailNode.asText()) : userEmail;
+        } else {
+            failureEmail = (failureEmailNode != null) ? failureEmailNode.asText() : "";
+        }
+        failureEmails.put(taskId, failureEmail);
+
         List<Map.Entry<String, JsonNode>> result = new LinkedList<>();
         Iterator<Map.Entry<String, JsonNode>> nodes = node.fields();
         while (nodes.hasNext()) {
@@ -197,6 +225,16 @@
         log.debug("runDataPull <- return");
     }
 
+    public HashMap<String, String> successMailAddress() {
+        return successEmails;
+    }
+
+    public HashMap<String, String> failureMailAddress() {
+        return failureEmails;
+    }
+
     private StringBuilder createBootstrapString(Object[] paths, String bootstrapActionStringFromUser) throws ProcessingException {
 
         StringBuilder stringBuilder = new StringBuilder();
diff --git a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
index 18774d6..2e4a0fb 100644
--- a/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
+++ b/api/src/main/java/com/homeaway/datapullclient/process/DataPullTask.java
@@ -21,19 +21,32 @@
 import com.homeaway.datapullclient.config.DataPullClientConfig;
 import com.homeaway.datapullclient.config.DataPullProperties;
 import com.homeaway.datapullclient.config.EMRProperties;
+import com.homeaway.datapullclient.config.SMTPProperties;
+import com.homeaway.datapullclient.exception.ProcessingException;
 import com.homeaway.datapullclient.input.ClusterProperties;
+import com.homeaway.datapullclient.utils.EmailNotification;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.sql.Array;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;
 
 @Slf4j
 public class DataPullTask implements Runnable {
+
+    @Autowired
+    DataPullRequestProcessor dataPullRequestProcessor;
 
     private static final String MAIN_CLASS = "core.DataPull";
 
@@ -50,11 +63,37 @@ public class DataPullTask implements Runnable {
 
     @Autowired
     private DataPullClientConfig config;
 
+    private SMTPProperties smtpProperties;
+
     private static final String JSON_WITH_INPUT_FILE_PATH = "{\r\n \"jsoninputfile\": {\r\n \"s3path\": \"%s\"\r\n }\r\n}";
 
     private final Map<String, String> emrTags = new HashMap<>();
     private ClusterProperties clusterProperties;
     private Boolean haveBootstrapAction;
+
+    private static final Logger logger = Logger.getLogger(DataPullTask.class.getName());
+    private Exception storedException;
+    String stackTrace = null;
+
+    String jobFlowId = null;
+
+    String clusterId = null;
+
+    enum emailStatusCode {
+        CLUSTER_CREATION_SUCCESS,
+        CLUSTER_CREATION_FAILED,
+        SPARK_EXEC_FAILED,
+        SPARK_EXEC_ON_EXISTING_CLUSTER,
+        INVALID_PARAMS,
+        RUN_ON_EXISTING_CLUSTER_FAILED
+    }
+
+    boolean runSparkClusterFail, invalidParamList, getJobFlowInstancesConfigFail, runTaskOnExistingClusterFail;
+
+    private ProcessingException processingException;
+
+    EmailNotification emailNotification;
+
     public DataPullTask(final String taskId, final String s3File, final String jksFilePath) {
         s3FilePath = s3File;
         this.taskId = taskId;
@@ -77,20 +116,25 @@ public static List<String> toList(final String[] array) {
 
     @Override
     public void run() {
-        this.runSparkCluster();
+        try {
+            this.runSparkCluster();
+        } catch (ProcessingException e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    private void runSparkCluster() {
-
-        DataPullTask.log.info("Started cluster config taskId = " + this.taskId);
-        final AmazonElasticMapReduce emr = this.config.getEMRClient();
-        final int MAX_RETRY = 16;
-        final DataPullProperties dataPullProperties = this.config.getDataPullProperties();
-        final String logFilePath = dataPullProperties.getLogFilePath();
-        final String s3RepositoryBucketName = dataPullProperties.getS3BucketName();
-        final String logPath = logFilePath == null || logFilePath.equals("") ?
-                "s3://" + s3RepositoryBucketName + "/" + "datapull-opensource/logs/SparkLogs" : logFilePath;
+    private void runSparkCluster() throws ProcessingException {
+        try {
+            DataPullTask.log.info("Started cluster config taskId = " + this.taskId);
+            final AmazonElasticMapReduce emr = this.config.getEMRClient();
+            final int MAX_RETRY = 16;
+            final DataPullProperties dataPullProperties = this.config.getDataPullProperties();
+            emailNotification = new EmailNotification(config);
+            final String logFilePath = dataPullProperties.getLogFilePath();
+            final String s3RepositoryBucketName = dataPullProperties.getS3BucketName();
+            final String logPath = logFilePath == null || logFilePath.equals("") ?
+                    "s3://" + s3RepositoryBucketName + "/" + "datapull-opensource/logs/SparkLogs" : logFilePath;
 
         s3JarPath = s3JarPath == null || s3JarPath.equals("") ?
"s3://" + s3RepositoryBucketName + "/" + "datapull-opensource/jars/DataMigrationFramework-1.0-SNAPSHOT-jar-with-dependencies.jar" : s3JarPath; @@ -172,26 +216,41 @@ private void runSparkCluster() { DataPullTask.log.info("Task " + this.taskId + " submitted to EMR cluster"); } - } else { - final RunJobFlowResult result = this.runTaskInNewCluster(emr, logPath, this.s3JarPath, Objects.toString(this.clusterProperties.getSparksubmitparams(), ""), haveBootstrapAction); - DataPullTask.log.info(result.toString()); - } - - DataPullTask.log.info("Task " + this.taskId + " submitted to EMR cluster"); - - if (!reapClusters.isEmpty()) { - reapClusters.forEach(cluster -> { - String clusterIdToReap = cluster.getId(); - String clusterNameToReap = cluster.getName(); - //ensure we don't reap the cluster we just used - if (!clusters.isEmpty() && clusters.get(0).getId().equals(clusterIdToReap)) { - DataPullTask.log.info("Cannot reap in-use cluster " + clusterNameToReap + " with Id " + clusterIdToReap); - } else { - DataPullTask.log.info("About to reap cluster " + clusterNameToReap + " with Id " + clusterIdToReap); - emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds(Arrays.asList(clusterIdToReap))); - DataPullTask.log.info("Reaped cluster " + clusterNameToReap + " with Id " + clusterIdToReap); + } else { + final RunJobFlowResult result = this.runTaskInNewCluster(emr, logPath, this.s3JarPath, Objects.toString(this.clusterProperties.getSparksubmitparams(), ""), haveBootstrapAction); + DataPullTask.log.info(result.toString()); + String resultSet = result.toString(); + int colonIndex = resultSet.indexOf(": "); + + if (colonIndex != -1) { + // Extract the substring starting from the next character after the colon and space + jobFlowId = resultSet.substring(colonIndex + 2); + jobFlowId = jobFlowId.replace("}", ""); } - }); + sendEmailNotification (); + } + + if (!reapClusters.isEmpty()) { + reapClusters.forEach(cluster -> { + String clusterIdToReap = cluster.getId(); + String clusterNameToReap = cluster.getName(); + //ensure we don't reap the cluster we just used + if (!clusters.isEmpty() && clusters.get(0).getId().equals(clusterIdToReap)) { + DataPullTask.log.info("Cannot reap in-use cluster " + clusterNameToReap + " with Id " + clusterIdToReap); + } else { + DataPullTask.log.info("About to reap cluster " + clusterNameToReap + " with Id " + clusterIdToReap); + emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds(Arrays.asList(clusterIdToReap))); + DataPullTask.log.info("Reaped cluster " + clusterNameToReap + " with Id " + clusterIdToReap); + } + }); + } + }catch (Exception e) { + storedException = e; + logger.severe("Error occurred: " + e.getMessage()); + logger.severe("Full exception details: " + e); + stackTrace = getStackTraceAsString(e); + runSparkClusterFail = true; + sendEmailNotification (); } } @@ -200,21 +259,32 @@ private List arrayToList(Array args) { return null; } - private List prepareSparkSubmitParams(final String SparkSubmitParams) { - final List sparkSubmitParamsList = new ArrayList<>(); - String[] sparkSubmitParamsArray = null; - if (SparkSubmitParams != "") { - sparkSubmitParamsArray = SparkSubmitParams.split("\\s+"); - sparkSubmitParamsList.add("spark-submit"); + private List prepareSparkSubmitParams(final String sparkSubmitParams) throws ProcessingException { + try { + final List sparkSubmitParamsList = new ArrayList<>(); - sparkSubmitParamsList.addAll(DataPullTask.toList(sparkSubmitParamsArray)); - } + if (sparkSubmitParams != null && 
-    private List<String> prepareSparkSubmitParams(final String SparkSubmitParams) {
-        final List<String> sparkSubmitParamsList = new ArrayList<>();
-        String[] sparkSubmitParamsArray = null;
-        if (SparkSubmitParams != "") {
-            sparkSubmitParamsArray = SparkSubmitParams.split("\\s+");
-            sparkSubmitParamsList.add("spark-submit");
-            sparkSubmitParamsList.addAll(DataPullTask.toList(sparkSubmitParamsArray));
-        }
-        return sparkSubmitParamsList;
+    private List<String> prepareSparkSubmitParams(final String sparkSubmitParams) throws ProcessingException {
+        try {
+            final List<String> sparkSubmitParamsList = new ArrayList<>();
+
+            if (sparkSubmitParams != null && !sparkSubmitParams.isEmpty()) {
+                String[] sparkSubmitParamsArray = sparkSubmitParams.split("\\s+");
+                sparkSubmitParamsList.add("spark-submit");
+                sparkSubmitParamsList.addAll(DataPullTask.toList(sparkSubmitParamsArray));
+            }
+
+            return sparkSubmitParamsList;
+        } catch (Exception e) {
+            storedException = e;
+            logger.severe("Error occurred: " + e.getMessage());
+            stackTrace = getStackTraceAsString(e);
+            invalidParamList = true;
+            sendEmailNotification();
+            // fall back to an empty parameter list
+            return Collections.emptyList();
+        }
     }
 
-    private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, final String logPath, final String jarPath, final String sparkSubmitParams, final Boolean haveBootstrapAction) {
+    private RunJobFlowResult runTaskInNewCluster(final AmazonElasticMapReduce emr, final String logPath, final String jarPath, final String sparkSubmitParams, final Boolean haveBootstrapAction) throws ProcessingException {
+        try {
 
         HadoopJarStepConfig runExampleConfig = null;
 
@@ -351,29 +421,103 @@
             request.withSecurityConfiguration(emrSecurityConfiguration);
         }
 
-        final BootstrapActionConfig bsConfig = new BootstrapActionConfig();
-        final ScriptBootstrapActionConfig sbsConfig = new ScriptBootstrapActionConfig();
-        String bootstrapActionFilePathFromUser = Objects.toString(clusterProperties.getBootstrap_action_file_path(), "");
-        if (!bootstrapActionFilePathFromUser.isEmpty()) {
-            bsConfig.setName("Bootstrap action from file");
-            sbsConfig.withPath(bootstrapActionFilePathFromUser);
-            if (clusterProperties.getBootstrap_action_arguments() != null) {
-                sbsConfig.setArgs(clusterProperties.getBootstrap_action_arguments());
-            }
-            bsConfig.setScriptBootstrapAction(sbsConfig);
-            request.withBootstrapActions(bsConfig);
-        } else if (haveBootstrapAction) {
-            bsConfig.setName("bootstrapaction");
-            sbsConfig.withPath("s3://" + this.jksS3Path);
-            bsConfig.setScriptBootstrapAction(sbsConfig);
-            request.withBootstrapActions(bsConfig);
-        }
-        return emr.runJobFlow(request);
+            final BootstrapActionConfig bsConfig = new BootstrapActionConfig();
+            final ScriptBootstrapActionConfig sbsConfig = new ScriptBootstrapActionConfig();
+            String bootstrapActionFilePathFromUser = Objects.toString(clusterProperties.getBootstrap_action_file_path(), "");
+            if (!bootstrapActionFilePathFromUser.isEmpty()) {
+                bsConfig.setName("Bootstrap action from file");
+                sbsConfig.withPath(bootstrapActionFilePathFromUser);
+                if (clusterProperties.getBootstrap_action_arguments() != null) {
+                    sbsConfig.setArgs(clusterProperties.getBootstrap_action_arguments());
+                }
+                bsConfig.setScriptBootstrapAction(sbsConfig);
+                request.withBootstrapActions(bsConfig);
+            } else if (haveBootstrapAction) {
+                bsConfig.setName("bootstrapaction");
+                sbsConfig.withPath("s3://" + this.jksS3Path);
+                bsConfig.setScriptBootstrapAction(sbsConfig);
+                request.withBootstrapActions(bsConfig);
+            }
+
+            return emr.runJobFlow(request);
+        } catch (Exception e) {
+            storedException = e;
+            logger.severe("Error occurred: " + e.getMessage());
+            stackTrace = getStackTraceAsString(e);
+            jobFlowId = null;
+            throw new ProcessingException("Error in runTaskInNewCluster", e);
+        }
     }
 
+    private void sendEmailNotification() throws ProcessingException {
+        String emailStatusCodeVal = null;
+        String emailAddress = null;
+        try {
+            if (jobFlowId != null) {
+                emailStatusCodeVal = emailStatusCode.CLUSTER_CREATION_SUCCESS.name();
+                emailAddress = dataPullRequestProcessor.successMailAddress().get(taskId);
+            } else if (clusterId != null) {
+                emailStatusCodeVal = emailStatusCode.SPARK_EXEC_ON_EXISTING_CLUSTER.name();
+                emailAddress = dataPullRequestProcessor.successMailAddress().get(taskId);
+            } else {
+                if (runSparkClusterFail) {
+                    emailStatusCodeVal = emailStatusCode.SPARK_EXEC_FAILED.name();
+                } else if (invalidParamList) {
+                    emailStatusCodeVal = emailStatusCode.INVALID_PARAMS.name();
+                } else if (runTaskOnExistingClusterFail) {
+                    emailStatusCodeVal = emailStatusCode.RUN_ON_EXISTING_CLUSTER_FAILED.name();
+                } else {
+                    // no specific failure flag was set, so the cluster itself never came up
+                    emailStatusCodeVal = emailStatusCode.CLUSTER_CREATION_FAILED.name();
+                }
+                emailAddress = dataPullRequestProcessor.failureMailAddress().get(taskId);
+            }
+
+            String[] emailAddressArray = null;
+            InternetAddress[] internetAddress = null;
+
+            if (emailAddress != null && !emailAddress.isEmpty()) {
+                emailAddressArray = emailAddress.split(",");
+            }
+
+            if (emailAddressArray != null && emailAddressArray.length > 0) {
+                // Create an array to store InternetAddress objects
+                internetAddress = new InternetAddress[emailAddressArray.length];
+                for (int i = 0; i < emailAddressArray.length; i++) {
+                    try {
+                        internetAddress[i] = new InternetAddress(emailAddressArray[i].trim());
+                    } catch (AddressException ex) {
+                        ex.printStackTrace();
+                    }
+                }
+            }
+
+            if (internetAddress == null) {
+                DataPullTask.log.info("No notification email address configured for task " + this.taskId + "; skipping email");
+                return;
+            }
+
+            String to = Arrays.stream(internetAddress)
+                    .filter(Objects::nonNull)
+                    .map(InternetAddress::getAddress)
+                    .collect(Collectors.joining(", "));
+
+            emailNotification.sendEmail(emailStatusCodeVal, to, jobFlowId, this.taskId, stackTrace, clusterId);
+        } catch (Exception e) {
+            processingException = new ProcessingException("Error in sendEmailNotification: " + e.getMessage(), e);
+            DataPullTask.log.error("Error in sendEmailNotification: " + e.getMessage(), e);
+            throw processingException;
+        }
+    }
+
+    private String getStackTraceAsString(Exception e) {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        e.printStackTrace(pw);
+        return sw.toString();
+    }
 
     private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProperties, ClusterProperties clusterProperties,
-                                                             DataPullProperties dataPullProperties) {
+                                                             DataPullProperties dataPullProperties) throws ProcessingException {
+        try {
 
         final int instanceCount = emrProperties.getInstanceCount();
         final String masterType = emrProperties.getMasterType();
@@ -403,7 +547,7 @@
         subnetIds.addAll(toList(new String[]{dataPullProperties.getApplicationSubnet1(),
                 dataPullProperties.getApplicationSubnet2()}));
 
-        if(dataPullProperties.getApplicationSubnet3()!=null){
+        if (StringUtils.isNotBlank(dataPullProperties.getApplicationSubnet3())) {
             subnetIds.add(dataPullProperties.getApplicationSubnet3());
         }
@@ -445,10 +589,20 @@
             jobConfig.withEc2KeyName(clusterProperties.getEc2KeyName());
         }
 
-        if (count> 1) {
-            jobConfig.withInstanceFleets(workerInstanceFleetConfig);
-        }
-        return jobConfig;
+            if (count > 1) {
+                jobConfig.withInstanceFleets(workerInstanceFleetConfig);
+            }
+
+            return jobConfig;
+        } catch (Exception e) {
+            storedException = e;
+            logger.severe("Error occurred: " + e.getMessage());
+            stackTrace = getStackTraceAsString(e);
+            getJobFlowInstancesConfigFail = true;
+            sendEmailNotification();
+            return null;
+        }
     }
 
     private void addTagsToEMRCluster() {
@@ -458,18 +612,21 @@
         this.addTags(clusterProperties.getTags()); //added for giving precedence to user tags
     }
 
-    private void runTaskOnExistingCluster(final String id, final String jarPath, final boolean terminateClusterAfterExecution, final String sparkSubmitParams) {
-
-        HadoopJarStepConfig runExampleConfig = null;
-
-        List<String> sparkSubmitParamsListOnExistingCluster = new ArrayList<>();
-        if (sparkSubmitParams != null && !sparkSubmitParams.isEmpty()) {
-            sparkSubmitParamsListOnExistingCluster = this.prepareSparkSubmitParams(sparkSubmitParams);
-        } else {
-            List<String> sparkBaseParams = new ArrayList<>();
-            sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4", "--class", DataPullTask.MAIN_CLASS, jarPath}));
-            sparkSubmitParamsListOnExistingCluster.addAll(sparkBaseParams);
-        }
+    private void runTaskOnExistingCluster(final String id, final String jarPath, final boolean terminateClusterAfterExecution, final String sparkSubmitParams) throws ProcessingException {
+        try {
+            HadoopJarStepConfig runExampleConfig = null;
+
+            clusterId = id;
+
+            sendEmailNotification();
+
+            List<String> sparkSubmitParamsListOnExistingCluster = new ArrayList<>();
+            if (sparkSubmitParams != null && !sparkSubmitParams.isEmpty()) {
+                sparkSubmitParamsListOnExistingCluster = this.prepareSparkSubmitParams(sparkSubmitParams);
+            } else {
+                List<String> sparkBaseParams = new ArrayList<>();
+                sparkBaseParams.addAll(toList(new String[]{"spark-submit", "--conf", "spark.default.parallelism=3", "--conf", "spark.storage.blockManagerSlaveTimeoutMs=1200s", "--conf", "spark.executor.heartbeatInterval=900s", "--conf", "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--conf", "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/etc/pki/java/cacerts/ -Djavax.net.ssl.trustStorePassword=changeit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4", "--class", DataPullTask.MAIN_CLASS, jarPath}));
+                sparkSubmitParamsListOnExistingCluster.addAll(sparkBaseParams);
+            }
 
         if (clusterProperties.getSpark_submit_arguments() != null) {
             sparkSubmitParamsListOnExistingCluster.addAll(clusterProperties.getSpark_submit_arguments());
@@ -485,12 +642,20 @@
                 .withName(this.taskId)
                 .withHadoopJarStep(runExampleConfig).withActionOnFailure("CONTINUE");
 
-        final AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
-        req.withJobFlowId(id);
-        req.withSteps(step);
-        this.config.getEMRClient().addJobFlowSteps(req);
-        if (terminateClusterAfterExecution) {
-            this.addTerminateStep(id);
+            final AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
+            req.withJobFlowId(id);
+            req.withSteps(step);
+            this.config.getEMRClient().addJobFlowSteps(req);
+            if (terminateClusterAfterExecution) {
+                this.addTerminateStep(id);
+            }
+        } catch (Exception e) {
+            storedException = e;
+            logger.severe("Error occurred: " + e.getMessage());
+            stackTrace = getStackTraceAsString(e);
+            runTaskOnExistingClusterFail = true;
+            sendEmailNotification();
+        }
     }
 
@@ -630,4 +795,5 @@ private ListClustersResult retryListClusters(final AmazonElasticMapReduce emr, f
         return listClustersResult;
     }
-}
+
+}
\ No newline at end of file
diff --git a/api/src/main/java/com/homeaway/datapullclient/utils/EmailNotification.java b/api/src/main/java/com/homeaway/datapullclient/utils/EmailNotification.java
new file mode 100644
index 0000000..5fefb8e
--- /dev/null
+++ b/api/src/main/java/com/homeaway/datapullclient/utils/EmailNotification.java
@@ -0,0 +1,141 @@
+package com.homeaway.datapullclient.utils;
+
+import javax.mail.*;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+import java.util.Objects;
+import java.util.Properties;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.simpleemail.AmazonSimpleEmailService;
+import com.amazonaws.services.simpleemail.AmazonSimpleEmailServiceClientBuilder;
+import com.amazonaws.services.simpleemail.model.Body;
+import com.amazonaws.services.simpleemail.model.Content;
+import com.amazonaws.services.simpleemail.model.Destination;
+import com.amazonaws.services.simpleemail.model.SendEmailRequest;
+import com.homeaway.datapullclient.config.DataPullClientConfig;
+import com.homeaway.datapullclient.config.SMTPProperties;
+import com.homeaway.datapullclient.config.SESProperties;
+import com.homeaway.datapullclient.exception.ProcessingException;
+import org.apache.commons.lang3.StringUtils;
+
+public class EmailNotification {
+
+    DataPullClientConfig config;
+    String subject;
+    String messageBody1;
+    String messageBody2;
+    Boolean clusterStatus = true;
+    String htmlContent;
+
+    public EmailNotification(DataPullClientConfig config) {
+        this.config = config;
+    }
+
+    public void sendEmail(String emailStatusCodeVal, String to, String jobFlowId, String taskId, String stackTrace, String clusterId) throws ProcessingException {
+
+        final SMTPProperties smtpProperties = config.getSMTPProperties();
+        final SESProperties sesProperties = config.getSESProperties();
+
+        if (StringUtils.equalsIgnoreCase(emailStatusCodeVal, "CLUSTER_CREATION_SUCCESS")) {
+            subject = "Cluster created successfully for datapull pipeline \"" + taskId + "\"";
+            messageBody1 = "Cluster ID for the datapull task \"" + taskId + "\" is \"" + jobFlowId + "\". Please monitor the EMR console for more details.";
+            clusterStatus = true;
+        } else if (StringUtils.equalsIgnoreCase(emailStatusCodeVal, "SPARK_EXEC_ON_EXISTING_CLUSTER")) {
+            subject = "Datapull pipeline \"" + taskId + "\" successfully submitted to an existing cluster";
+            messageBody1 = "Cluster ID for the datapull job \"" + taskId + "\" is \"" + clusterId + "\". Please monitor the EMR console for more details.";
+            clusterStatus = true;
+        } else {
+            if (StringUtils.equalsIgnoreCase(emailStatusCodeVal, "CLUSTER_CREATION_FAILED")) {
+                subject = "Cluster creation failed for datapull pipeline \"" + taskId + "\"";
+                messageBody1 = "Datapull cluster creation failed with the below error.";
+            } else if (StringUtils.equalsIgnoreCase(emailStatusCodeVal, "SPARK_EXEC_FAILED")) {
+                subject = "Spark job execution failed for datapull pipeline \"" + taskId + "\"";
+                messageBody1 = "Spark job execution failed with the below error.";
"; + } else if (StringUtils.equalsIgnoreCase(emailStatusCodeVal, "INVALID_PARAMS")) { + subject = "Spark job execution failed for datapull pipeline \"" + taskId +"\""; + messageBody1 = "Spark job execution due to invalid configs. "; + } else if (StringUtils.equalsIgnoreCase(emailStatusCodeVal, "RUN_ON_EXISTING_CLUSTER_FAILED")) { + subject = "Unable to run the datapull pipeline \"" + taskId + "\" on cluster \"" + jobFlowId+"\""; + messageBody1 = "Unable to run the pipeline " + taskId; + } + messageBody2 = stackTrace; + clusterStatus = false; + } + + if (clusterStatus) { + htmlContent = "
Message
" + messageBody1 + "
"; + } else { + htmlContent = "

" + messageBody1 + "

Error Message
" + messageBody2 + "
"; + } + + if (StringUtils.isNotEmpty(smtpProperties.getEmailaddress()) && StringUtils.isNotEmpty(smtpProperties.getSmtpserveraddress())) { + sendEmailViaSMTP(smtpProperties, to, subject, htmlContent); + } else if (sesProperties.getEmail() != null && sesProperties.getSecretKey() != null && sesProperties.getAccessKey() != null) { + sendEmailViaSES(sesProperties, to, subject, htmlContent); + System.out.println("Test SES configs"); + } else { + throw new ProcessingException("SMTP or SES properties are not configured properly"); + } + } + + private void sendEmailViaSMTP(SMTPProperties smtpProperties, String to, String subject, String htmlContent) { + + Properties properties = new Properties(); + String from = smtpProperties.getEmailaddress().toString(); + properties.put("mail.smtp.host", smtpProperties.getSmtpserveraddress()); + properties.put("mail.smtp.port", StringUtils.isNotEmpty(smtpProperties.getSmtpport()) ? smtpProperties.getSmtpport() : "587"); + properties.put("mail.smtp.starttls.enable", StringUtils.isNotEmpty(smtpProperties.getSmtpstarttls()) ? smtpProperties.getSmtpstarttls() : "true"); + properties.put("mail.smtp.auth", "true"); + properties.put("mail.smtp.ssl.protocols", "TLSv1.2"); + properties.put("mail.smtp.ssl.trust", "*"); + + String password = ""; + + Session session = Session.getInstance(properties, new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(from, password); + } + }); + + try { + Message message = new MimeMessage(session); + message.setFrom(new InternetAddress(from)); + message.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to)); + message.setSubject(subject); + message.setContent(htmlContent, "text/html"); + + Transport.send(message); + System.out.println("Email sent successfully!"); + } catch (MessagingException e) { + e.printStackTrace(); + System.err.println("Error sending email: " + e.getMessage()); + } + } + + private void sendEmailViaSES(SESProperties sesProperties, String to, String subject, String htmlContent) throws ProcessingException { + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(sesProperties.getAccessKey(), sesProperties.getSecretKey()); + AmazonSimpleEmailServiceClientBuilder builder = AmazonSimpleEmailServiceClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .withRegion(sesProperties.getRegion()); + AmazonSimpleEmailService client = builder.build(); + SendEmailRequest request = new SendEmailRequest() + .withDestination(new Destination().withToAddresses(to)) + .withMessage(new com.amazonaws.services.simpleemail.model.Message() + .withBody(new Body() + .withHtml(new Content().withCharset("UTF-8").withData(htmlContent))) + .withSubject(new Content().withCharset("UTF-8").withData(subject))) + .withSource(sesProperties.getEmail()); + try { + client.sendEmail(request); + System.out.println("Email sent successfully via SES!"); + } catch (AmazonServiceException e) { + e.printStackTrace(); + System.err.println("Error sending email via SES: " + e.getMessage()); + throw new ProcessingException("Error sending email via SES", e); + } + } +} \ No newline at end of file diff --git a/api/terraform/datapull_task/ecs_deploy.sh b/api/terraform/datapull_task/ecs_deploy.sh index 2715b59..c6d1cb7 100644 --- a/api/terraform/datapull_task/ecs_deploy.sh +++ b/api/terraform/datapull_task/ecs_deploy.sh @@ -179,7 +179,7 @@ exitAfterFailure ENV TZ=America/Los_Angeles RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime 
&& echo $TZ > /etc/timezone -docker build -t "${docker_image_name}" . +docker build --platform linux/amd64 -t "${docker_image_name}" . cd terraform/datapull_task diff --git a/core/src/main/scala/core/DataFrameFromTo.scala b/core/src/main/scala/core/DataFrameFromTo.scala index cbddcfe..5755846 100644 --- a/core/src/main/scala/core/DataFrameFromTo.scala +++ b/core/src/main/scala/core/DataFrameFromTo.scala @@ -1277,7 +1277,7 @@ class DataFrameFromTo(appConfig: AppConfig, pipeline: String) extends Serializab // create the statement, and run the command val statement = connection.createStatement() - if (colType != null) { + if (colType.isDefined) { resultSet= statement.executeQuery(sql_command) }