diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java index e7eff9616..10eccdf40 100644 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java @@ -18,26 +18,20 @@ package org.apache.xtable.parquet; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.Paths; import java.time.Instant; import java.util.*; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Builder; import lombok.NonNull; +import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; -import org.apache.xtable.exception.ReadException; import org.apache.xtable.hudi.*; import org.apache.xtable.hudi.HudiPathUtils; import org.apache.xtable.model.*; @@ -52,10 +46,15 @@ import org.apache.xtable.spi.extractor.ConversionSource; @Builder +@Log4j2 public class ParquetConversionSource implements ConversionSource { private static final ParquetSchemaExtractor schemaExtractor = ParquetSchemaExtractor.getInstance(); + private static final ParquetMetadataExtractor metadataExtractor = + ParquetMetadataExtractor.getInstance(); + private static final ParquetDataManager parquetDataManagerExtractor = + ParquetDataManager.getInstance(); private static final ParquetMetadataExtractor parquetMetadataExtractor = ParquetMetadataExtractor.getInstance(); @@ -69,7 +68,17 @@ public class ParquetConversionSource implements ConversionSource { private final String basePath; @NonNull private final Configuration hadoopConf; - private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) { + private List parquetFiles; + + // helper method to ensure files are loaded once + private List getOrLoadParquetFiles() { + if (this.parquetFiles == null) { + this.parquetFiles = parquetDataManagerExtractor.getParquetFiles(hadoopConf, basePath); + } + return this.parquetFiles; + } + + private InternalTable createInternalTableFromFile(ParquetFileConfig latestFile) { ParquetMetadata parquetMetadata = parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath()); MessageType parquetSchema = parquetMetadataExtractor.getSchema(parquetMetadata); @@ -94,49 +103,45 @@ private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) @Override public InternalTable getTable(Long modificationTime) { // get parquetFile at specific time modificationTime - Stream parquetFiles = getParquetFiles(hadoopConf, basePath); - LocatedFileStatus file = getParquetFileAt(parquetFiles, modificationTime); + LocatedFileStatus parquetFile = + parquetDataManagerExtractor.getParquetDataFileAt(getOrLoadParquetFiles(), modificationTime); + ParquetFileConfig file = parquetDataManagerExtractor.getConfigFromFile(parquetFile, hadoopConf); return createInternalTableFromFile(file); } - private Stream getInternalDataFiles(Stream parquetFiles) { + private Stream getInternalDataFiles(Stream parquetFiles) { return parquetFiles.map( - file -> - InternalDataFile.builder() - .physicalPath(file.getPath().toString()) - .fileFormat(FileFormat.APACHE_PARQUET) - .fileSizeBytes(file.getLen()) - .partitionValues( - 
partitionValueExtractor.extractPartitionValues( - partitionSpecExtractor.spec( - partitionValueExtractor.extractSchemaForParquetPartitions( - parquetMetadataExtractor.readParquetMetadata( - hadoopConf, file.getPath()), - file.getPath().toString())), - HudiPathUtils.getPartitionPath(new Path(basePath), file.getPath()))) - .lastModified(file.getModificationTime()) - .columnStats( - parquetStatsExtractor.getColumnStatsForaFile( - parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()))) - .build()); + file -> { + ParquetMetadata metadata = + parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()); + return InternalDataFile.builder() + .physicalPath(file.getPath().toString()) + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(file.getSize()) + .partitionValues( + partitionValueExtractor.extractPartitionValues( + partitionSpecExtractor.spec( + partitionValueExtractor.extractSchemaForParquetPartitions( + metadata, file.getPath().toString())), + HudiPathUtils.getPartitionPath(new Path(basePath), file.getPath()))) + .lastModified(file.getModificationTime()) + .columnStats(parquetStatsExtractor.getColumnStatsForaFile(metadata)) + .build(); + }); } - private InternalDataFile createInternalDataFileFromParquetFile(FileStatus parquetFile) { + private InternalDataFile createInternalDataFileFromParquetFile(ParquetFileConfig parquetFile) { return InternalDataFile.builder() .physicalPath(parquetFile.getPath().toString()) .partitionValues( partitionValueExtractor.extractPartitionValues( partitionSpecExtractor.spec( partitionValueExtractor.extractSchemaForParquetPartitions( - parquetMetadataExtractor.readParquetMetadata( - hadoopConf, parquetFile.getPath()), - parquetFile.getPath().toString())), - basePath)) + parquetFile.getMetadata(), parquetFile.getPath().toString())), + HudiPathUtils.getPartitionPath(new Path(basePath), parquetFile.getPath()))) .lastModified(parquetFile.getModificationTime()) - .fileSizeBytes(parquetFile.getLen()) - .columnStats( - parquetStatsExtractor.getColumnStatsForaFile( - parquetMetadataExtractor.readParquetMetadata(hadoopConf, parquetFile.getPath()))) + .fileSizeBytes(parquetFile.getSize()) + .columnStats(parquetStatsExtractor.getColumnStatsForaFile(parquetFile.getMetadata())) .build(); } @@ -149,87 +154,103 @@ public CommitsBacklog getCommitsBacklog(InstantsForIncrementalSync syncIns @Override public TableChange getTableChangeForCommit(Long modificationTime) { - Stream parquetFiles = getParquetFiles(hadoopConf, basePath); Set addedInternalDataFiles = new HashSet<>(); - List tableChangesAfter = - parquetFiles - .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime) - .collect(Collectors.toList()); - InternalTable internalTable = getMostRecentTable(parquetFiles); - for (FileStatus tableStatus : tableChangesAfter) { - InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(tableStatus); + List filesMetadata = + parquetDataManagerExtractor.getParquetFilesMetadataAfterTime( + hadoopConf, getOrLoadParquetFiles(), modificationTime); + List tableChangesAfterMetadata = + parquetDataManagerExtractor.getParquetFilesMetadataAfterTime( + hadoopConf, getOrLoadParquetFiles(), modificationTime); + InternalTable internalTable = getMostRecentTableConfig(tableChangesAfterMetadata.stream()); + for (ParquetFileConfig fileMetadata : filesMetadata) { + InternalDataFile currentDataFile = createInternalDataFileFromParquetFile(fileMetadata); addedInternalDataFiles.add(currentDataFile); } return TableChange.builder() + 
.sourceIdentifier( + getCommitIdentifier( + parquetDataManagerExtractor + .getMostRecentParquetFile(getOrLoadParquetFiles(), hadoopConf) + .getModificationTime())) .tableAsOfChange(internalTable) .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build()) .build(); } - private InternalTable getMostRecentTable(Stream parquetFiles) { - LocatedFileStatus latestFile = getMostRecentParquetFile(parquetFiles); + private InternalTable getMostRecentTable( + List parquetFiles, Configuration conf) { + ParquetFileConfig latestFile = + parquetDataManagerExtractor.getMostRecentParquetFile(parquetFiles, conf); + return createInternalTableFromFile(latestFile); + } + + private InternalTable getMostRecentTableConfig(Stream parquetFiles) { + ParquetFileConfig latestFile = + parquetDataManagerExtractor.getMostRecentParquetFileConfig(parquetFiles); return createInternalTableFromFile(latestFile); } @Override public InternalTable getCurrentTable() { - Stream parquetFiles = getParquetFiles(hadoopConf, basePath); - return getMostRecentTable(parquetFiles); + return getMostRecentTable(getOrLoadParquetFiles(), hadoopConf); } - /** - * get current snapshot - * - * @return - */ @Override public InternalSnapshot getCurrentSnapshot() { - // to avoid consume the stream call the method twice to return the same stream of parquet files Stream internalDataFiles = - getInternalDataFiles(getParquetFiles(hadoopConf, basePath)); - InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, basePath)); + getInternalDataFiles( + parquetDataManagerExtractor.getConfigsFromStream(getOrLoadParquetFiles(), hadoopConf)); + InternalTable table = getMostRecentTable(getOrLoadParquetFiles(), hadoopConf); return InternalSnapshot.builder() .table(table) .sourceIdentifier( getCommitIdentifier( - getMostRecentParquetFile(getParquetFiles(hadoopConf, basePath)) + parquetDataManagerExtractor + .getMostRecentParquetFile(getOrLoadParquetFiles(), hadoopConf) .getModificationTime())) .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles)) .build(); } - private LocatedFileStatus getMostRecentParquetFile(Stream parquetFiles) { - return parquetFiles - .max(Comparator.comparing(FileStatus::getModificationTime)) - .orElseThrow(() -> new IllegalStateException("No files found")); - } + @Override + public boolean isIncrementalSyncSafeFrom(Instant timeInMillis) { + Stream parquetFilesMetadata = + parquetDataManagerExtractor.getConfigsFromStream(getOrLoadParquetFiles(), hadoopConf); + LongSummaryStatistics stats = + parquetFilesMetadata.mapToLong(ParquetFileConfig::getModificationTime).summaryStatistics(); - private LocatedFileStatus getParquetFileAt( - Stream parquetFiles, long modificationTime) { - return parquetFiles - .filter(fileStatus -> fileStatus.getModificationTime() == modificationTime) - .findFirst() - .orElseThrow(() -> new IllegalStateException("No file found at " + modificationTime)); - } + if (stats.getCount() == 0) { + log.warn("No parquet files found in table {}. 
Incremental sync is not possible.", tableName); + return false; + } - private Stream getParquetFiles(Configuration hadoopConf, String basePath) { - try { - FileSystem fs = FileSystem.get(hadoopConf); - URI uriBasePath = new URI(basePath); - String parentPath = Paths.get(uriBasePath).toString(); - RemoteIterator iterator = fs.listFiles(new Path(parentPath), true); - return RemoteIterators.toList(iterator).stream() - .filter(file -> file.getPath().getName().endsWith("parquet")); - } catch (IOException | URISyntaxException e) { - throw new ReadException("Unable to read files from file system", e); + long earliestModTime = stats.getMin(); + long latestModTime = stats.getMax(); + + if (timeInMillis.toEpochMilli() > latestModTime) { + log.warn( + "Instant {} is in the future relative to the data. Latest file time: {}", + timeInMillis.toEpochMilli(), + Instant.ofEpochMilli(latestModTime)); + return false; } - } - @Override - public boolean isIncrementalSyncSafeFrom(Instant instant) { - return false; + if (earliestModTime > timeInMillis.toEpochMilli()) { + log.warn( + "Incremental sync is not safe. Earliest available metadata (time={}) is newer " + + "than requested instant {}. Data history has been truncated.", + Instant.ofEpochMilli(earliestModTime), + timeInMillis.toEpochMilli()); + return false; + } + + log.info( + "Incremental sync is safe from instant {} for table {}", + timeInMillis.toEpochMilli(), + tableName); + return true; } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java new file mode 100644 index 000000000..c6372c198 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; + +import org.apache.xtable.exception.ReadException; + +/** + * Manages Parquet File's Metadata + * + *
This class provides functions to handle Parquet metadata, creating metadata objects from + * parquet files and filtering the files based on the modification times. + */ +@Log4j2 +public class ParquetDataManager { + public static final ParquetDataManager INSTANCE = new ParquetDataManager(); + + public static ParquetDataManager getInstance() { + return INSTANCE; + } + + public ParquetFileConfig getMostRecentParquetFile( + List parquetFiles, Configuration conf) { + LocatedFileStatus file = + parquetFiles.stream() + .max(Comparator.comparing(LocatedFileStatus::getModificationTime)) + .orElseThrow(() -> new IllegalStateException("No files found")); + return getConfigFromFile(file, conf); + } + + public ParquetFileConfig getMostRecentParquetFileConfig(Stream parquetFiles) { + return parquetFiles + .max(Comparator.comparing(ParquetFileConfig::getModificationTime)) + .orElseThrow(() -> new IllegalStateException("No files found")); + } + + public LocatedFileStatus getParquetDataFileAt( + List parquetFiles, long targetTime) { + + return parquetFiles.stream() + .filter(file -> file.getModificationTime() >= targetTime) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No file found at or after " + targetTime)); + } + + public ParquetFileConfig getConfigFromFile(LocatedFileStatus file, Configuration conf) { + + Path path = file.getPath(); + + ParquetMetadata metadata = + ParquetMetadataExtractor.getInstance().readParquetMetadata(conf, path); + + return ParquetFileConfig.builder() + .schema(metadata.getFileMetaData().getSchema()) + .metadata(metadata) + .path(path) + .size(file.getLen()) + .modificationTime(file.getModificationTime()) + .rowGroupIndex(metadata.getBlocks().size()) + .codec( + metadata.getBlocks().isEmpty() + ? null + : metadata.getBlocks().get(0).getColumns().get(0).getCodec()) + .build(); + } + + public List getParquetFiles(Configuration hadoopConf, String basePath) { + try { + FileSystem fs = FileSystem.get(hadoopConf); + URI uriBasePath = new URI(basePath); + String parentPath = Paths.get(uriBasePath).toString(); + RemoteIterator iterator = fs.listFiles(new Path(parentPath), true); + return RemoteIterators.toList(iterator).stream() + .filter(file -> file.getPath().getName().endsWith("parquet")) + .collect(Collectors.toList()); + } catch (IOException | URISyntaxException e) { + throw new ReadException("Unable to read files from file system", e); + } + } + + public Stream getConfigsFromStream( + List fileStream, Configuration conf) { + + return fileStream.stream() + .map( + fileStatus -> { + Path path = fileStatus.getPath(); + + ParquetMetadata metadata = + ParquetMetadataExtractor.getInstance().readParquetMetadata(conf, path); + + return ParquetFileConfig.builder() + .schema(metadata.getFileMetaData().getSchema()) + .metadata(metadata) + .path(path) + .size(fileStatus.getLen()) + .modificationTime(fileStatus.getModificationTime()) + .rowGroupIndex(metadata.getBlocks().size()) + .codec( + metadata.getBlocks().isEmpty() + ? 
null + : metadata.getBlocks().get(0).getColumns().get(0).getCodec()) + .build(); + }); + } + + public List getParquetFilesMetadataInRange( + Configuration conf, Stream parquetFiles, long startTime, long endTime) { + + return parquetFiles + .filter( + file -> + file.getModificationTime() >= startTime && file.getModificationTime() <= endTime) + .map(file -> new ParquetFileConfig(conf, file)) + .collect(Collectors.toList()); + } + + public List getParquetFilesMetadataAfterTime( + Configuration conf, List parquetFiles, long syncTime) { + + return parquetFiles.stream() + .filter(file -> file.getModificationTime() >= syncTime) + .map(file -> new ParquetFileConfig(conf, file)) + .collect(Collectors.toList()); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileConfig.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileConfig.java new file mode 100644 index 000000000..ec40a79f5 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.experimental.FieldDefaults; +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import org.apache.xtable.exception.ReadException; + +@Log4j2 +@Getter +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +@Builder +@AllArgsConstructor +class ParquetFileConfig { + MessageType schema; + ParquetMetadata metadata; + long rowGroupIndex; + long modificationTime; + long size; + CompressionCodecName codec; + Path path; + + public ParquetFileConfig(Configuration conf, FileStatus file) { + long modificationTime = -1L; + ParquetMetadata metadata = + ParquetMetadataExtractor.getInstance().readParquetMetadata(conf, file.getPath()); + + if (metadata.getBlocks().isEmpty()) { + throw new IllegalStateException("Parquet file contains no row groups."); + } + try { + modificationTime = file.getModificationTime(); + } catch (ReadException e) { + log.warn("File reading error: " + e.getMessage()); + } + this.path = file.getPath(); + this.modificationTime = modificationTime; + this.size = metadata.getBlocks().stream().mapToLong(BlockMetaData::getTotalByteSize).sum(); + this.metadata = metadata; + this.schema = metadata.getFileMetaData().getSchema(); + this.rowGroupIndex = metadata.getBlocks().size(); + this.codec = metadata.getBlocks().get(0).getColumns().get(0).getCodec(); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetDataManager.java b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetDataManager.java new file mode 100644 index 000000000..08e11ed93 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetDataManager.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.parquet; + +import static org.apache.spark.sql.functions.expr; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.storage.TableFormat; + +public class ITParquetDataManager { + private static SparkSession spark; + private static ParquetConversionSourceProvider conversionSourceProvider; + public static final String PARTITION_FIELD_SPEC_CONFIG = + "xtable.parquet.source.partition_field_spec_config"; + + @BeforeAll + public static void setup() { + spark = SparkSession.builder().appName("ParquetTest").master("local[*]").getOrCreate(); + conversionSourceProvider = new ParquetConversionSourceProvider(); + Configuration hadoopConf = new Configuration(); + hadoopConf.set("fs.defaultFS", "file:///"); + conversionSourceProvider.init(hadoopConf); + } + + @Test + public void testAppendParquetFileMultiplePartition() throws IOException { + + Configuration conf = spark.sparkContext().hadoopConfiguration(); + + StructType schema = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("value", DataTypes.StringType, false), + DataTypes.createStructField("year", DataTypes.IntegerType, false), + DataTypes.createStructField("month", DataTypes.IntegerType, false) + }); + List data = + Arrays.asList( + RowFactory.create(100, "A", 2026, 12), + RowFactory.create(101, "AA", 2026, 12), + RowFactory.create(102, "CB", 2027, 11), + RowFactory.create(103, "BA", 2027, 11)); + + Dataset dfInit = spark.createDataFrame(data, schema); + Path fixedPath = Paths.get("target", "fixed-parquet-data", "parquet_table_test_2"); + Path appendFilePath = Paths.get("target", "fixed-parquet-data", "parquet_file_test_2"); + String outputPath = fixedPath.toString(); + String finalAppendFilePath = appendFilePath.toString(); + Dataset df = dfInit.withColumn("full_date", expr("make_date(year, month, 1)")); + df.coalesce(1).write().partitionBy("year", "month").mode("overwrite").parquet(outputPath); + + // test find files to sync + + org.apache.hadoop.fs.Path hdfsPath = new org.apache.hadoop.fs.Path(outputPath); + FileSystem fs = FileSystem.get(hdfsPath.toUri(), conf); + // set the modification time to the table file + // update modifTime for file to append + // many partitions case + List newPartitions = Arrays.asList("year=2026/month=12", "year=2027/month=11"); + long targetModifTime = System.currentTimeMillis() - 360000; + long newModifTime = System.currentTimeMillis() - 50000; + long testTime = 
System.currentTimeMillis() - 90000; // between two prev times + for (String partition : newPartitions) { + org.apache.hadoop.fs.Path partitionPath = + new org.apache.hadoop.fs.Path(outputPath, partition); + if (fs.exists(partitionPath)) { + updateModificationTimeRecursive(fs, partitionPath, targetModifTime); + } + } + // create new file to append using Spark + List futureDataToSync = + Arrays.asList( + RowFactory.create(101, "A", 2026, 12), + RowFactory.create(301, "D", 2027, 11), + RowFactory.create(302, "DA", 2027, 11)); + Dataset dfToSyncInit = spark.createDataFrame(futureDataToSync, schema); + Dataset dfToSync = dfToSyncInit.withColumn("full_date", expr("make_date(year, month, 1)")); + dfToSync + .coalesce(1) + .write() + .partitionBy("year", "month") + .mode("overwrite") + .parquet(finalAppendFilePath); + + Dataset dfWithNewTimes = spark.read().parquet(finalAppendFilePath); + dfWithNewTimes + .coalesce(1) + .write() + .partitionBy("year", "month") + .mode("append") + .parquet(outputPath); + fs.delete(new org.apache.hadoop.fs.Path(finalAppendFilePath), true); + // conversionSource operations + Properties sourceProperties = new Properties(); + String partitionConfig = "full_date:MONTH:year=yyyy/month=MM"; + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + SourceTable tableConfig = + SourceTable.builder() + .name("parquet_table_test_2") + .basePath(fixedPath.toAbsolutePath().toUri().toString()) + .additionalProperties(sourceProperties) + .formatName(TableFormat.PARQUET) + .build(); + + ParquetConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance(tableConfig); + + // long newModifTime = System.currentTimeMillis() - 50000; + + for (String partition : newPartitions) { + org.apache.hadoop.fs.Path partitionPath = + new org.apache.hadoop.fs.Path(outputPath, partition); + + RemoteIterator it = fs.listFiles(partitionPath, false); + while (it.hasNext()) { + LocatedFileStatus fileStatus = it.next(); + + if (fileStatus.getModificationTime() > newModifTime) { + fs.setTimes(fileStatus.getPath(), newModifTime, -1); + } else { + + fs.setTimes(fileStatus.getPath(), targetModifTime, -1); + } + } + + fs.setTimes(partitionPath, newModifTime, -1); + } + + InternalTable result = conversionSource.getTable(newModifTime); + assertEquals( + Instant.ofEpochMilli(newModifTime).toString(), result.getLatestCommitTime().toString()); + assertNotNull(result); + assertEquals("parquet_table_test_2", result.getName()); + assertEquals(TableFormat.PARQUET, result.getTableFormat()); + assertNotNull(result.getReadSchema()); + InternalSnapshot snapshot = conversionSource.getCurrentSnapshot(); + assertNotNull(snapshot); + TableChange changes = conversionSource.getTableChangeForCommit(newModifTime); + assertNotNull(changes); + Instant instantBeforeFirstSnapshot = + Instant.ofEpochMilli(snapshot.getTable().getLatestCommitTime().toEpochMilli()); + assertEquals(instantBeforeFirstSnapshot.toEpochMilli(), newModifTime); + assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(testTime))); + } + + private void updateModificationTimeRecursive( + FileSystem fs, org.apache.hadoop.fs.Path path, long time) throws IOException { + org.apache.hadoop.fs.RemoteIterator it = + fs.listFiles(path, true); + while (it.hasNext()) { + org.apache.hadoop.fs.LocatedFileStatus status = it.next(); + if (status.getPath().getName().endsWith(".parquet")) { + fs.setTimes(status.getPath(), time, -1); + } + } + } + + @AfterAll + public static void tearDown() { + if (spark != null) { + 
spark.stop(); + } + } +}
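A minimal usage sketch (not part of the patch itself) of the ParquetDataManager API introduced above. It assumes the patch is applied, hadoop-common is on the classpath, and the class sits in the org.apache.xtable.parquet package because ParquetFileConfig is package-private; generic parameters stripped by the diff rendering are assumed to be LocatedFileStatus. The base path is a hypothetical local directory that must already contain .parquet files, otherwise getMostRecentParquetFile throws IllegalStateException.

package org.apache.xtable.parquet;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;

public class ParquetDataManagerUsageSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    // Hypothetical location; replace with a directory that holds .parquet files.
    String basePath = "file:///tmp/parquet_table";

    ParquetDataManager manager = ParquetDataManager.getInstance();

    // Recursively lists files under basePath and keeps those ending in "parquet".
    List<LocatedFileStatus> files = manager.getParquetFiles(conf, basePath);

    // Newest file by modification time, wrapped together with its footer metadata.
    ParquetFileConfig latest = manager.getMostRecentParquetFile(files, conf);
    System.out.printf(
        "latest=%s size=%d modifiedAt=%d rowGroups=%d%n",
        latest.getPath(),
        latest.getSize(),
        latest.getModificationTime(),
        latest.getRowGroupIndex());
  }
}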