A library to read/write DataFrames and Streaming DataFrames to/from Apache Hive™ using LLAP. With Apache Ranger™, this library provides row/column level fine-grained access controls.
This project is an independent fork and is not affiliated with, sponsored by,
or endorsed by Hortonworks, Cloudera, or the Apache Software Foundation.
For compatibility with existing deployments, package names and group IDs that include
com.hortonworks are retained; they do not imply any official relationship.
This fork targets ODP 1.3.1.0 and aligns with the Spark and Hive versions shipped in that release. For configuring earlier versions, see the documentation on the corresponding legacy branch below.
| branch | Spark | Hive | ODP |
|---|---|---|---|
| main (January 2026) | 3.5.6 | 4.0.1 | 1.3.X (TP) |
- Legacy branches
| branch | Spark | Hive | HDP |
|---|---|---|---|
| master (Summer 2018) | 2.3.1 | 3.1.0 | 3.0.0 (GA) |
| branch-2.3 | 2.3.0 | 2.1.0 | 2.6.x (TP) |
| branch-2.2 | 2.2.0 | 2.1.0 | 2.6.x (TP) |
| branch-2.1 | 2.1.1 | 2.1.0 | 2.6.x (TP) |
| branch-1.6 | 1.6.3 | 2.1.0 | 2.5.x (TP) |
Ensure the following Spark properties are set via spark-defaults.conf or
using --conf or through other Spark configuration.
| Property | Description | Example |
|---|---|---|
| spark.sql.hive.hiveserver2.jdbc.url | Thrift JDBC URL for LLAP HiveServer2 | jdbc:hive2://localhost:10000 |
| spark.datasource.hive.warehouse.load.staging.dir | Temp directory for batch writes to Hive | /tmp |
| spark.hadoop.hive.llap.daemon.service.hosts | App name for LLAP service | @llap0 |
| spark.hadoop.hive.zookeeper.quorum | ZooKeeper hosts used by LLAP | host1:2181;host2:2181;host3:2181 |
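For example, these can be collected in spark-defaults.conf (the values below are the placeholders from the table above; adjust them for your cluster):

spark.sql.hive.hiveserver2.jdbc.url              jdbc:hive2://localhost:10000
spark.datasource.hive.warehouse.load.staging.dir /tmp
spark.hadoop.hive.llap.daemon.service.hosts      @llap0
spark.hadoop.hive.zookeeper.quorum               host1:2181;host2:2181;host3:2181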
For use in Spark client mode on a kerberized YARN cluster, set:
| Property | Description | Example |
|---|---|---|
| spark.sql.hive.hiveserver2.jdbc.url.principal | Set equal to hive.server2.authentication.kerberos.principal | hive/_HOST@EXAMPLE.COM |
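For instance, a client-mode launch can pass the principal via --conf (a sketch; the principal value is the placeholder from the table):

spark-shell --jars /usr/odp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.3.1.jar \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal=hive/_HOST@EXAMPLE.COM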
For use in Spark cluster mode on a kerberized YARN cluster, set:
| Property | Description | Example |
|---|---|---|
| spark.security.credentials.hiveserver2.enabled | Use Spark ServiceCredentialProvider | true |
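Similarly, a cluster-mode submission enables the credential provider instead (a sketch; the application class and jar are hypothetical placeholders):

spark-submit --master yarn --deploy-mode cluster \
  --jars /usr/odp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.3.1.jar \
  --conf spark.security.credentials.hiveserver2.enabled=true \
  --class com.example.MyApp my-app.jar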
| Spark Type | Hive Type |
|---|---|
| ByteType | TinyInt |
| ShortType | SmallInt |
| IntegerType | Integer |
| LongType | BigInt |
| FloatType | Float |
| DoubleType | Double |
| DecimalType | Decimal |
| StringType* | String, Char, Varchar* |
| BinaryType | Binary |
| BooleanType | Boolean |
| TimestampType* | Timestamp* |
| DateType | Date |
| ArrayType | Array |
| StructType | Struct |
- A Hive String, Char, or Varchar column will be converted into a Spark StringType column.
- When a Spark StringType column has maxLength metadata, it will be converted into a Hive Varchar column. Otherwise, it will be converted into a Hive String column.
- A Hive Timestamp column will lose sub-microsecond precision when converted into a Spark TimestampType column, because Spark's TimestampType has microsecond precision while Hive's Timestamp has nanosecond precision.
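To illustrate the Varchar rule, a minimal Scala sketch that attaches maxLength metadata to a StringType column before writing (this assumes the connector reads the maxLength key from column metadata, per the note above; column and value are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

// maxLength metadata should map this column to Hive Varchar(25) instead of String
val nameMeta = new MetadataBuilder().putLong("maxLength", 25).build()
val schema = StructType(Seq(StructField("item_name", StringType, nullable = true, metadata = nameMeta)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("widget"))), schema)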
| Spark Type | Hive Type | Plan |
|---|---|---|
| CalendarIntervalType | Interval | Planned for future support |
| MapType | Map | Planned for future support |
| N/A | Union | Not supported in Spark |
| NullType | N/A | Not supported in Hive |
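Until MapType is supported, a common workaround is to serialize map columns to JSON strings on the Spark side before writing (a sketch using standard Spark SQL functions; props is a hypothetical column name):

import org.apache.spark.sql.functions.{col, to_json}

// Replace the unsupported MapType column with a JSON-encoded StringType column
val writable = df.withColumn("props_json", to_json(col("props"))).drop("props")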
Support is currently available for spark-shell, pyspark, and spark-submit.
- Locate the `hive-warehouse-connector-assembly` jar. If building from source, it will be located within the `target/scala-2.12` folder.
For now, the ODP distro does not include HWC by default; it will in later releases.
- Use `--jars` to add the connector jar to the app submission, e.g.
spark-shell --jars /usr/odp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.3.1.jar
- Follow the instructions above to add the connector jar to app submission.
- Additionally, add the connector's Python package to the app submission, e.g.
pyspark --jars /usr/odp/current/hive-warehouse-connector/hive-warehouse-connector-assembly-1.3.1.jar --py-files /usr/odp/current/hive-warehouse-connector/pyspark_hwc-1.3.1.zip
HiveWarehouseSession acts as an API to bridge Spark with HiveServer2.
In your Spark source, create an instance of HiveWarehouseSession using HiveWarehouseBuilder
- Create a HiveWarehouseSession (assuming `spark` is an existing `SparkSession`):
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
- Set the current database for unqualified Hive table references:
hive.setDatabase(<database>)
- Execute catalog operation and return DataFrame, e.g.
hive.execute("describe extended web_sales").show(100, false)
- Show databases:
hive.showDatabases().show(100, false)
- Show tables for current database:
hive.showTables().show(100, false)
- Describe table:
hive.describeTable(<table_name>).show(100, false)
- Create a database:
hive.createDatabase(<database_name>)
- Create ORC table, e.g.:
hive.createTable("web_sales") .ifNotExists() .column("sold_time_sk", "bigint") .column("ws_ship_date_sk", "bigint") .create()
- Drop a database:
hive.dropDatabase(<databaseName>, <ifExists>, <useCascade>)
- Drop a table:
hive.dropTable(<tableName>, <ifExists>, <usePurge>)
- Execute Hive SELECT query and return DataFrame, e.g.
val df = hive.executeQuery("select * from web_sales")
- Reference a Hive table as a DataFrame
val df = hive.table(<tableName>)
- Execute Hive update statement, e.g.
hive.executeUpdate("ALTER TABLE old_name RENAME TO new_name")
- Write a DataFrame to Hive in batch (uses LOAD DATA INTO TABLE), e.g.
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("database", <databaseName>) .option("table", <tableName>) .save()
- Write a DataFrame to Hive using HiveStreaming, e.g.
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
.option("database", <databaseName>)
.option("table", <tableName>)
.option("metastoreUri", <HMS_URI>)
.save()
// To write to static partition
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
.option("database", <databaseName>)
.option("table", <tableName>)
.option("partition", <partition>)
.option("metastoreUri", <HMS URI>)
.save()
- Write a Spark Stream to Hive using HiveStreaming, e.g.
stream.writeStream
.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
.option("metastoreUri", metastoreUri)
.option("database", "streaming")
.option("table", "web_sales")
.option("checkpointLocation", "hdfs://<nameservice>/tmp/hwc_ckpt")
.start()
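Here `stream` is any streaming Dataset; for completeness, a minimal sketch producing one from a socket source (host and port are placeholders, and in practice the input schema must line up with the target table):

// Toy streaming source; real input must match the web_sales columns
val stream = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()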
interface HiveWarehouseSession {
//Execute Hive SELECT query and return DataFrame (SQL route)
Dataset<Row> sql(String sql);
//Execute Hive SELECT query and return DataFrame
Dataset<Row> executeQuery(String sql);
//Execute Hive SELECT query with execution mode
Dataset<Row> executeQuery(String sql, boolean isSparkExecution);
//Execute Hive SELECT query with split count
Dataset<Row> executeQuery(String sql, int splitCount);
//Execute Hive update statement
boolean executeUpdate(String sql);
//Execute Hive update statement (legacy)
@Deprecated
boolean executeUpdate(String sql, boolean propagateException);
//Execute Hive catalog-browsing operation and return DataFrame
Dataset<Row> execute(String sql);
//Reference a Hive table as a DataFrame
Dataset<Row> table(String sql);
//Return the SparkSession attached to this HiveWarehouseSession
SparkSession session();
//Set the current database for unqualified Hive table references
void setDatabase(String name);
/**
* Helpers: wrapper functions over execute or executeUpdate
*/
//Helper for show databases
Dataset<Row> showDatabases();
//Helper for show tables
Dataset<Row> showTables();
//Helper for describeTable
Dataset<Row> describeTable(String table);
//Helper for create database
void createDatabase(String database, boolean ifNotExists);
//Helper for create table stored as ORC
CreateTableBuilder createTable(String tableName);
//Helper for merge statements
MergeBuilder mergeBuilder();
//Helper for drop database
void dropDatabase(String database, boolean ifExists, boolean cascade);
//Helper for drop table
void dropTable(String table, boolean ifExists, boolean purge);
//Cleanup streaming metadata for a checkpoint location
void cleanUpStreamingMeta(String queryCheckpointDir, String database, String table);
//Close resources
void close();
//Backwards-compatible alias for sql
Dataset<Row> q(String sql);
}
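A brief sketch contrasting the two read overloads above (treating the split count as a hint for LLAP read parallelism is an assumption; 16 is illustrative):

val viaSql  = hive.sql("select count(*) from web_sales")        // the SQL route
val viaLlap = hive.executeQuery("select * from web_sales", 16)  // read with an illustrative split count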
Read table data from Hive, transform it in Spark, and write to a new Hive table:
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
hive.setDatabase("tpcds_bin_partitioned_orc_1000")
val df = hive.executeQuery("select * from web_sales")
hive.setDatabase("spark_llap")
val tempTable = "t_" + System.currentTimeMillis()
hive.createTable(tempTable)
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .create()
df.select("ws_sold_time_sk", "ws_ship_date_sk")
  .filter("ws_sold_time_sk > 80000")
  .write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
  .option("table", tempTable)
  .save()
val df2 = hive.executeQuery("select * from " + tempTable)
df2.show(20)
hive.dropTable(tempTable, true, false)
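When the application is done with Hive interactions, close the session to release its resources:

hive.close()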