
Commit 13ea865

PHOENIX-7407 Remove deprecated datasource V1 code from spark2 and spark3 connector (#145)
1 parent bc3e25a commit 13ea865

40 files changed: +895, -2971 lines changed

phoenix5-spark/README.md

Lines changed: 3 additions & 4 deletions
@@ -331,10 +331,9 @@ the deprected `zkUrl` parameter for backwards compatibility purposes. If neither
   it falls back to using connection defined by hbase-site.xml.
 - `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkport",
   while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort"
-- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
-  instead of `"phoenix"`, however this is deprecated.
-  The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
-  only `"zkUrl"`
+- DataSourceV1 implementation was removed,
+  source type `"org.apache.phoenix.spark"`
+  use the DatasourceV2 since connector 6.0.0 release.
 - The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and
   `saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow
   optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
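
For orientation, a minimal sketch of the usage the updated README describes: reading and writing through the DataSourceV2 `"phoenix"` source with the `jdbcUrl` option. The table names and the JDBC URL below are placeholders, not values taken from this commit.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PhoenixDataSourceV2Example {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("phoenix-datasource-v2-example")
                .getOrCreate();

        // Read through the DataSourceV2 "phoenix" source; "jdbcUrl" takes a full
        // Phoenix JDBC URL (placeholder host and port here).
        Dataset<Row> rows = spark.read().format("phoenix")
                .option("table", "INPUT_TABLE")
                .option("jdbcUrl", "jdbc:phoenix:zkHost:2181")
                .load();

        // Write back through the same source, using SaveMode.Overwrite as the
        // connector's own tests do.
        rows.write().format("phoenix")
                .mode(SaveMode.Overwrite)
                .option("table", "OUTPUT_TABLE")
                .option("jdbcUrl", "jdbc:phoenix:zkHost:2181")
                .save();

        spark.stop();
    }
}
```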

phoenix5-spark/pom.xml

Lines changed: 0 additions & 44 deletions
@@ -84,12 +84,6 @@
       <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
@@ -177,39 +171,6 @@
       </exclusions>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>jsp-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jruby</groupId>
-          <artifactId>jruby-complete</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
@@ -371,11 +332,6 @@
       <artifactId>slf4j-api</artifactId>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-      <version>${jodatime.version}</version>
-    </dependency>
 
     <!-- Test dependencies -->
     <dependency>

phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java

Lines changed: 90 additions & 111 deletions
@@ -73,10 +73,8 @@ public Configuration getConfiguration(Configuration confToClone) {
 
     @Test
     public void basicWriteAndReadBackTest() throws SQLException {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
-                .set("spark.hadoopRDD.ignoreEmptySplits", "false");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        SQLContext sqlContext = new SQLContext(jsc);
+
+        SparkSession spark = SparkUtil.getSparkSession();
         String tableName = generateUniqueName();
 
         try (Connection conn = DriverManager.getConnection(getUrl());
@@ -85,141 +83,122 @@ public void basicWriteAndReadBackTest() throws SQLException {
                 "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)");
         }
 
-        try (SparkSession spark = sqlContext.sparkSession()) {
+        StructType schema =
+                new StructType(new StructField[] {
+                        new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
+                        new StructField("v1", DataTypes.StringType, false, Metadata.empty()) });
 
-            StructType schema =
-                    new StructType(new StructField[] {
-                            new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
-                            new StructField("v1", DataTypes.StringType, false, Metadata.empty()) });
+        // Use old zkUrl
+        Dataset<Row> df1 =
+                spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(1, "x")),
+                        schema);
 
-            // Use old zkUrl
-            Dataset<Row> df1 =
-                    spark.createDataFrame(
-                            Arrays.asList(RowFactory.create(1, "x")),
-                            schema);
+        df1.write().format("phoenix").mode(SaveMode.Overwrite)
+                .option("table", tableName)
+                .option(ZOOKEEPER_URL, getUrl())
+                .save();
+
+        // Use jdbcUrl
+        // In Phoenix 5.2+ getUrl() return a JDBC URL, in earlier versions it returns a ZK
+        // quorum
+        String jdbcUrl = getUrl();
+        if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) {
+            jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl;
+        }
+        Dataset<Row> df2 =
+                spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(2, "x")),
+                        schema);
 
-            df1.write().format("phoenix").mode(SaveMode.Overwrite)
+        df2.write().format("phoenix").mode(SaveMode.Overwrite)
                 .option("table", tableName)
-                .option(ZOOKEEPER_URL, getUrl())
+                .option(JDBC_URL, jdbcUrl)
                 .save();
 
-            // Use jdbcUrl
-            // In Phoenix 5.2+ getUrl() return a JDBC URL, in earlier versions it returns a ZK
-            // quorum
-            String jdbcUrl = getUrl();
-            if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) {
-                jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl;
-            }
-            Dataset<Row> df2 =
-                    spark.createDataFrame(
-                            Arrays.asList(RowFactory.create(2, "x")),
-                            schema);
+        // Use default from hbase-site.xml
+        Dataset<Row> df3 =
+                spark.createDataFrame(
+                        Arrays.asList(RowFactory.create(3, "x")),
+                        schema);
 
-            df2.write().format("phoenix").mode(SaveMode.Overwrite)
-                .option("table", tableName)
-                .option(JDBC_URL, jdbcUrl)
-                .save();
+        df3.write().format("phoenix").mode(SaveMode.Overwrite)
+                .option("table", tableName)
+                .save();
 
-            // Use default from hbase-site.xml
-            Dataset<Row> df3 =
-                    spark.createDataFrame(
-                            Arrays.asList(RowFactory.create(3, "x")),
-                            schema);
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals("x", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("x", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals("x", rs.getString(2));
+            assertFalse(rs.next());
+        }
 
-            df3.write().format("phoenix").mode(SaveMode.Overwrite)
+        Dataset df1Read = spark.read().format("phoenix")
                 .option("table", tableName)
-                .save();
-
-            try (Connection conn = DriverManager.getConnection(getUrl());
-                    Statement stmt = conn.createStatement()) {
-                ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
-                assertTrue(rs.next());
-                assertEquals(1, rs.getInt(1));
-                assertEquals("x", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals(2, rs.getInt(1));
-                assertEquals("x", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals(3, rs.getInt(1));
-                assertEquals("x", rs.getString(2));
-                assertFalse(rs.next());
-            }
-
-            Dataset df1Read = spark.read().format("phoenix")
-                .option("table", tableName)
-                .option(PhoenixDataSource.JDBC_URL, getUrl()).load();
+                .option(PhoenixDataSource.JDBC_URL, getUrl()).load();
 
-            assertEquals(3l, df1Read.count());
+        assertEquals(3l, df1Read.count());
 
-            // Use jdbcUrl
-            Dataset df2Read = spark.read().format("phoenix")
-                .option("table", tableName)
-                .option(PhoenixDataSource.JDBC_URL, jdbcUrl)
-                .load();
-
-            assertEquals(3l, df2Read.count());
+        // Use jdbcUrl
+        Dataset df2Read = spark.read().format("phoenix")
+                .option("table", tableName)
+                .option(PhoenixDataSource.JDBC_URL, jdbcUrl)
+                .load();
 
-            // Use default
-            Dataset df3Read = spark.read().format("phoenix")
-                .option("table", tableName)
-                .load();
+        assertEquals(3l, df2Read.count());
 
-            assertEquals(3l, df3Read.count());
+        // Use default
+        Dataset df3Read = spark.read().format("phoenix")
+                .option("table", tableName)
+                .load();
 
-        } finally {
-            jsc.stop();
-        }
+        assertEquals(3l, df3Read.count());
     }
 
     @Test
     public void lowerCaseWriteTest() throws SQLException {
-        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
-                .set("spark.hadoopRDD.ignoreEmptySplits", "false");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = SparkUtil.getSparkSession();
         String tableName = generateUniqueName();
 
         try (Connection conn = DriverManager.getConnection(getUrl());
             Statement stmt = conn.createStatement()){
             stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR, \"v1\" VARCHAR)");
         }
+        StructType schema = new StructType(new StructField[]{
+                new StructField("ID", DataTypes.IntegerType, false, Metadata.empty()),
+                new StructField("V1", DataTypes.StringType, false, Metadata.empty()),
+                new StructField("\"v1\"", DataTypes.StringType, false, Metadata.empty())
+        });
 
-        try(SparkSession spark = sqlContext.sparkSession()) {
-            //Doesn't help
-            spark.conf().set("spark.sql.caseSensitive", true);
-
-            StructType schema = new StructType(new StructField[]{
-                    new StructField("ID", DataTypes.IntegerType, false, Metadata.empty()),
-                    new StructField("V1", DataTypes.StringType, false, Metadata.empty()),
-                    new StructField("\"v1\"", DataTypes.StringType, false, Metadata.empty())
-            });
-
-            Dataset<Row> df = spark.createDataFrame(
-                    Arrays.asList(
-                            RowFactory.create(1, "x", "y")),
-                    schema);
-
-            df.write()
-                .format("phoenix")
-                .mode(SaveMode.Overwrite)
-                .option("table", tableName)
-                .option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true")
-                .option(JDBC_URL, getUrl())
-                .save();
-
-            try (Connection conn = DriverManager.getConnection(getUrl());
-                    Statement stmt = conn.createStatement()) {
-                ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
-                assertTrue(rs.next());
-                assertEquals(1, rs.getInt(1));
-                assertEquals("x", rs.getString(2));
-                assertEquals("y", rs.getString(3));
-                assertFalse(rs.next());
-            }
+        Dataset<Row> df = spark.createDataFrame(
+                Arrays.asList(
+                        RowFactory.create(1, "x", "y")),
+                schema);
 
+        df.write()
+                .format("phoenix")
+                .mode(SaveMode.Overwrite)
+                .option("table", tableName)
+                .option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true")
+                .option(JDBC_URL, getUrl())
+                .save();
 
-        } finally {
-            jsc.stop();
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals("x", rs.getString(2));
+            assertEquals("y", rs.getString(3));
+            assertFalse(rs.next());
         }
     }
 
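The rewritten tests obtain their session from SparkUtil.getSparkSession(), whose implementation is not part of this diff. A minimal sketch of what such a shared-session helper could look like, assuming it simply wraps SparkSession.builder() with the same local-master settings the removed SparkConf used:

```java
import org.apache.spark.sql.SparkSession;

// Hypothetical sketch only; the connector's actual SparkUtil may differ in
// configuration and lifecycle handling.
public final class SparkUtil {

    private SparkUtil() {
    }

    public static SparkSession getSparkSession() {
        // getOrCreate() reuses a single session across test methods instead of
        // starting and stopping a JavaSparkContext in every test.
        return SparkSession.builder()
                .master("local")
                .appName("phoenix-test")
                .config("spark.hadoopRDD.ignoreEmptySplits", "false")
                .getOrCreate();
    }
}
```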
phoenix5-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
 
   lazy val jdbcUrl = PhoenixSparkITHelper.getUrl
 
-  lazy val quorumAddress = ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get
+  lazy val quorumAddress = PhoenixSparkITHelper.getUrl
 
   // Runs SQL commands located in the file defined in the sqlSource argument
   // Optional argument tenantId used for running tenant-specific SQL
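
The quorumAddress change above mirrors the jdbcUrl handling now done inline in DataSourceApiIT: Phoenix 5.2+ test utilities return a full JDBC URL, while earlier versions return only the ZK quorum. A rough sketch of that normalization, with literal protocol strings standing in as assumptions for the JDBC_PROTOCOL, JDBC_PROTOCOL_ZK, and JDBC_PROTOCOL_SEPARATOR constants used in the test:

```java
// Hypothetical helper mirroring the check in basicWriteAndReadBackTest().
public final class JdbcUrlNormalizer {

    public static String toJdbcUrl(String urlOrQuorum) {
        // Already a JDBC URL (Phoenix 5.2+ behaviour): use it as-is.
        if (urlOrQuorum.startsWith("jdbc:phoenix")) {
            return urlOrQuorum;
        }
        // Bare ZK quorum ("zkHost:zkPort"): prepend the assumed ZK protocol prefix.
        return "jdbc:phoenix+zk:" + urlOrQuorum;
    }
}
```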
