
Conversation

@jbewing (Contributor) commented Jan 27, 2026

This PR updates all writes in Spark 4.1 to set the sort_order_id of data files when applicable, per the Iceberg Table Spec. I've opened this PR to:

  1. Gauge interest in this and validate my approach. If the approach is good, the backport of these changes to Spark 4.0 & 3.5 is clean.
  2. If the approach is good, decide whether to backport these changes to Spark 3.4. This patch doesn't apply cleanly to 3.4 as it stands right now, so I don't want to invest effort into it unless others would find value in it.
  3. As a follow-up to those PRs, implement the ability to report this ordering information to Spark when applicable, to better inform the query planner of potential optimizations. This is rather useful in conjunction with SPJ, and I have it running on an Iceberg fork with success. @anuragmantri has also expressed interest in a parallel patch and plans to take it to the finish line after this PR is in. Thank you for the continued interest and support of my changes, Anurag! I look forward to collaborating on the follow-up PR.
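For context, here is a minimal sketch of what "setting the sort_order_id" means at the metadata level. This is not code from the PR: the path, field name, and sizes are illustrative, and it assumes Iceberg's core Java API (SortOrder.builderFor and DataFiles.Builder with its withSortOrder method).

```java
// Illustrative only: a DataFile manifest entry can carry the id of the sort order
// its rows were written under. This PR makes Spark writers populate that field.
SortOrder order = SortOrder.builderFor(table.schema()).asc("id").build();

DataFile file =
    DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath("/tmp/data/00000-0-sorted.parquet") // hypothetical path
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(1024L)
        .withRecordCount(100L)
        .withSortOrder(order) // records order.orderId() as the file's sort_order_id
        .build();

// file.sortOrderId() should now reflect the order the data was written with.
```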

This is a successor to my initial PRs #13636 & #14683. The first was closed as stale, but I later revived it for Spark 3.5 & 4.0 in #14683 at the request of @anuragmantri, to complement a follow-up change we both converged on: using this optimization in conjunction with some Spark DSv2 APIs to report sort order when possible and optimize downstream query plans. After some discussion with @anuragmantri & @RussellSpitzer, I've forward ported the changes to Spark 4.1, as the underlying "base" Spark version has since changed. I've re-opened this PR because there has been increased interest in this lately.

So it appears there is value in upstreaming these changes rather than confining them to a fork.

Testing

I've added tests for the newly added utility functions and updated existing tests that write data files and compact data files in a sorted manner, verifying that the sort_order_id entry in the manifests is set to the correct value. Additionally, I've used this patch on an internal fork and verified that it correctly sets this field during compaction and normal writes.
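As a rough illustration of the kind of assertion those tests make (a sketch, not the exact test code; it assumes a `table` handle, AssertJ's assertThat, and the core scan API):

```java
// Every data file written after the sort order was applied should carry the
// table's current sort order id in its manifest entry.
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
  for (FileScanTask task : tasks) {
    assertThat(task.file().sortOrderId()).isEqualTo(table.sortOrder().orderId());
  }
} catch (IOException e) {
  throw new UncheckedIOException(e);
}
```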

Issue: #13634
cc @anuragmantri & @RussellSpitzer

@anuragmantri (Contributor)
Thanks @jbewing. @RussellSpitzer, I know the Iceberg Summit abstracts are keeping you busy; could you take a look when you get a chance? Thank you!

/** A set of requirements such as distribution and ordering reported to Spark during writes. */
public class SparkWriteRequirements {

public static final long NO_ADVISORY_PARTITION_SIZE = 0;
Member:

How is this change related to the PR?

public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id";

public static final String OUTPUT_SPEC_ID = "output-spec-id";
public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
Member:

This is kind of broaching other concerns. I think it would make sense to start with just supporting writing the table's current sort order, then work on arbitrary sort orders in a follow-up. Unless I'm reading this incorrectly?
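To make the distinction concrete, a hedged sketch of the two cases (the option name comes from the constant above; whether writers should honor it at all is the open question here, and `sortOrderId` / `location` are placeholders):

```java
// Case 1: a plain write; with this change, files written in the table's current
// sort order would record that order's id.
df.write()
    .format("iceberg")
    .mode(SaveMode.Append)
    .save(location);

// Case 2: explicitly requesting a (possibly non-current) sort order by id,
// analogous to "output-spec-id"; this is the broader, follow-up concern.
df.write()
    .format("iceberg")
    .option(SparkWriteOptions.OUTPUT_SORT_ORDER_ID, String.valueOf(sortOrderId))
    .mode(SaveMode.Append)
    .save(location);
```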

.mode(SaveMode.Append)
.save(location.toString());

createBranch(table);
Member:

Why are we branching?


HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, location.toString());
Member:

Can't we set the sort order here?
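If it helps, the suggestion presumably looks something like this (a sketch, assuming the Tables.create overload that accepts a SortOrder; alternatively, table.replaceSortOrder().asc("id").commit() after creation reaches a similar end state):

```java
// Create the table with its sort order up front instead of setting it afterwards.
SortOrder order = SortOrder.builderFor(SCHEMA).asc("id").build();
Table table =
    tables.create(
        SCHEMA,
        PartitionSpec.unpartitioned(),
        order,
        ImmutableMap.of(), // empty table properties
        location.toString());
```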

result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);

List<DataFile> files = Lists.newArrayList();
Member:

Could use:

table.newScan().planFiles()

df.select("id", "data")
.write()
.format("iceberg")
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
Member:

I wonder if this is the right place for this test, since we shouldn't have different behavior based on file format ... probably fine


table.replaceSortOrder().asc("id").commit();

List<SimpleRecord> expected = Lists.newArrayListWithCapacity(4000);
Member:

nit: we don't really need 4000 rows for this test (theoretically we only need 1), but using fewer rows probably wouldn't speed this test up.
