-
Notifications
You must be signed in to change notification settings - Fork 183
fix(arrow): iceberg writer improvements #713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Conversation
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
| v := val.(int32) | ||
| return fmt.Sprintf("%d", v), v, nil | ||
| case "long": | ||
| v := val.(int64) | ||
| return fmt.Sprintf("%d", v), v, nil | ||
| case "float": | ||
| v := val.(float32) | ||
| return fmt.Sprintf("%g", v), v, nil | ||
| case "double": | ||
| v := val.(float64) | ||
| return fmt.Sprintf("%g", v), v, nil | ||
| case "string": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason to typecast and then convert? diff between prev and current aproach ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previously we used to type case it towards the java side if you remember in a java function if you remember that's not good
now we do it towards the exact go side in transforms logic and send over proto in its exact type in the java server
| } | ||
|
|
||
| w.writers.Delete(fileType + ":" + partitionKey) | ||
| delete(w.writers, fileType+":"+partitionKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for ":" let us create a function to get fileKey
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| var pValues []any | ||
| if len(w.partitionInfo) != 0 { | ||
| values, err := w.getRecordPartitionValues(rec, olakeTimestamp) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us discuss the reason for adding it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from transforms logic, we need two types of transform values:
(a) for generating the partition key for file path, e.g., 2009-11 (kind of human readable string format), and,
(b) partition value to add in the iceberg table, e.g., 478 which goes to the manifests
so, technically, this code snippet helps it to extract the partition values only once per partition
| if closeErr := writer.currentWriter.Close(); closeErr != nil { | ||
| err = fmt.Errorf("failed to close writer: %s", closeErr) | ||
| return false | ||
| for mapKey, writer := range w.writers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| for mapKey, writer := range w.writers { | |
| for pKey, writer := range w.writers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| pv.Value = &proto.ArrowPayload_FileMetadata_PartitionValue_StringValue{StringValue: v} | ||
| case bool: | ||
| // Booleans stored as string "true"/"false" per Iceberg convention | ||
| pv.Value = &proto.ArrowPayload_FileMetadata_PartitionValue_StringValue{StringValue: fmt.Sprintf("%t", v)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about timestamp as partition value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not required.
|
|
||
| public void accumulateDeleteFiles(String threadId, Table table, String filePath, int equalityFieldId, | ||
| long recordCount, List<String> partitionValues) { | ||
| if (table == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, I couldn't personally think of a case where we will receive the table to null. As this is called during register and commit proto case, and we always either load or create the table before moving with any iceberg writer
| case LONG_VALUE -> protoValue.getLongValue(); | ||
| case FLOAT_VALUE -> protoValue.getFloatValue(); | ||
| case DOUBLE_VALUE -> protoValue.getDoubleValue(); | ||
| case STRING_VALUE -> protoValue.getStringValue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timestamp not available
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to consider the timestamp value as we are adding these as partition values to iceberg and timestamp value gets converted into its required int type based on iceberg transformations like day, hour, etc.
utils/testutils/test_utils.go
Outdated
| for _, useArrowWriter := range []bool{false, true} { | ||
| writerType := utils.Ternary(useArrowWriter, "Arrow", "Legacy").(string) | ||
| t.Run(fmt.Sprintf("Iceberg (%s) Full load + CDC tests", writerType), func(t *testing.T) { | ||
| if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, useArrowWriter, cfg.testIcebergFullLoadAndCDC); err != nil { | ||
| t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", writerType, err) | ||
| } | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks little weired can you check other way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| return types.Int32 | ||
| case reflect.Int64, reflect.Uint64: | ||
| // on standard 64 bit systems, golang's int type is 64 bits | ||
| case reflect.Int, reflect.Int64, reflect.Uint, reflect.Uint64: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how we checked this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, according to the test script which produced this error we used to add 99991199 as a value.
a := 99991199 // var a int
r2 := types.CreateRawRecord("2", map[string]any{"name": "rohan", "age": 1, "contact": 99991199, "email": nil}, "c", nil)now, an int type can be both int32 and int64 in go, but in certain systems having it in int64 makes it faster. Check .
The int, uint, and uintptr types are implementation-specific. On 64-bit systems, int is 64 bits.we didn't face this issue on legacy writer as the java engine there itself used to create parquet files, here we first create arrow memory vector buffers (basically schema) and then write the value. Thus, receiving this error:
org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.allocateVectorBasedOnOriginalType(VectorizedArrowReader.java:279)
Signed-off-by: badalprasadsingh <badal@datazip.io>
Signed-off-by: badalprasadsingh <badal@datazip.io>
|
Currently in |
Description
Fixes:
now()in partition transformationsinttolongcaseType of change
How Has This Been Tested?
Screenshots or Recordings
Documentation