From 9343bb7b485485ef83ceee1a0ba4f2fd16c69030 Mon Sep 17 00:00:00 2001 From: Sridhar Paladugu Date: Thu, 18 Feb 2016 20:17:40 -0500 Subject: [PATCH 1/5] adding sql to incremental hdfs load. --- .gitignore | 5 ++ .../x/IncrementalColumnRangePartitioner.java | 63 +++++++++++++------ modules/job/jdbchdfs/config/jdbchdfs.xml | 1 + 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index da198d01e..202f6d820 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,8 @@ lib spring-xd-spark-streaming/.cache .gradletasknamecache spring-xd.ids +build/ +spring-xd-shell/ +spring-xd-rest-client/ +spring-xd-gemfire-server/ +redis/ diff --git a/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitioner.java b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitioner.java index 213be8b51..a93cdc634 100644 --- a/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitioner.java +++ b/extensions/spring-xd-extension-batch/src/main/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitioner.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.batch.integration.x; import java.util.HashMap; @@ -56,6 +57,8 @@ public class IncrementalColumnRangePartitioner implements Partitioner, StepExecu private String table; + private String sql; + private String column; private int partitions; @@ -120,6 +123,10 @@ public void setOverrideValue(Long overrideValue) { this.overrideValue = overrideValue; } + public void setSql(String sql) { + this.sql = sql; + } + /** * Partition a database table assuming that the data in the column specified * are uniformly distributed. 
The execution context values will have keys @@ -133,14 +140,14 @@ public Map partition(int gridSize) { StringBuilder incrementalClause = new StringBuilder(); Map result = new HashMap<>(); - if(!StringUtils.hasText(checkColumn) && !StringUtils.hasText(column)) { + if (!StringUtils.hasText(checkColumn) && !StringUtils.hasText(column)) { ExecutionContext value = new ExecutionContext(); value.put("partClause", ""); result.put("partition0", value); value.put("partSuffix", ""); } else { - if(StringUtils.hasText(checkColumn)) { + if (StringUtils.hasText(checkColumn)) { incrementalClause.append(checkColumn).append(" > ").append(this.incrementalMin); } @@ -158,14 +165,15 @@ public Map partition(int gridSize) { end = this.partitionMax; } - if(StringUtils.hasText(checkColumn)) { - value.putString("partClause", String.format("WHERE (%s BETWEEN %s AND %s) AND %s", column, start, end, incrementalClause.toString())); + if (StringUtils.hasText(checkColumn)) { + value.putString("partClause", String.format("WHERE (%s BETWEEN %s AND %s) AND %s", column, start, + end, incrementalClause.toString())); } else { value.putString("partClause", String.format("WHERE (%s BETWEEN %s AND %s)", column, start, end)); } - value.putString("partSuffix", "-p"+number); + value.putString("partSuffix", "-p" + number); start += targetSize; end += targetSize; number++; @@ -179,9 +187,9 @@ public Map partition(int gridSize) { @Override public void beforeStep(StepExecution stepExecution) { - if(StringUtils.hasText(checkColumn)) { + if (StringUtils.hasText(checkColumn)) { - if(overrideValue != null && overrideValue >= 0) { + if (overrideValue != null && overrideValue >= 0) { this.incrementalMin = overrideValue; } else { @@ -190,7 +198,7 @@ public void beforeStep(StepExecution stepExecution) { // Get the last jobInstance...not the current one List jobInstances = jobExplorer.getJobInstances(jobName, 1, 1); - if(jobInstances.size() > 0) { + if (jobInstances.size() > 0) { JobInstance lastInstance = 
jobInstances.get(jobInstances.size() - 1); List executions = jobExplorer.getJobExecutions(lastInstance); @@ -198,12 +206,12 @@ public void beforeStep(StepExecution stepExecution) { JobExecution lastExecution = executions.get(0); for (JobExecution execution : executions) { - if(lastExecution.getEndTime().getTime() < execution.getEndTime().getTime()) { + if (lastExecution.getEndTime().getTime() < execution.getEndTime().getTime()) { lastExecution = execution; } } - if(lastExecution.getExecutionContext().containsKey(BATCH_INCREMENTAL_MAX_ID)) { + if (lastExecution.getExecutionContext().containsKey(BATCH_INCREMENTAL_MAX_ID)) { this.incrementalMin = lastExecution.getExecutionContext().getLong(BATCH_INCREMENTAL_MAX_ID); } else { @@ -215,15 +223,24 @@ public void beforeStep(StepExecution stepExecution) { } } - long newMin = jdbcTemplate.queryForObject(String.format("select max(%s) from %s", checkColumn, table), Integer.class); - - stepExecution.getExecutionContext().put(BATCH_INCREMENTAL_MAX_ID, newMin); + if (StringUtils.hasText(table)) { + long newMin = jdbcTemplate.queryForObject(String.format("select max(%s) from %s", checkColumn, table), + Integer.class); + stepExecution.getExecutionContext().put(BATCH_INCREMENTAL_MAX_ID, newMin); + } + else if (StringUtils.hasText(sql)) { + String maxSqlStr = "SELECT max(" + checkColumn + ") from (" + sql + ") as boundQry"; + long newMin = jdbcTemplate.queryForObject(maxSqlStr, Long.class); + stepExecution.getExecutionContext().put(BATCH_INCREMENTAL_MAX_ID, newMin); + } } - if(StringUtils.hasText(column) && StringUtils.hasText(table)) { - if(StringUtils.hasText(checkColumn)) { - Long minResult = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table + " where " + checkColumn + " > " + this.incrementalMin, Long.class); - Long maxResult = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table + " where " + checkColumn + " > " + this.incrementalMin, Long.class); + if (StringUtils.hasText(column) && 
StringUtils.hasText(table)) { + if (StringUtils.hasText(checkColumn)) { + Long minResult = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table + " where " + + checkColumn + " > " + this.incrementalMin, Long.class); + Long maxResult = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table + " where " + + checkColumn + " > " + this.incrementalMin, Long.class); this.partitionMin = minResult != null ? minResult : Long.MIN_VALUE; this.partitionMax = maxResult != null ? maxResult : Long.MAX_VALUE; } @@ -234,6 +251,16 @@ public void beforeStep(StepExecution stepExecution) { this.partitionMax = maxResult != null ? maxResult : Long.MAX_VALUE; } } + else if (StringUtils.hasText(sql)) { + String maxSqlStr = "SELECT MIN(" + column + ") from (" + sql + ") as boundQry where " + checkColumn + " > " + + this.incrementalMin; + String minSqlStr = "SELECT MAX(" + column + ") from (" + sql + ") as boundQry where " + checkColumn + " > " + + this.incrementalMin; + Long minResult = jdbcTemplate.queryForObject(maxSqlStr, Long.class); + Long maxResult = jdbcTemplate.queryForObject(minSqlStr, Long.class); + this.partitionMin = minResult != null ? minResult : Long.MIN_VALUE; + this.partitionMax = maxResult != null ? 
maxResult : Long.MAX_VALUE; + } } @Override @@ -243,7 +270,7 @@ public ExitStatus afterStep(StepExecution stepExecution) { @Override public void afterPropertiesSet() throws Exception { - if(!StringUtils.hasText(this.column)) { + if (!StringUtils.hasText(this.column)) { this.column = this.checkColumn; } } diff --git a/modules/job/jdbchdfs/config/jdbchdfs.xml b/modules/job/jdbchdfs/config/jdbchdfs.xml index 38a158fc3..5ba19abe0 100644 --- a/modules/job/jdbchdfs/config/jdbchdfs.xml +++ b/modules/job/jdbchdfs/config/jdbchdfs.xml @@ -39,6 +39,7 @@ + From 14d357e56714fd8733f34396ec7b97ec355bae1a Mon Sep 17 00:00:00 2001 From: Sridhar Paladugu Date: Thu, 18 Feb 2016 20:39:27 -0500 Subject: [PATCH 2/5] clean git ignore --- .gitignore | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.gitignore b/.gitignore index 202f6d820..da198d01e 100644 --- a/.gitignore +++ b/.gitignore @@ -35,8 +35,3 @@ lib spring-xd-spark-streaming/.cache .gradletasknamecache spring-xd.ids -build/ -spring-xd-shell/ -spring-xd-rest-client/ -spring-xd-gemfire-server/ -redis/ From e974768a4e3876bdb2b13d88b22138d51334d1c4 Mon Sep 17 00:00:00 2001 From: Sridhar Paladugu Date: Fri, 19 Feb 2016 20:21:12 -0500 Subject: [PATCH 3/5] Adding Unit Test. 
--- ...IncrementalColumnRangePartitionerTest.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java b/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java index 81200f929..f091769da 100644 --- a/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java +++ b/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import javax.sql.DataSource; @@ -29,9 +31,16 @@ import org.junit.runner.RunWith; import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; +import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean; +import org.springframework.batch.core.launch.support.SimpleJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; import org.springframework.batch.item.ExecutionContext; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -106,4 +115,43 @@ public void testFivePartitions() { assertEquals("-p4", partitions.get("partition4").get("partSuffix")); } + + @Test + 
public void testTwoPartitionsForSql() throws Throwable { + jdbc.execute("insert into bar (foo) values (1), (2), (3), (4)"); + partitioner.setCheckColumn("foo"); + partitioner.setColumn("foo"); + partitioner.setSql("select * from bar"); + partitioner.setPartitions(2); + SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); + MapJobRepositoryFactoryBean repositoryFactory = new MapJobRepositoryFactoryBean(); + repositoryFactory.afterPropertiesSet(); + JobRepository jobRepository = repositoryFactory.getObject(); + jobLauncher.setJobRepository(jobRepository); + jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); + jobLauncher.afterPropertiesSet(); + JobExplorer jobExplorer = new MapJobExplorerFactoryBean(repositoryFactory).getObject(); + partitioner.setJobExplorer(jobExplorer); + JobExecution jobExec = new JobExecution(5l); + JobInstance jobInstance = new JobInstance(5l, "testIncrementalJDBCSqlJob"); + jobExec.setJobInstance(jobInstance); + StepExecution stepExec = new StepExecution("step1", jobExec); + List stepExecutions = new ArrayList(); + stepExecutions.add(stepExec); + jobExec.addStepExecutions(stepExecutions); + + partitioner.beforeStep(new StepExecution("step1", jobExec)); + Map partitions = partitioner.partition(1); + assertEquals(2, partitions.size()); + assertTrue(partitions.containsKey("partition0")); + String part1Expected = "WHERE (foo BETWEEN 1 AND 2) AND foo > " + Long.MIN_VALUE; + String part1Actual = (String) partitions.get("partition0").get("partClause"); + assertEquals(part1Expected, part1Actual); + assertEquals("-p0", partitions.get("partition0").get("partSuffix")); + assertTrue(partitions.containsKey("partition1")); + String part2Expected = "WHERE (foo BETWEEN 3 AND 4) AND foo > " + Long.MIN_VALUE; + String part2Actual = (String) partitions.get("partition1").get("partClause"); + assertEquals(part2Expected, part2Actual); + assertEquals("-p1", partitions.get("partition1").get("partSuffix")); + } } From 
80e2a450b37ea58ae8326d2c1dda15797fd275da Mon Sep 17 00:00:00 2001 From: Sridhar Paladugu Date: Tue, 23 Feb 2016 17:21:45 -0500 Subject: [PATCH 4/5] adding unit test for incremental index check for SQL load --- ...IncrementalColumnRangePartitionerTest.java | 68 ++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java b/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java index f091769da..220ad797e 100644 --- a/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java +++ b/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java @@ -18,8 +18,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; @@ -133,7 +136,7 @@ public void testTwoPartitionsForSql() throws Throwable { JobExplorer jobExplorer = new MapJobExplorerFactoryBean(repositoryFactory).getObject(); partitioner.setJobExplorer(jobExplorer); JobExecution jobExec = new JobExecution(5l); - JobInstance jobInstance = new JobInstance(5l, "testIncrementalJDBCSqlJob"); + JobInstance jobInstance = new JobInstance(1l, "testIncrementalJDBCSqlJob"); jobExec.setJobInstance(jobInstance); StepExecution stepExec = new StepExecution("step1", jobExec); List stepExecutions = new ArrayList(); @@ -154,4 +157,67 @@ public void testTwoPartitionsForSql() throws Throwable { assertEquals(part2Expected, part2Actual); assertEquals("-p1", partitions.get("partition1").get("partSuffix")); } + + + @Test + public void 
testIncrementalSqlNextIterationValue() throws Throwable { + jdbc.execute("insert into bar (foo) values (1), (2), (3), (4)"); + partitioner.setCheckColumn("foo"); + partitioner.setColumn("foo"); + partitioner.setSql("select * from bar"); + + JobExplorer jobExplorer = mock(JobExplorer.class); + partitioner.setJobExplorer(jobExplorer); + JobExecution jobExec = new JobExecution(1l); + JobInstance jobInstance1 = new JobInstance(1l, "testIncrementalJDBCSqlJob"); + jobExec.setJobInstance(jobInstance1); + StepExecution stepExecution = new StepExecution("step1", jobExec); + + when(jobExplorer.getJobInstances("testIncrementalJDBCSqlJob", 1, 1)).thenReturn(new ArrayList()); + partitioner.beforeStep(stepExecution); + //first time the value is long minimum as there is no previous instance + Map partitions = partitioner.partition(1); + String queryPartClause = (String) partitions.get("partition0").get("partClause"); + assertTrue(queryPartClause.endsWith(Long.MIN_VALUE + "")); + //mark end of job and adjust the max + jobExec.setEndTime(new Date(System.currentTimeMillis())); + jobExec.getExecutionContext().put("batch.incremental.maxId", 4l); + + jdbc.execute("insert into bar (foo) values (5), (6), (7), (8)"); + + List jobInstances = new ArrayList(); + jobInstances.add(jobInstance1); + JobInstance jobInstance2 = new JobInstance(2l, "testIncrementalJDBCSqlJob"); + jobExec.setJobInstance(jobInstance2); + jobInstances.add(jobInstance2); + when(jobExplorer.getJobInstances("testIncrementalJDBCSqlJob", 1, 1)).thenReturn(jobInstances); + List executions = new ArrayList(); + executions.add(jobExec); + when(jobExplorer.getJobExecutions(jobInstance2)).thenReturn(executions); + partitioner.beforeStep(new StepExecution("step1", jobExec)); + //this time the value should be 4 + partitions = partitioner.partition(1); + queryPartClause = (String) partitions.get("partition0").get("partClause"); + assertTrue(queryPartClause.endsWith(4l + "")); + //mark end of job and adjust the max + 
jobExec.setEndTime(new Date(System.currentTimeMillis())); + jobExec.getExecutionContext().put("batch.incremental.maxId", 8l); + + jdbc.execute("insert into bar (foo) values (9), (10), (7), (8)"); + + JobInstance jobInstance3 = new JobInstance(3l, "testIncrementalJDBCSqlJob"); + jobExec.setJobInstance(jobInstance3); + jobInstances.add(jobInstance3); + when(jobExplorer.getJobInstances("testIncrementalJDBCSqlJob", 1, 1)).thenReturn(jobInstances); + executions.add(jobExec); + when(jobExplorer.getJobExecutions(jobInstance3)).thenReturn(executions); + partitioner.beforeStep(new StepExecution("step1", jobExec)); + //this time the value should be 8 + partitions = partitioner.partition(1); + queryPartClause = (String) partitions.get("partition0").get("partClause"); + assertTrue(queryPartClause.endsWith(8l + "")); + //mark end of job and adjust the max + jobExec.setEndTime(new Date(System.currentTimeMillis())); + jobExec.getExecutionContext().put("batch.incremental.maxId", 8l); + } } From e66b49afd0f54e91000e3785773710dc3f1e10c4 Mon Sep 17 00:00:00 2001 From: Sridhar Paladugu Date: Thu, 25 Feb 2016 22:37:59 -0500 Subject: [PATCH 5/5] adding partition param --- .../integration/x/IncrementalColumnRangePartitionerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java b/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java index 220ad797e..30dcc930f 100644 --- a/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java +++ b/extensions/spring-xd-extension-batch/src/test/java/org/springframework/batch/integration/x/IncrementalColumnRangePartitionerTest.java @@ -165,7 +165,7 @@ public void testIncrementalSqlNextIterationValue() throws Throwable { 
partitioner.setCheckColumn("foo"); partitioner.setColumn("foo"); partitioner.setSql("select * from bar"); - + partitioner.setPartitions(2); JobExplorer jobExplorer = mock(JobExplorer.class); partitioner.setJobExplorer(jobExplorer); JobExecution jobExec = new JobExecution(1l);