feat: DB2 LUW (Linux, Unix, Windows) as source connector #694
base: staging
Conversation
TRIM(TABSCHEMA) AS table_schema,
TRIM(TABNAME) AS table_name
FROM SYSCAT.TABLES
WHERE TYPE IN ('T', 'V')
Should we select views also?
Will ask product and make changes accordingly.
err := d.client.QueryRowContext(ctx, existsQuery).Scan(&hasRows)

if err != nil {
	return nil, fmt.Errorf("failed to check if table has rows: %s", err)
}
Suggested change:
err := d.client.QueryRowContext(ctx, existsQuery).Scan(&hasRows)
if err != nil {
	return nil, fmt.Errorf("failed to check if table has rows: %s", err)
}
	return chunks, nil
}
// split chunks via physical identifier RID()
splitViaRID := func(ctx context.Context, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
Is it safe to use RID for chunking?
For tables without a primary key, I think we should use it: in such tables there is less chance that any column is indexed, so RID-based chunking is the better option.
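For reference, a minimal sketch (not the PR's exact code) of how a RID-bounded chunk query might be built; quotedTable and the bound parameters are placeholder names, and the shape mirrors the backfill query visible in the sync log later in this review.

package db2sketch

import "fmt"

// ridChunkQuery builds a backfill query bounded by DB2's physical row
// identifier RID(). Names are illustrative only.
func ridChunkQuery(quotedTable string, minRID, maxRID int64) string {
	return fmt.Sprintf(
		`SELECT * FROM %s WHERE RID(%s) >= %d AND RID(%s) < %d`,
		quotedTable, quotedTable, minRID, quotedTable, maxRID,
	)
}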
switch v := cursorValue.(type) {
case time.Time:
	if a.driver.Type() == string(constants.DB2) {
		return v.Format("2006-01-02 15:04:05.000000")
Isn't there a timestamp-aware format for DB2?
This is the format we require for a DB2 timestamp to be saved in state.
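As a quick illustration of what that format string produces (a plain, zone-less DB2 TIMESTAMP literal with microsecond precision); the sample value below is made up:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Format a cursor value the way the diff above does; DB2's TIMESTAMP
	// literal carries no time zone, only wall-clock fields.
	v := time.Date(2026, time.January, 9, 9, 6, 38, 123456000, time.UTC)
	fmt.Println(v.Format("2006-01-02 15:04:05.000000")) // 2026-01-09 09:06:38.123456
}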
Please check this also.
This also.
Database:
Signed-off-by: Duke <duke@datazip.io>
fi
;;
"Linux")
	download_url="https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/linuxx64_odbc_cli.tar.gz"
So a Linux arm64 driver doesn't exist. Is that why we skip that case?
yes
build.sh
Outdated
# Clean up any partial downloads from the failed go installer
rm -rf "$install_dir/clidriver" 2>/dev/null
rm -f "$install_dir"/*.tar.gz 2>/dev/null
rm -f "$install_dir"/*.zip 2>/dev/null
Let's just do curl. Keep it simple silly!
ok
vaibhav-datazip left a comment
Tested both full-refresh and incremental mode on OLake-CLI:
- using only the primary cursor
- using the fallback cursor as well
- using float, int, string, and timestamp as cursor values
- filtering using string, timestamp, and int
drivers/db2/internal/backfill.go
Outdated
}

if hasRows {
	return nil, fmt.Errorf("stats not populated for table[%s]. Please run command:\tRUNSTATS ON TABLE %s.%s WITH DISTRIBUTION AND DETAILED INDEXES ALL;\t to update table statistics", stream.ID(), stream.Namespace(), stream.Name())
Instead of writing "Please run command", you can mention "Please run CLP command:".
func (d *DB2) splitTableIntoChunks(ctx context.Context, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
	// split chunks via primary key
	splitViaPrimaryKey := func(ctx context.Context, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
I tried syncing the following table with 3 records:
CREATE TABLE ALL_DB2_TYPES (
COL_SMALLINT SMALLINT,
COL_INTEGER INTEGER,
COL_BIGINT BIGINT,
COL_DECIMAL DECIMAL(10,2),
COL_NUMERIC NUMERIC(8,4),
COL_REAL REAL,
COL_DOUBLE DOUBLE,
COL_DECFLOAT16 DECFLOAT(16),
COL_DECFLOAT34 DECFLOAT(34),
COL_CHAR10 CHAR(10),
COL_VARCHAR50 VARCHAR(50),
COL_VARGRAPHIC50 VARGRAPHIC(50),
COL_GRAPHIC10 GRAPHIC(10),
COL_LONGVARCHAR LONG VARCHAR,
COL_LONGVARGRAPHIC LONG VARGRAPHIC,
COL_CHAR_BIT CHAR(10) FOR BIT DATA,
COL_VARCHAR_BIT VARCHAR(20) FOR BIT DATA,
COL_VARBINARY VARBINARY(50),
COL_DATE DATE,
COL_TIME TIME,
COL_TIMESTAMP TIMESTAMP,
COL_BOOLEAN BOOLEAN,
COL_CLOB CLOB(1M),
COL_DBCLOB DBCLOB(500K),
COL_BLOB BLOB(500K),
COL_XML XML
);
I got the following error while syncing:
2026-01-09T09:06:38Z DEBUG Starting backfill for DB2INST1.ALL_DB2_TYPES with chunk {6 7} using query: SELECT * FROM "DB2INST1"."ALL_DB2_TYPES" WHERE RID("DB2INST1"."ALL_DB2_TYPES") >= 6 AND RID("DB2INST1"."ALL_DB2_TYPES") < 7
2026-01-09T09:06:38Z INFO Sync completed, wait 5 seconds cleanup in progress...
2026-01-09T09:06:43Z FATAL error occurred while reading records: error occurred while waiting for connections: thread[DB2INST1.ALL_DB2_TYPES_01KEH03WW4FHZKVDDVHQFEKQ43]: failed to insert chunk min[%!s(int64=4)] and max[%!s(int64=6)] of stream DB2INST1.ALL_DB2_TYPES, insert func error: %!s(<nil>), thread error: failed to flush data while closing: failed to write records: failed to send batch: rpc error: code = Internal desc = grpc: error while marshaling: string field contains invalid UTF-8
The data in the database contains invalid UTF-8 values.
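One possible mitigation, sketched here as an assumption rather than something this PR does: scrub invalid byte runs from string values before they reach the gRPC writer, since protobuf string fields must be valid UTF-8. strings.ToValidUTF8 is in the Go standard library (1.13+).

package main

import (
	"fmt"
	"strings"
)

// sanitizeString replaces invalid UTF-8 byte runs with U+FFFD so the value
// can be marshaled into a protobuf string field.
func sanitizeString(v any) any {
	if s, ok := v.(string); ok {
		return strings.ToValidUTF8(s, "\uFFFD")
	}
	return v
}

func main() {
	raw := string([]byte{'o', 'k', 0xff, 0xfe}) // contains invalid UTF-8
	fmt.Printf("%q\n", sanitizeString(raw))     // invalid run replaced
}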
}

func (d *DB2) splitTableIntoChunks(ctx context.Context, stream types.StreamInterface) (*types.Set[types.Chunk], error) {
	// split chunks via primary key
With 0 records, I tried syncing the following table:
CREATE TABLE ALL_DB2_TYPES (
COL_SMALLINT SMALLINT,
COL_INTEGER INTEGER,
COL_BIGINT BIGINT,
COL_DECIMAL DECIMAL(10,2),
COL_NUMERIC NUMERIC(8,4),
COL_REAL REAL,
COL_DOUBLE DOUBLE,
COL_DECFLOAT16 DECFLOAT(16),
COL_DECFLOAT34 DECFLOAT(34),
COL_CHAR10 CHAR(10),
COL_VARCHAR50 VARCHAR(50),
COL_VARGRAPHIC50 VARGRAPHIC(50),
COL_GRAPHIC10 GRAPHIC(10),
COL_LONGVARCHAR LONG VARCHAR,
COL_LONGVARGRAPHIC LONG VARGRAPHIC,
COL_CHAR_BIT CHAR(10) FOR BIT DATA,
COL_VARCHAR_BIT VARCHAR(20) FOR BIT DATA,
COL_VARBINARY VARBINARY(50),
COL_DATE DATE,
COL_TIME TIME,
COL_TIMESTAMP TIMESTAMP,
COL_BOOLEAN BOOLEAN,
COL_CLOB CLOB(1M),
COL_DBCLOB DBCLOB(500K),
COL_BLOB BLOB(500K),
COL_XML XML
);
I got this error:
2026-01-09T08:56:32Z INFO Sync completed, wait 5 seconds cleanup in progress...
2026-01-09T08:56:37Z FATAL error occurred while reading records: error occurred while waiting for context groups: failed to get or split chunks: failed to get the min and max rid: sql: Scan error on column index 0, name "1": converting NULL to int64 is unsupported
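For reference, a self-contained sketch (placeholder names, not the PR's code) of why that scan fails on a zero-row table and how sql.NullInt64 tolerates the NULL MIN/MAX aggregates:

package db2sketch

import (
	"database/sql"
	"fmt"
)

// minMaxRID returns the RID bounds of a table, reporting empty=true when the
// table has no rows, in which case MIN/MAX are NULL and cannot be scanned
// into a plain int64.
func minMaxRID(db *sql.DB, quotedTable string) (lo, hi int64, empty bool, err error) {
	var minRID, maxRID sql.NullInt64
	query := fmt.Sprintf("SELECT MIN(RID(%s)), MAX(RID(%s)) FROM %s", quotedTable, quotedTable, quotedTable)
	if err = db.QueryRow(query).Scan(&minRID, &maxRID); err != nil {
		return 0, 0, false, err
	}
	if !minRID.Valid || !maxRID.Valid {
		return 0, 0, true, nil
	}
	return minRID.Int64, maxRID.Int64, false, nil
}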
logger.Debugf("Starting backfill for %s with chunk %v using query: %s", stream.ID(), chunk, stmt)

reader := jdbc.NewReader(ctx, stmt, func(ctx context.Context, query string, queryArgs ...any) (*sql.Rows, error) {
	return d.client.QueryContext(ctx, query, args...)
})

return reader.Capture(func(rows *sql.Rows) error {
	record := make(types.Record)
	if err := jdbc.MapScan(rows, record, d.dataTypeConverter); err != nil {
		return fmt.Errorf("failed to scan record data as map: %s", err)
	}
	return OnMessage(ctx, record)
What isolation level are we using here?
Read committed, or Cursor Stability (CS) as DB2 calls it.
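If the backfill reads ever need to pin that level explicitly instead of relying on the connection default, a hedged sketch via database/sql; whether the go_ibm_db driver honours TxOptions is an assumption here.

package db2sketch

import (
	"context"
	"database/sql"
)

// queryReadCommitted runs stmt in a read-only transaction at read committed,
// which DB2 calls Cursor Stability (CS). The caller must close the rows and
// then commit or roll back the returned transaction.
func queryReadCommitted(ctx context.Context, db *sql.DB, stmt string) (*sql.Tx, *sql.Rows, error) {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: true})
	if err != nil {
		return nil, nil, err
	}
	rows, err := tx.QueryContext(ctx, stmt)
	if err != nil {
		tx.Rollback()
		return nil, nil, err
	}
	return tx, rows, nil
}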
| "dbclob": types.String, | ||
|
|
||
| // date / time | ||
| "time": types.String, |
In Iceberg it is coming in UTC format: 10:00 -> 4:30 (a -5:30 shift).
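To make that shift concrete, a small self-contained illustration (the IST zone and the sample value are assumptions): a zone-less TIME read as IST wall-clock and then persisted as UTC moves from 10:00:00 to 04:30:00.

package main

import (
	"fmt"
	"time"
)

func main() {
	ist := time.FixedZone("IST", 5*3600+30*60)
	// Interpret the zone-less TIME value as IST wall-clock time.
	t, _ := time.ParseInLocation("15:04:05", "10:00:00", ist)
	// Converting to UTC shifts it back by 5h30m.
	fmt.Println(t.UTC().Format("15:04:05")) // 04:30:00
}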
| "decfloat": types.Float64, | ||
|
|
||
| // boolean | ||
| "boolean": types.Bool, |
There is no boolean type in DB2; found this while testing.
}

if hasRows {
	return nil, fmt.Errorf("stats not populated for table[%s]. Please run CLP command:\tRUNSTATS ON TABLE %s.%s WITH DISTRIBUTION AND DETAILED INDEXES ALL;\t to update table statistics", stream.ID(), stream.Namespace(), stream.Name())
LOB (CLOB, DBCLOB, BLOB) and XML columns don't support distribution statistics, so running RUNSTATS with the given command in the database gives an error:
SQL2310N The utility could not generate statistics. Error "-668" was returned.
Please mention this in the doc as well.
| "real": types.Float32, | ||
| "float": types.Float64, | ||
| "numeric": types.Float64, | ||
| "double": types.Float64, |
reader := jdbc.NewReader(ctx, stmt, func(ctx context.Context, query string, queryArgs ...any) (*sql.Rows, error) {
	return d.client.QueryContext(ctx, query, args...)
})


















Description
DB2 LUW as a source connector is added. As of now it supports two sync modes: full refresh and incremental.
Chunking is done via primary keys when they are present; otherwise RID-based chunking is used (for tables with no primary keys).
Prerequisites
To run the DB2 LUW connector, the IBM Data Server Driver for ODBC and CLI must be installed on the machine.
Steps to run DB2 LUW on your machine:
Download the IBM Data Server Driver for ODBC and CLI for your OS.
Extract the downloaded archive from the above URLs; a folder named clidriver will be created in your directory.
Set the environment variables needed to run DB2 LUW in your terminal.
In this PR, the base Alpine image has been changed to debian:bookworm-slim for better support of DB2 and other database drivers.
Type of change
How Has This Been Tested?
This has been tested on a DB2 LUW VM instance. Full refresh and incremental sync were tested.
Documentation
Related PRs (if any):