diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 7ab93d6a8c..8262caa2af 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -1,19 +1,4 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +cat << 'EOF' > src/main/java/kafka/examples/KafkaConsumerProducerDemo.java package kafka.examples; import java.util.Optional; @@ -21,53 +6,74 @@ import java.util.concurrent.TimeUnit; /** - * This example can be decomposed into the following stages: - *

- * 1. Clean any topics left from previous runs. - * 2. Create a producer thread to send a set of records to topic1. - * 3. Create a consumer thread to fetch all previously sent records from topic1. - *

- * If you are using IntelliJ IDEA, the above arguments should be put in `Modify Run Configuration - Program Arguments`. - * You can also set an output log file in `Modify Run Configuration - Modify options - Save console output to file` to - * record all the log output together. + * KafkaConsumerProducerDemo demonstrates a simple Kafka producer-consumer flow. + * + * Steps: + * 1. Cleans any previous topic data. + * 2. Starts a producer thread to send a number of records to a topic. + * 3. Starts a consumer thread to consume the records from the topic. + * + * Usage: + * java KafkaConsumerProducerDemo [sync] + * - numRecords: total number of records to produce + * - sync: optional argument; if 'sync', sends records synchronously, else async */ public class KafkaConsumerProducerDemo { - public static final String TOPIC_NAME = "my-topic"; - public static final String GROUP_NAME = "my-group"; + public static final String TOPIC = "my-topic"; // Topic name for producer/consumer + public static final String GROUP = "my-group"; // Consumer group public static void main(String[] args) { - try { - if (args.length == 0) { - Utils.printHelp("This example takes 2 parameters (i.e. 10000 sync):%n" + - "- records: total number of records to send (required)%n" + - "- mode: pass 'sync' to send records synchronously (optional)"); - return; - } + // Validate input arguments + if (args.length == 0) { + System.out.println("Usage: java KafkaConsumerProducerDemo [sync]"); + return; + } - int numRecords = Integer.parseInt(args[0]); - boolean isAsync = args.length == 1 || !args[1].trim().equalsIgnoreCase("sync"); + int numRecords = Integer.parseInt(args[0]); + boolean isAsync = args.length == 1 || !args[1].equalsIgnoreCase("sync"); - // stage 1: clean any topics left from previous runs - Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, -1, TOPIC_NAME); + try { + // Stage 1: Clean the topic from previous runs + Utils.recreateTopics(KafkaProperties.BOOTSTRAP_SERVERS, -1, TOPIC); CountDownLatch latch = new CountDownLatch(2); - // stage 2: produce records to topic1 - Producer producerThread = new Producer( - "producer", KafkaProperties.BOOTSTRAP_SERVERS, TOPIC_NAME, isAsync, null, false, numRecords, -1, latch); - producerThread.start(); + // Stage 2: Start producer thread + Producer producer = new Producer( + "producer", + KafkaProperties.BOOTSTRAP_SERVERS, + TOPIC, + isAsync, + null, + false, + numRecords, + -1, + latch + ); + producer.start(); - // stage 3: consume records from topic1 - Consumer consumerThread = new Consumer( - "consumer", KafkaProperties.BOOTSTRAP_SERVERS, TOPIC_NAME, GROUP_NAME, Optional.empty(), false, numRecords, latch); - consumerThread.start(); + // Stage 3: Start consumer thread + Consumer consumer = new Consumer( + "consumer", + KafkaProperties.BOOTSTRAP_SERVERS, + TOPIC, + GROUP, + Optional.empty(), + false, + numRecords, + latch + ); + consumer.start(); + // Wait for both threads to finish (timeout 5 minutes) if (!latch.await(5, TimeUnit.MINUTES)) { - Utils.printErr("Timeout after 5 minutes waiting for termination"); - producerThread.shutdown(); - consumerThread.shutdown(); + System.err.println("Timeout after 5 minutes. Shutting down producer and consumer."); + producer.shutdown(); + consumer.shutdown(); } - } catch (Throwable e) { + + } catch (Exception e) { e.printStackTrace(); } } } +EOF \ No newline at end of file