Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion deployment/terraform-ansible/deploy-pulsar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@
# - rabbitmq
# - redis
# - solr
# - twitter
- name: Set up broker
template:
src: "../templates/broker.conf"
Expand Down
1 change: 0 additions & 1 deletion distribution/io/src/assemble/io.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
-->

<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
Expand Down
5 changes: 0 additions & 5 deletions pulsar-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -580,11 +580,6 @@
<artifactId>pulsar-io-solr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io</artifactId>
Expand Down
25 changes: 21 additions & 4 deletions pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,15 @@

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<artifactId>pulsar-io-data-generator</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-netty</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
Expand Down Expand Up @@ -245,12 +253,21 @@
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<artifactId>pulsar-io-data-generator</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-data-generator.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-netty</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}</outputDirectory>
<destFileName>pulsar-io-twitter.nar</destFileName>
<destFileName>pulsar-io-netty.nar</destFileName>
</artifactItem>
<artifactItem>
<groupId>${project.groupId}</groupId>
Expand Down Expand Up @@ -282,10 +299,10 @@
<configuration>
<systemPropertyVariables>
<pulsar-io-data-generator.nar.path>${project.build.directory}/pulsar-io-data-generator.nar</pulsar-io-data-generator.nar.path>
<pulsar-io-netty.nar.path>${project.build.directory}/pulsar-io-netty.nar</pulsar-io-netty.nar.path>
<pulsar-functions-api-examples.jar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-functions-api-examples.jar.path>
<pulsar-functions-api-examples.nar.path>${project.build.directory}/pulsar-functions-api-examples.nar</pulsar-functions-api-examples.nar.path>
<pulsar-io-cassandra.nar.path>${project.build.directory}/pulsar-io-cassandra.nar</pulsar-io-cassandra.nar.path>
<pulsar-io-twitter.nar.path>${project.build.directory}/pulsar-io-twitter.nar</pulsar-io-twitter.nar.path>
<!-- valid jar file that is not a valid nar file -->
<pulsar-io-invalid.nar.path>${project.build.directory}/pulsar-functions-api-examples.jar</pulsar-io-invalid.nar.path>
</systemPropertyVariables>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public abstract class AbstractFunctionsResourceTest {
protected static final String CASSANDRA_STRING_SINK = "org.apache.pulsar.io.cassandra.CassandraStringSink";
protected static final int PARALLELISM = 1;
private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = "pulsar-io-cassandra.nar.path";
private static final String SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH = "pulsar-io-twitter.nar.path";
private static final String SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH = "pulsar-io-data-generator.nar.path";
private static final String SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH = "pulsar-io-netty.nar.path";
private static final String SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH = "pulsar-io-invalid.nar.path";
private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
"pulsar-functions-api-examples.nar.path";
Expand Down Expand Up @@ -123,10 +124,16 @@ public static File getPulsarIOCassandraNar() {
+ SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH + " system property"));
}

public static File getPulsarIOTwitterNar() {
return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH)
, "pulsar-io-twitter.nar file location must be specified with "
+ SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH + " system property"));
public static File getPulsarIODataGenNar() {
return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH)
, "pulsar-io-data-generator.nar file location must be specified with "
+ SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH + " system property"));
}

public static File getPulsarIONettyNar() {
return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH)
, "pulsar-io-netty.nar file location must be specified with "
+ SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH + " system property"));
}

public static File getPulsarIOInvalidNar() {
Expand Down Expand Up @@ -211,7 +218,7 @@ protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) {
}

protected File getDefaultNarFile() {
return getPulsarIOTwitterNar();
return getPulsarIODataGenNar();
}

protected void doSetup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void testRegisterSinkMissingPackageDetails() {
public void testRegisterSinkInvalidJarNoSink() throws IOException {
mockInstanceUtils();
try {
try (FileInputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (FileInputStream inputStream = new FileInputStream(getPulsarIONettyNar())) {
testRegisterSinkMissingArguments(
TENANT,
NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class SourceApiV3ResourceTest extends AbstractFunctionsResourceTest {
private static final String source = "test-source";
private static final String outputTopic = "test-output-topic";
private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE;
private static final String TWITTER_FIRE_HOSE = "org.apache.pulsar.io.twitter.TwitterFireHose";
private static final String DATAGEN_SOURCE = "org.apache.pulsar.io.datagenerator.DataGeneratorSource";
private SourcesImpl resource;

@Override
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testRegisterSourceMissingTenant() {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
null
);
Expand All @@ -130,7 +130,7 @@ public void testRegisterSourceMissingNamespace() {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
null
);
Expand All @@ -151,7 +151,7 @@ public void testRegisterSourceMissingSourceName() {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
null
);
Expand Down Expand Up @@ -206,7 +206,7 @@ public void testRegisterSourceMissingPackage() {

@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
public void testRegisterSourceMissingPackageDetails() throws IOException {
try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
testRegisterSourceMissingArguments(
TENANT,
NAMESPACE,
Expand All @@ -215,7 +215,7 @@ public void testRegisterSourceMissingPackageDetails() throws IOException {
null,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
null
);
Expand Down Expand Up @@ -271,7 +271,7 @@ public void testRegisterSourceInvalidJarWithNoSource() throws IOException {

@Test
public void testRegisterSourceNoOutputTopic() throws IOException {
try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
testRegisterSourceMissingArguments(
TENANT,
NAMESPACE,
Expand All @@ -280,7 +280,7 @@ public void testRegisterSourceNoOutputTopic() throws IOException {
mockedFormData,
null,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
null
);
Expand All @@ -302,7 +302,7 @@ public void testRegisterSourceHttpUrl() {
null,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
"http://localhost:1234/test"
);
Expand Down Expand Up @@ -386,7 +386,7 @@ public void testUpdateMissingSinkConfig() {
}

private void registerDefaultSource() throws IOException {
registerDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString());
registerDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString());
}

private void registerDefaultSourceWithPackageUrl(String packageUrl) throws IOException {
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testRegisterSourceConflictingFields() throws Exception {
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);

SourceConfig sourceConfig = createDefaultSourceConfig();
try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
resource.registerSource(
actualTenant,
actualNamespace,
Expand Down Expand Up @@ -545,7 +545,7 @@ public void testUpdateSourceMissingTenant() throws Exception {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
"Tenant is not provided");
} catch (RestException re) {
Expand All @@ -565,7 +565,7 @@ public void testUpdateSourceMissingNamespace() throws Exception {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
"Namespace is not provided");
} catch (RestException re) {
Expand All @@ -585,7 +585,7 @@ public void testUpdateSourceMissingFunctionName() throws Exception {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
"Source name is not provided");
} catch (RestException re) {
Expand Down Expand Up @@ -654,7 +654,7 @@ public void testUpdateSourceNegativeParallelism() throws Exception {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
-2,
"Source parallelism must be a positive number");
} catch (RestException re) {
Expand All @@ -668,7 +668,7 @@ public void testUpdateSourceChangedParallelism() throws Exception {
try {
mockWorkerUtils();

try (FileInputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (FileInputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
testUpdateSourceMissingArguments(
TENANT,
NAMESPACE,
Expand All @@ -677,7 +677,7 @@ public void testUpdateSourceChangedParallelism() throws Exception {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM + 1,
null);
}
Expand All @@ -691,7 +691,7 @@ public void testUpdateSourceChangedParallelism() throws Exception {
public void testUpdateSourceChangedTopic() throws Exception {
mockWorkerUtils();

try (FileInputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (FileInputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
testUpdateSourceMissingArguments(
TENANT,
NAMESPACE,
Expand All @@ -700,7 +700,7 @@ public void testUpdateSourceChangedTopic() throws Exception {
mockedFormData,
"DifferentTopic",
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
PARALLELISM,
null);
}
Expand Down Expand Up @@ -756,7 +756,7 @@ public void testUpdateSourceWithNoChange() throws IOException {
// no changes but set the auth-update flag to true, should not fail
UpdateOptionsImpl updateOptions = new UpdateOptionsImpl();
updateOptions.setUpdateAuthData(true);
try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
resource.updateSource(
sourceConfig.getTenant(),
sourceConfig.getNamespace(),
Expand Down Expand Up @@ -784,7 +784,7 @@ public void testUpdateSourceZeroParallelism() throws Exception {
mockedFormData,
outputTopic,
outputSerdeClassName,
TWITTER_FIRE_HOSE,
DATAGEN_SOURCE,
0,
"Source parallelism must be a positive number");
} catch (RestException re) {
Expand Down Expand Up @@ -861,7 +861,7 @@ private void mockFunctionCommon(String tenant, String namespace, String function
}

private void updateDefaultSource() throws Exception {
updateDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString());
updateDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString());
}

private void updateDefaultSourceWithPackageUrl(String packageUrl) throws Exception {
Expand Down Expand Up @@ -910,7 +910,7 @@ public void testUpdateSourceUploadFailure() throws Exception {
FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);

try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) {
try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) {
resource.updateSource(
TENANT,
NAMESPACE,
Expand Down Expand Up @@ -940,7 +940,7 @@ public void testUpdateSourceSuccess() throws Exception {
public void testUpdateSourceWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);

String filePackageUrl = getPulsarIOTwitterNar().toURI().toString();
String filePackageUrl = getPulsarIODataGenNar().toURI().toString();

SourceConfig sourceConfig = createDefaultSourceConfig();

Expand Down Expand Up @@ -1454,7 +1454,7 @@ private SourceConfig createDefaultSourceConfig() {
sourceConfig.setTenant(TENANT);
sourceConfig.setNamespace(NAMESPACE);
sourceConfig.setName(source);
sourceConfig.setClassName(TWITTER_FIRE_HOSE);
sourceConfig.setClassName(DATAGEN_SOURCE);
sourceConfig.setParallelism(PARALLELISM);
sourceConfig.setTopicName(outputTopic);
sourceConfig.setSerdeClassName(outputSerdeClassName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* Pulsar's Push Source interface. PushSource read data from
* external sources (database changes, twitter firehose, etc)
* external sources (database changes, etc)
* and publish to a Pulsar topic. The reason its called Push is
* because PushSources get passed a consumer that they
* invoke whenever they have data to be published to Pulsar.
Expand Down
5 changes: 0 additions & 5 deletions pulsar-io/docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@
<artifactId>pulsar-io-solr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-twitter</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Loading
Loading