Errors! | ")
+ .append(Instant.now().toString())
+ .append(" | ")
+ .append(message)
+ .append(" |
");
+ Controller controllerInstance = new Controller(config, pipelineName);
+ String htmlContent = controllerInstance.neatifyReportHtml(reportbodyHtml.toString(), true, true);
+ controllerInstance.SendEmail(creator, htmlContent, applicationId,
+ pipelineName, env, subject, "");
+ }
+ catch (IOException e) {
+ DataPullTask.log.error("Error while sending email", e);
+ }
+
+ }
+
+ private boolean waitForClusterReady(AmazonElasticMapReduce emrClient, String clusterId) {
+ DescribeClusterRequest describeRequest = new DescribeClusterRequest().withClusterId(clusterId);
+ int maxRetries = 60;
+ int retryIntervalSeconds = 30;
+ int retries = 0;
+
+ while (retries < maxRetries) {
+ try {
+ DescribeClusterResult describeResult = emrClient.describeCluster(describeRequest);
+ ClusterStatus status = describeResult.getCluster().getStatus();
+ String state = status.getState();
+ DataPullTask.log.info("Cluster {} is in state {}", clusterId, state);
+
+ switch (state) {
+ case "WAITING":
+ case "RUNNING":
+ DataPullTask.log.info("Cluster {} is ready.", clusterId);
+ return true;
+ case "TERMINATED_WITH_ERRORS":
+ case "TERMINATED":
+ String reason = status.getStateChangeReason().getMessage();
+ DataPullTask.log.error("Cluster {} failed to start. Reason: {}", clusterId, reason);
+ return false;
+ default:
+ // Cluster is still starting up
+ break;
+ }
+
+ // Wait before polling again
+ Thread.sleep(retryIntervalSeconds * 1000);
+ retries++;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ DataPullTask.log.error("Interrupted while waiting for cluster to start.", e);
+ return false;
+ } catch (Exception e) {
+ DataPullTask.log.error("Error while checking cluster status.", e);
+ return false;
+ }
+ }
+
+ DataPullTask.log.error("Cluster {} did not start within the expected time.", clusterId);
+ return false;
+ }
+
private JobFlowInstancesConfig getJobFlowInstancesConfig(EMRProperties emrProperties,
ClusterProperties clusterProperties,
DataPullProperties dataPullProperties) {