diff --git a/src/Kafka.Basic/KafkaClient.cs b/src/Kafka.Basic/KafkaClient.cs index d86dc02..0fc5bbc 100644 --- a/src/Kafka.Basic/KafkaClient.cs +++ b/src/Kafka.Basic/KafkaClient.cs @@ -4,7 +4,7 @@ namespace Kafka.Basic { public interface IKafkaClient : IDisposable { - IKafkaTopic Topic(string name); + IKafkaTopic Topic(string name, ProducerConfig config = null); IKafkaConsumer Consumer(string groupName); IKafkaConsumer Consumer(ConsumerOptions options); IKafkaSimpleConsumer SimpleConsumer(); @@ -19,9 +19,9 @@ public KafkaClient(string zkConnect) _zkConnection = new ZookeeperConnection(zkConnect); } - public IKafkaTopic Topic(string name) + public IKafkaTopic Topic(string name, ProducerConfig config = null) { - return new KafkaTopic(_zkConnection, name); + return new KafkaTopic(_zkConnection, name, config); } public IKafkaConsumer Consumer(string groupName) diff --git a/src/Kafka.Basic/KafkaTopic.cs b/src/Kafka.Basic/KafkaTopic.cs index dc9c06f..88d660c 100644 --- a/src/Kafka.Basic/KafkaTopic.cs +++ b/src/Kafka.Basic/KafkaTopic.cs @@ -18,14 +18,16 @@ public class KafkaTopic : IKafkaTopic private readonly IProducer _producer; private readonly IZookeeperClient _zkClient; - public KafkaTopic(IZookeeperConnection zkConnect, string name) + public KafkaTopic(IZookeeperConnection zkConnect, string name, ProducerConfig producerConfig = null) { + if (producerConfig == null) + producerConfig = ProducerConfig.GetDefaultConfig(); _zkConnect = zkConnect; _name = name; _zkClient = zkConnect.CreateClient(); - _producer = _zkClient.CreateProducer(); + _producer = _zkClient.CreateProducer(producerConfig); } - + public void Send(params Message[] messages) { _producer.Send( diff --git a/src/Kafka.Basic/ProducerConfig.cs b/src/Kafka.Basic/ProducerConfig.cs index 6e89d71..f1cd925 100644 --- a/src/Kafka.Basic/ProducerConfig.cs +++ b/src/Kafka.Basic/ProducerConfig.cs @@ -3,14 +3,20 @@ public class ProducerConfig { public const short DefaultAcks = 1; + public const string DefaultClientId = "DefaultKafkaBasicClientId"; + public const int DefaultSendTimeout = 1000; public short Acks { get; set; } + public string ClientId { get; set; } + public int SendTimeout { get; set; } - public static ProducerConfig Default() + public static ProducerConfig GetDefaultConfig() { return new ProducerConfig { - Acks = DefaultAcks + Acks = DefaultAcks, + ClientId = DefaultClientId, + SendTimeout = DefaultSendTimeout, }; } diff --git a/src/Kafka.Basic/Properties/AssemblyInfo.cs b/src/Kafka.Basic/Properties/AssemblyInfo.cs index dc86c14..32fad81 100644 --- a/src/Kafka.Basic/Properties/AssemblyInfo.cs +++ b/src/Kafka.Basic/Properties/AssemblyInfo.cs @@ -32,6 +32,6 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] +[assembly: AssemblyVersion("1.0.0.1")] +[assembly: AssemblyFileVersion("1.0.0.1")] [assembly: InternalsVisibleTo("Kafka.Basic.Test")] \ No newline at end of file diff --git a/src/Kafka.Basic/ZookeeperClient.cs b/src/Kafka.Basic/ZookeeperClient.cs index 3bc3e0c..5e53fec 100644 --- a/src/Kafka.Basic/ZookeeperClient.cs +++ b/src/Kafka.Basic/ZookeeperClient.cs @@ -37,7 +37,7 @@ public IEnumerable GetAllBrokers() public IProducer CreateProducer() { - return CreateProducer(ProducerConfig.Default()); + return CreateProducer(ProducerConfig.GetDefaultConfig()); } public IProducer CreateProducer(ProducerConfig config) @@ -52,7 +52,9 @@ public IProducer CreateProducer(ProducerConfig c }).ToList() ) { - RequiredAcks = config.Acks + RequiredAcks = config.Acks, + ClientId = config.ClientId, + SendTimeout = config.SendTimeout }; return new Producer(producerConfiguration);