Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Kafka.Basic/KafkaClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Kafka.Basic
{
public interface IKafkaClient : IDisposable
{
IKafkaTopic Topic(string name);
IKafkaTopic Topic(string name, ProducerConfig config = null);
IKafkaConsumer Consumer(string groupName);
IKafkaConsumer Consumer(ConsumerOptions options);
IKafkaSimpleConsumer SimpleConsumer();
Expand All @@ -19,9 +19,9 @@ public KafkaClient(string zkConnect)
_zkConnection = new ZookeeperConnection(zkConnect);
}

public IKafkaTopic Topic(string name)
public IKafkaTopic Topic(string name, ProducerConfig config = null)
{
return new KafkaTopic(_zkConnection, name);
return new KafkaTopic(_zkConnection, name, config);
}

public IKafkaConsumer Consumer(string groupName)
Expand Down
8 changes: 5 additions & 3 deletions src/Kafka.Basic/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ public class KafkaTopic : IKafkaTopic
private readonly IProducer<string, KafkaMessage> _producer;
private readonly IZookeeperClient _zkClient;

public KafkaTopic(IZookeeperConnection zkConnect, string name)
public KafkaTopic(IZookeeperConnection zkConnect, string name, ProducerConfig producerConfig = null)
{
if (producerConfig == null)
producerConfig = ProducerConfig.GetDefaultConfig();
_zkConnect = zkConnect;
_name = name;
_zkClient = zkConnect.CreateClient();
_producer = _zkClient.CreateProducer<string, KafkaMessage>();
_producer = _zkClient.CreateProducer<string, KafkaMessage>(producerConfig);
}

public void Send(params Message[] messages)
{
_producer.Send(
Expand Down
10 changes: 8 additions & 2 deletions src/Kafka.Basic/ProducerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
public class ProducerConfig
{
public const short DefaultAcks = 1;
public const string DefaultClientId = "DefaultKafkaBasicClientId";
public const int DefaultSendTimeout = 1000;

public short Acks { get; set; }
public string ClientId { get; set; }
public int SendTimeout { get; set; }

public static ProducerConfig Default()
public static ProducerConfig GetDefaultConfig()
{
return new ProducerConfig
{
Acks = DefaultAcks
Acks = DefaultAcks,
ClientId = DefaultClientId,
SendTimeout = DefaultSendTimeout,
};
}

Expand Down
4 changes: 2 additions & 2 deletions src/Kafka.Basic/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
[assembly: AssemblyVersion("1.0.0.1")]
[assembly: AssemblyFileVersion("1.0.0.1")]
[assembly: InternalsVisibleTo("Kafka.Basic.Test")]
6 changes: 4 additions & 2 deletions src/Kafka.Basic/ZookeeperClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public IEnumerable<Broker> GetAllBrokers()

public IProducer<TKey, TMessage> CreateProducer<TKey, TMessage>()
{
return CreateProducer<TKey, TMessage>(ProducerConfig.Default());
return CreateProducer<TKey, TMessage>(ProducerConfig.GetDefaultConfig());
}

public IProducer<TKey, TMessage> CreateProducer<TKey, TMessage>(ProducerConfig config)
Expand All @@ -52,7 +52,9 @@ public IProducer<TKey, TMessage> CreateProducer<TKey, TMessage>(ProducerConfig c
}).ToList()
)
{
RequiredAcks = config.Acks
RequiredAcks = config.Acks,
ClientId = config.ClientId,
SendTimeout = config.SendTimeout
};

return new Producer<TKey, TMessage>(producerConfiguration);
Expand Down