diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml index 3a9f0fd942c17..6d2a33ec9d26f 100644 --- a/deployment/terraform-ansible/deploy-pulsar.yaml +++ b/deployment/terraform-ansible/deploy-pulsar.yaml @@ -162,7 +162,6 @@ # - rabbitmq # - redis # - solr -# - twitter - name: Set up broker template: src: "../templates/broker.conf" diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index cf7731b4c85ab..b621dd3872b53 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -45,7 +45,6 @@ --> ${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar - ${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar ${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar ${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar ${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml index 1a0a9366949e7..d1b46bff7620e 100644 --- a/pulsar-bom/pom.xml +++ b/pulsar-bom/pom.xml @@ -580,11 +580,6 @@ pulsar-io-solr ${project.version} - - org.apache.pulsar - pulsar-io-twitter - ${project.version} - org.apache.pulsar pulsar-io diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index 02171c8276fea..b27b6d7357482 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -179,7 +179,15 @@ ${project.groupId} - pulsar-io-twitter + pulsar-io-data-generator + ${project.version} + pom + test + + + + ${project.groupId} + pulsar-io-netty ${project.version} pom test @@ -245,12 +253,21 @@ ${project.groupId} - pulsar-io-twitter + pulsar-io-data-generator + ${project.version} + jar + true + ${project.build.directory} + pulsar-io-data-generator.nar + + + ${project.groupId} + pulsar-io-netty ${project.version} jar true ${project.build.directory} - pulsar-io-twitter.nar + pulsar-io-netty.nar ${project.groupId} @@ -282,10 +299,10 @@ ${project.build.directory}/pulsar-io-data-generator.nar + ${project.build.directory}/pulsar-io-netty.nar ${project.build.directory}/pulsar-functions-api-examples.jar ${project.build.directory}/pulsar-functions-api-examples.nar ${project.build.directory}/pulsar-io-cassandra.nar - ${project.build.directory}/pulsar-io-twitter.nar ${project.build.directory}/pulsar-functions-api-examples.jar diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java index dd67c39c75fae..aafd78cbba7ac 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/AbstractFunctionsResourceTest.java @@ -86,7 +86,8 @@ public abstract class AbstractFunctionsResourceTest { protected static final String CASSANDRA_STRING_SINK = "org.apache.pulsar.io.cassandra.CassandraStringSink"; protected static final int PARALLELISM = 1; private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = "pulsar-io-cassandra.nar.path"; - private static final String SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH = "pulsar-io-twitter.nar.path"; + private static final String SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH = "pulsar-io-data-generator.nar.path"; + private static final String SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH = "pulsar-io-netty.nar.path"; private static final String SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH = "pulsar-io-invalid.nar.path"; private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH = "pulsar-functions-api-examples.nar.path"; @@ -123,10 +124,16 @@ public static File getPulsarIOCassandraNar() { + SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH + " system property")); } - public static File getPulsarIOTwitterNar() { - return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH) - , "pulsar-io-twitter.nar file location must be specified with " - + SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH + " system property")); + public static File getPulsarIODataGenNar() { + return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH) + , "pulsar-io-data-generator.nar file location must be specified with " + + SYSTEM_PROPERTY_NAME_DATAGEN_NAR_FILE_PATH + " system property")); + } + + public static File getPulsarIONettyNar() { + return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH) + , "pulsar-io-netty.nar file location must be specified with " + + SYSTEM_PROPERTY_NAME_NETTY_NAR_FILE_PATH + " system property")); } public static File getPulsarIOInvalidNar() { @@ -211,7 +218,7 @@ protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { } protected File getDefaultNarFile() { - return getPulsarIOTwitterNar(); + return getPulsarIODataGenNar(); } protected void doSetup() throws Exception { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 7c0929a6a9fe0..2ef2e42d6e749 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -226,7 +226,7 @@ public void testRegisterSinkMissingPackageDetails() { public void testRegisterSinkInvalidJarNoSink() throws IOException { mockInstanceUtils(); try { - try (FileInputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (FileInputStream inputStream = new FileInputStream(getPulsarIONettyNar())) { testRegisterSinkMissingArguments( TENANT, NAMESPACE, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 81170cf569a58..f84b3c2e524b2 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -74,7 +74,7 @@ public class SourceApiV3ResourceTest extends AbstractFunctionsResourceTest { private static final String source = "test-source"; private static final String outputTopic = "test-output-topic"; private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE; - private static final String TWITTER_FIRE_HOSE = "org.apache.pulsar.io.twitter.TwitterFireHose"; + private static final String DATAGEN_SOURCE = "org.apache.pulsar.io.datagenerator.DataGeneratorSource"; private SourcesImpl resource; @Override @@ -109,7 +109,7 @@ public void testRegisterSourceMissingTenant() { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, null ); @@ -130,7 +130,7 @@ public void testRegisterSourceMissingNamespace() { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, null ); @@ -151,7 +151,7 @@ public void testRegisterSourceMissingSourceName() { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, null ); @@ -206,7 +206,7 @@ public void testRegisterSourceMissingPackage() { @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided") public void testRegisterSourceMissingPackageDetails() throws IOException { - try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { testRegisterSourceMissingArguments( TENANT, NAMESPACE, @@ -215,7 +215,7 @@ public void testRegisterSourceMissingPackageDetails() throws IOException { null, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, null ); @@ -271,7 +271,7 @@ public void testRegisterSourceInvalidJarWithNoSource() throws IOException { @Test public void testRegisterSourceNoOutputTopic() throws IOException { - try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { testRegisterSourceMissingArguments( TENANT, NAMESPACE, @@ -280,7 +280,7 @@ public void testRegisterSourceNoOutputTopic() throws IOException { mockedFormData, null, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, null ); @@ -302,7 +302,7 @@ public void testRegisterSourceHttpUrl() { null, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, "http://localhost:1234/test" ); @@ -386,7 +386,7 @@ public void testUpdateMissingSinkConfig() { } private void registerDefaultSource() throws IOException { - registerDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString()); + registerDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString()); } private void registerDefaultSourceWithPackageUrl(String packageUrl) throws IOException { @@ -482,7 +482,7 @@ public void testRegisterSourceConflictingFields() throws Exception { when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); SourceConfig sourceConfig = createDefaultSourceConfig(); - try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { resource.registerSource( actualTenant, actualNamespace, @@ -545,7 +545,7 @@ public void testUpdateSourceMissingTenant() throws Exception { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, "Tenant is not provided"); } catch (RestException re) { @@ -565,7 +565,7 @@ public void testUpdateSourceMissingNamespace() throws Exception { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, "Namespace is not provided"); } catch (RestException re) { @@ -585,7 +585,7 @@ public void testUpdateSourceMissingFunctionName() throws Exception { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, "Source name is not provided"); } catch (RestException re) { @@ -654,7 +654,7 @@ public void testUpdateSourceNegativeParallelism() throws Exception { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, -2, "Source parallelism must be a positive number"); } catch (RestException re) { @@ -668,7 +668,7 @@ public void testUpdateSourceChangedParallelism() throws Exception { try { mockWorkerUtils(); - try (FileInputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (FileInputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { testUpdateSourceMissingArguments( TENANT, NAMESPACE, @@ -677,7 +677,7 @@ public void testUpdateSourceChangedParallelism() throws Exception { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM + 1, null); } @@ -691,7 +691,7 @@ public void testUpdateSourceChangedParallelism() throws Exception { public void testUpdateSourceChangedTopic() throws Exception { mockWorkerUtils(); - try (FileInputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (FileInputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { testUpdateSourceMissingArguments( TENANT, NAMESPACE, @@ -700,7 +700,7 @@ public void testUpdateSourceChangedTopic() throws Exception { mockedFormData, "DifferentTopic", outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, PARALLELISM, null); } @@ -756,7 +756,7 @@ public void testUpdateSourceWithNoChange() throws IOException { // no changes but set the auth-update flag to true, should not fail UpdateOptionsImpl updateOptions = new UpdateOptionsImpl(); updateOptions.setUpdateAuthData(true); - try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { resource.updateSource( sourceConfig.getTenant(), sourceConfig.getNamespace(), @@ -784,7 +784,7 @@ public void testUpdateSourceZeroParallelism() throws Exception { mockedFormData, outputTopic, outputSerdeClassName, - TWITTER_FIRE_HOSE, + DATAGEN_SOURCE, 0, "Source parallelism must be a positive number"); } catch (RestException re) { @@ -861,7 +861,7 @@ private void mockFunctionCommon(String tenant, String namespace, String function } private void updateDefaultSource() throws Exception { - updateDefaultSourceWithPackageUrl(getPulsarIOTwitterNar().toURI().toString()); + updateDefaultSourceWithPackageUrl(getPulsarIODataGenNar().toURI().toString()); } private void updateDefaultSourceWithPackageUrl(String packageUrl) throws Exception { @@ -910,7 +910,7 @@ public void testUpdateSourceUploadFailure() throws Exception { FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData); - try (InputStream inputStream = new FileInputStream(getPulsarIOTwitterNar())) { + try (InputStream inputStream = new FileInputStream(getPulsarIODataGenNar())) { resource.updateSource( TENANT, NAMESPACE, @@ -940,7 +940,7 @@ public void testUpdateSourceSuccess() throws Exception { public void testUpdateSourceWithUrl() throws Exception { Configurator.setRootLevel(Level.DEBUG); - String filePackageUrl = getPulsarIOTwitterNar().toURI().toString(); + String filePackageUrl = getPulsarIODataGenNar().toURI().toString(); SourceConfig sourceConfig = createDefaultSourceConfig(); @@ -1454,7 +1454,7 @@ private SourceConfig createDefaultSourceConfig() { sourceConfig.setTenant(TENANT); sourceConfig.setNamespace(NAMESPACE); sourceConfig.setName(source); - sourceConfig.setClassName(TWITTER_FIRE_HOSE); + sourceConfig.setClassName(DATAGEN_SOURCE); sourceConfig.setParallelism(PARALLELISM); sourceConfig.setTopicName(outputTopic); sourceConfig.setSerdeClassName(outputSerdeClassName); diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java index 6acccdda121db..47cc31d5a5337 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java @@ -24,7 +24,7 @@ /** * Pulsar's Push Source interface. PushSource read data from - * external sources (database changes, twitter firehose, etc) + * external sources (database changes, etc) * and publish to a Pulsar topic. The reason its called Push is * because PushSources get passed a consumer that they * invoke whenever they have data to be published to Pulsar. diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml index b5cc0fceda1a9..5302943d1013d 100644 --- a/pulsar-io/docs/pom.xml +++ b/pulsar-io/docs/pom.xml @@ -212,11 +212,6 @@ pulsar-io-solr ${project.version} - - ${project.groupId} - pulsar-io-twitter - ${project.version} - diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 950bd427343dd..0e80914e25711 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -49,7 +49,6 @@ common docs aws - twitter cassandra aerospike http @@ -88,7 +87,6 @@ batch-data-generator common aws - twitter cassandra aerospike http @@ -141,9 +139,9 @@ batch-discovery-triggerers batch-data-generator common - twitter cassandra data-generator + netty diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml deleted file mode 100644 index 229e551f2e0d8..0000000000000 --- a/pulsar-io/twitter/pom.xml +++ /dev/null @@ -1,91 +0,0 @@ - - - 4.0.0 - - org.apache.pulsar - pulsar-io - 4.2.0-SNAPSHOT - - - pulsar-io-twitter - Pulsar IO :: Twitter - - - - - ${project.groupId} - pulsar-io-core - ${project.version} - - - - com.fasterxml.jackson.core - jackson-databind - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - - - - com.twitter - hbc-core - ${hbc-core.version} - - - - org.apache.commons - commons-collections4 - - - - org.apache.commons - commons-lang3 - - - - ${project.groupId} - pulsar-io-common - ${project.version} - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - - ${skipDeployConnector} - - - - org.apache.nifi - nifi-nar-maven-plugin - - - - - diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java deleted file mode 100644 index 30950ffefe2a4..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.twitter.hbc.ClientBuilder; -import com.twitter.hbc.common.DelimitedStreamReader; -import com.twitter.hbc.core.Constants; -import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; -import com.twitter.hbc.core.endpoint.StreamingEndpoint; -import com.twitter.hbc.core.processor.HosebirdMessageProcessor; -import com.twitter.hbc.httpclient.BasicClient; -import com.twitter.hbc.httpclient.auth.Authentication; -import com.twitter.hbc.httpclient.auth.OAuth1; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.pulsar.io.common.IOConfigUtils; -import org.apache.pulsar.io.core.PushSource; -import org.apache.pulsar.io.core.SourceContext; -import org.apache.pulsar.io.core.annotations.Connector; -import org.apache.pulsar.io.core.annotations.IOType; -import org.apache.pulsar.io.twitter.data.TweetData; -import org.apache.pulsar.io.twitter.data.TwitterRecord; -import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple Push based Twitter FireHose Source. - */ -@Connector( - name = "twitter", - type = IOType.SOURCE, - help = "A simple connector moving tweets from Twitter FireHose to Pulsar", - configClass = TwitterFireHoseConfig.class -) -@Slf4j -public class TwitterFireHose extends PushSource { - - private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class); - - // ----- Runtime fields - private Object waitObject; - - private final ObjectMapper mapper = new ObjectMapper().configure( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - @Override - public void open(Map config, SourceContext sourceContext) throws IOException { - TwitterFireHoseConfig hoseConfig = IOConfigUtils.loadWithSecrets(config, - TwitterFireHoseConfig.class, sourceContext); - hoseConfig.validate(); - waitObject = new Object(); - startThread(hoseConfig); - } - - @Override - public void close() throws Exception { - stopThread(); - } - - private void startThread(TwitterFireHoseConfig config) { - - BasicClient client = new ClientBuilder() - .name(config.getClientName()) - .hosts(config.getClientHosts()) - .endpoint(getEndpoint(config)) - .authentication(getAuthentication(config)) - .processor(new HosebirdMessageProcessor() { - public DelimitedStreamReader reader; - - @Override - public void setup(InputStream input) { - reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, - config.getClientBufferSize()); - } - - @Override - public boolean process() throws IOException, InterruptedException { - String tweetStr = reader.readLine(); - try { - TweetData tweet = mapper.readValue(tweetStr, TweetData.class); - // We don't really care if the record succeeds or not. - // However might be in the future to count failures - // TODO:- Figure out the metrics story for connectors - consume(new TwitterRecord(tweet, config.getGuestimateTweetTime())); - } catch (Exception e) { - LOG.error("Exception thrown", e); - } - return true; - } - }) - .build(); - - Thread runnerThread = new Thread(() -> { - LOG.info("Started the Twitter FireHose Runner Thread"); - client.connect(); - LOG.info("Twitter Streaming API connection established successfully"); - - // just wait now - try { - synchronized (waitObject) { - waitObject.wait(); - } - } catch (Exception e) { - LOG.info("Got a exception in waitObject"); - } - LOG.debug("Closing Twitter Streaming API connection"); - client.stop(); - LOG.info("Twitter Streaming API connection closed"); - LOG.info("Twitter FireHose Runner Thread ending"); - }); - runnerThread.setName("TwitterFireHoseRunner"); - runnerThread.start(); - } - - private void stopThread() { - LOG.info("Source closed"); - synchronized (waitObject) { - waitObject.notify(); - } - } - - private Authentication getAuthentication(TwitterFireHoseConfig config) { - return new OAuth1(config.getConsumerKey(), - config.getConsumerSecret(), - config.getToken(), - config.getTokenSecret()); - } - - private StreamingEndpoint getEndpoint(TwitterFireHoseConfig config) { - List followings = config.getFollowings(); - List terms = config.getTrackTerms(); - - if (CollectionUtils.isEmpty(followings) && CollectionUtils.isEmpty(terms)) { - return new SampleStatusesEndpoint().createEndpoint(); - } else { - StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint(); - - if (CollectionUtils.isNotEmpty(followings)) { - hosebirdEndpoint.followings(followings); - } - - if (CollectionUtils.isNotEmpty(terms)) { - hosebirdEndpoint.trackTerms(terms); - } - - return hosebirdEndpoint; - } - } -} diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java deleted file mode 100644 index bb02cdde38b87..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.common.collect.Lists; -import com.twitter.hbc.core.Constants; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import lombok.Data; -import lombok.experimental.Accessors; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.io.core.annotations.FieldDoc; - -/** - * Configuration object for the Twitter Firehose Connector. - */ -@Data -@Accessors(chain = true) -public class TwitterFireHoseConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - @FieldDoc( - required = true, - defaultValue = "", - sensitive = true, - help = "Your twitter app consumer key. See " - + "https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" - ) - private String consumerKey; - - @FieldDoc( - required = true, - defaultValue = "", - sensitive = true, - help = "Your twitter app consumer secret. " - + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" - ) - private String consumerSecret; - - @FieldDoc( - required = true, - defaultValue = "", - sensitive = true, - help = "Your twitter app token. " - + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" - ) - private String token; - - @FieldDoc( - required = true, - defaultValue = "", - sensitive = true, - help = "Your twitter app token secret. " - + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details" - ) - private String tokenSecret; - - // Most firehose events have null createdAt time. If this parameter is set to true - // then we estimate the createdTime of each firehose event to be current time. - @FieldDoc( - required = false, - defaultValue = "false", - help = "Most firehose events have null createdAt time.If this parameter is set to true, " - + "the connector estimates the createdTime of each firehose event to be current time." - ) - private Boolean guestimateTweetTime = false; - - // ------ Optional property keys - - @FieldDoc( - required = false, - defaultValue = "pulsario-twitter-source", - help = "The Twitter Firehose Client name" - ) - private String clientName = "pulsario-twitter-source"; - - @FieldDoc( - required = false, - defaultValue = Constants.STREAM_HOST, - help = "The Twitter Firehose stream hosts that the connector connects to" - ) - private String clientHosts = Constants.STREAM_HOST; - - @FieldDoc( - required = false, - defaultValue = "50000", - help = "The Twitter Firehose client buffer size" - ) - private int clientBufferSize = 50000; - - @FieldDoc( - required = false, - defaultValue = "", - help = "A comma separated list of user IDs, indicating the users to return statuses for in the stream." - ) - private String followings; - - @FieldDoc( - required = false, - defaultValue = "", - help = "Keywords to track. Phrases of keywords are specified by a comma-separated list." - ) - private String terms; - - public static TwitterFireHoseConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), TwitterFireHoseConfig.class); - } - - public static TwitterFireHoseConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), TwitterFireHoseConfig.class); - } - - public void validate() throws IllegalArgumentException { - if (getConsumerKey() == null || getConsumerSecret() == null - || getToken() == null || getTokenSecret() == null) { - throw new IllegalArgumentException("Required property not set."); - } - } - - public List getFollowings() { - if (StringUtils.isBlank(followings)) { - return Collections.emptyList(); - } - - List result = new ArrayList (); - - for (String s: StringUtils.split(followings, ",")) { - try { - result.add(Long.parseLong(StringUtils.trim(s))); - } catch (NumberFormatException nfEx) { - // Ignore these - } - } - - return CollectionUtils.isEmpty(result) ? Collections.emptyList() : result; - } - - public List getTrackTerms() { - if (StringUtils.isBlank(terms)) { - return Collections.emptyList(); - } - - return Lists.newArrayList(StringUtils.split(terms, ",")); - } -} \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java deleted file mode 100644 index 0b9a3199fa7f5..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter.data; - -import lombok.Data; - -/** - * POJO for Tweet object. - */ -@Data -public class TweetData { - private String createdAt; - private Long id; - private String idStr; - private String text; - private String source; - private Boolean truncated; - private User user; - private RetweetedStatus retweetedStatus; - private Boolean isQuoteStatus; - private Long quoteCount; - private Long replyCount; - private Long retweetCount; - private Long favoriteCount; - private Boolean favorited; - private Boolean retweeted; - private String filterLevel; - private String lang; - private String timestampMs; - private Delete delete; - - /** - * POJO for Twitter User object. - */ - @Data - public static class User { - private Long id; - private String idStr; - private String name; - private String screenName; - private String location; - private String description; - private String translatorType; - private Boolean protectedUser; - private Boolean verified; - private Long followersCount; - private Long friendsCount; - private Long listedCount; - private Long favouritesCount; - private Long statusesCount; - private String createdAt; - private Boolean geoEnabled; - private String lang; - private Boolean contributorsEnabled; - private Boolean isTranslator; - private String profileBackgroundColor; - private String profileBackgroundImageUrl; - private String profileBackgroundImageUrlHttps; - private Boolean profileBackgroundTile; - private String profileLinkColor; - private String profileSidebarBorderColor; - private String profileSidebarFillColor; - private String profileTextColor; - private Boolean profileUseBackgroundImage; - private String profileImageUrl; - private String profileImageUrlHttps; - private String profileBannerUrl; - private Boolean defaultProfile; - private Boolean defaultProfileImage; - } - - /** - * POJO for Re-Tweet object. - */ - @Data - public static class RetweetedStatus { - private String createdAt; - private Long id; - private String idStr; - private String text; - private String source; - private Boolean truncated; - private User user; - private Boolean isQuoteStatus; - private Long quoteCount; - private Long replyCount; - private Long retweetCount; - private Long favoriteCount; - private Boolean favorited; - private Boolean retweeted; - private String filterLevel; - private String lang; - } - - /** - * POJO for Tweet Status object. - */ - @Data - public static class Status { - private Long id; - private String idStr; - private Long userId; - private String userIdStr; - } - - /** - * POJO for Tweet Delete object. - */ - @Data - public static class Delete { - private Status status; - private String timestampMs; - } -} diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java deleted file mode 100644 index 7292c05548b64..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter.data; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Optional; -import org.apache.pulsar.functions.api.Record; - -/** - * Twitter Record object. - */ -public class TwitterRecord implements Record { - private final TweetData tweet; - private static final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy"); - private final boolean guestimateTweetTime; - - public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) { - this.tweet = tweet; - this.guestimateTweetTime = guestimateTweetTime; - } - - @Override - public Optional getKey() { - // TODO: Could use user or tweet ID as key here - return Optional.empty(); - } - - @Override - public Optional getEventTime() { - try { - if (tweet.getCreatedAt() != null) { - Date d = dateFormat.parse(tweet.getCreatedAt()); - return Optional.of(d.toInstant().toEpochMilli()); - } else if (guestimateTweetTime) { - return Optional.of(System.currentTimeMillis()); - } else { - return Optional.empty(); - } - } catch (Exception e) { - return Optional.empty(); - } - } - - @Override - public TweetData getValue() { - return tweet; - } -} \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java deleted file mode 100644 index eadf21b6a8900..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter.data; \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java deleted file mode 100644 index ca942076a1603..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter.endpoint; - -import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; -import com.twitter.hbc.core.endpoint.StreamingEndpoint; -import java.io.Serializable; - -/** - * Required for Twitter Client. - */ -public class SampleStatusesEndpoint implements Serializable { - /** - * - */ - private static final long serialVersionUID = 1L; - - public StreamingEndpoint createEndpoint() { - // Returns the sample endpoint: Returning a sample from the firehose (all tweets) - StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); - endpoint.stallWarnings(false); - endpoint.delimited(false); - return endpoint; - } -} \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java deleted file mode 100644 index 2bca50c06c1a9..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter.endpoint; \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java deleted file mode 100644 index 23f810a0240fc..0000000000000 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter; \ No newline at end of file diff --git a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml deleted file mode 100644 index 39d7d974a4e8d..0000000000000 --- a/pulsar-io/twitter/src/main/resources/META-INF/services/pulsar-io.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -name: twitter -description: Ingest data from Twitter firehose -sourceClass: org.apache.pulsar.io.twitter.TwitterFireHose -sourceConfigClass: org.apache.pulsar.io.twitter.TwitterFireHoseConfig diff --git a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java deleted file mode 100644 index 6f5f99ce26025..0000000000000 --- a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.twitter; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.testng.annotations.Test; - -public class TwitterFireHoseConfigTest { - - private TwitterFireHoseConfig config; - - @Test - public final void loadFromYamlFileTest() throws IOException { - File yamlFile = getFile("sourceConfig.yaml"); - config = TwitterFireHoseConfig.load(yamlFile.getAbsolutePath()); - assertNotNull(config); - } - - @Test - public final void loadFromMapTest() throws IOException { - Map map = new HashMap<> (); - map.put("consumerKey", "xxx"); - map.put("consumerSecret", "xxx"); - map.put("token", "xxx"); - map.put("tokenSecret", "xxx"); - - config = TwitterFireHoseConfig.load(map); - - assertNotNull(config); - } - - @Test - public final void validValidateTest() throws IOException { - Map map = new HashMap<> (); - map.put("consumerKey", "xxx"); - map.put("consumerSecret", "xxx"); - map.put("token", "xxx"); - map.put("tokenSecret", "xxx"); - - config = TwitterFireHoseConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Required property not set.") - public final void missingConsumerKeyValidateTest() throws IOException { - Map map = new HashMap<> (); - - config = TwitterFireHoseConfig.load(map); - config.validate(); - } - - @Test - public final void getFollowingsTest() throws IOException { - Map map = new HashMap<> (); - map.put("followings", "123, 456, 789"); - config = TwitterFireHoseConfig.load(map); - - List followings = config.getFollowings(); - assertNotNull(followings); - assertEquals(followings.size(), 3); - assertTrue(followings.contains(123L)); - assertTrue(followings.contains(456L)); - assertTrue(followings.contains(789L)); - } - - - @Test - public final void getTermsTest() throws IOException { - Map map = new HashMap<> (); - map.put("terms", "mickey, donald, goofy"); - config = TwitterFireHoseConfig.load(map); - - List terms = config.getTrackTerms(); - assertNotNull(terms); - assertEquals(terms.size(), 3); - assertTrue(terms.contains("mickey")); - } - - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); - } - -} diff --git a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml deleted file mode 100644 index 9ac5708e37fbb..0000000000000 --- a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -{ -"consumerKey": "", -"consumerSecret": "" -} \ No newline at end of file