✅ 1. High-Throughput Enhancements

Here’s how you can optimize your existing EventHubProducer class for high-throughput messaging:

🔧 Key Changes:

  • Reduce Task.Delay or eliminate it if needed.
  • Increase BatchSize and buffer settings.
  • Use asynchronous batching and less frequent flushes.
  • Optimize producer config for throughput.
  • Disable delivery reports if not needed.
  • Parallelize production better.

🔁 Updated EventHubProducer.cs (High-Throughput Optimized)

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
 
namespace kafkaProject
{
    /// <summary>
    /// Produces JSON-serialized <see cref="MyMessage"/> payloads to an Azure Event Hub
    /// over its Kafka-compatible endpoint, tuned for high throughput (large batches,
    /// LZ4 compression, fire-and-forget produce with delivery callbacks).
    /// </summary>
    public class EventHubProducer
    {
        // NOTE(review): credentials are hard-coded in source. Move the connection
        // string to configuration / Azure Key Vault before this ships anywhere real.
        private readonly string _bootstrapServers = "max-pre-prod.servicebus.windows.net:9093";
        private readonly string _saslPassword = "Endpoint=sb://kafkademo.servicebus.windows.net/;SharedAccessKeyName=MyManageSharedAccessKey;SharedAccessKey=test";
        private readonly string _eventHubName = "adttohis";

        // Shared across all instances so message ids stay unique per process.
        private static int _idCounter = 1;

        /// <summary>
        /// Produces <paramref name="count"/> messages using <c>parallelism</c> worker
        /// tasks, then flushes once and prints a success/failure summary.
        /// </summary>
        /// <param name="count">Total number of messages to produce. All of them are
        /// sent, including any remainder that does not divide evenly across workers.</param>
        public async Task ProduceBatchAsync(int count)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = _bootstrapServers,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "$ConnectionString",
                SaslPassword = _saslPassword,

                // High-throughput tuning
                LingerMs = 100,                      // Wait up to 100ms to batch messages
                BatchSize = 100_000,                 // 100 KB batch size
                QueueBufferingMaxMessages = 100_000, // Allow up to 100k messages buffered
                QueueBufferingMaxKbytes = 102400,    // 100 MB total buffer (value is in KB)
                CompressionType = CompressionType.Lz4, // Fast compression
                Acks = Acks.Leader,
                EnableIdempotence = false, // Not needed for Event Hubs
                ClientId = "eventhub-high-throughput-producer"
            };

            using var producer = new ProducerBuilder<Null, string>(config).Build();

            var successfulCount = 0;
            var failedCount = 0;

            const int parallelism = 10; // Number of concurrent producing workers

            var produceTasks = new List<Task>(parallelism);

            for (int i = 0; i < parallelism; i++)
            {
                // Copy the loop variable: the single 'i' would otherwise be shared
                // by every closure and read after the loop has advanced it.
                int workerIndex = i;

                produceTasks.Add(Task.Run(() =>
                {
                    // Spread 'count' across workers WITHOUT dropping the remainder:
                    // the first (count % parallelism) workers each take one extra.
                    int workerCount = count / parallelism
                                      + (workerIndex < count % parallelism ? 1 : 0);

                    for (int j = 0; j < workerCount; j++)
                    {
                        var messageId = Interlocked.Increment(ref _idCounter);
                        var message = new MyMessage
                        {
                            Id = messageId,
                            Content = $"Message {messageId}",
                            Timestamp = DateTime.UtcNow
                        };

                        string jsonMessage = JsonSerializer.Serialize(message);

                        var kafkaMessage = new Message<Null, string>
                        {
                            Value = jsonMessage,
                            Headers = new Headers
                            {
                                new Header("message-type", Encoding.UTF8.GetBytes("application/json"))
                            }
                        };

                        // Fire-and-forget Produce with a delivery callback is the
                        // high-throughput pattern: unlike 'await ProduceAsync' per
                        // message, it lets librdkafka fill batches. (The original
                        // 'ProduceAsync(...).ContinueWith(...)' also swallowed the
                        // fault inside the continuation, making its catch dead code.)
                        while (true)
                        {
                            try
                            {
                                producer.Produce(_eventHubName, kafkaMessage, report =>
                                {
                                    if (report.Error.IsError)
                                    {
                                        Console.WriteLine($"❌ Message {message.Id} failed: {report.Error.Reason}");
                                        Interlocked.Increment(ref failedCount);
                                    }
                                    else
                                    {
                                        // Logging every delivery would throttle throughput.
                                        Interlocked.Increment(ref successfulCount);
                                    }
                                });
                                break;
                            }
                            catch (ProduceException<Null, string> ex) when (ex.Error.Code == ErrorCode.Local_QueueFull)
                            {
                                // Local buffer is full: serve delivery callbacks so the
                                // queue drains, then retry the same message.
                                producer.Poll(TimeSpan.FromMilliseconds(100));
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine($"❌ Exception on message {message.Id}: {ex.Message}");
                                Interlocked.Increment(ref failedCount);
                                break;
                            }
                        }
                    }
                }));
            }

            await Task.WhenAll(produceTasks);

            // One Flush at the end waits for every outstanding delivery callback;
            // 30s budget because the whole backlog may still be in-flight here.
            producer.Flush(TimeSpan.FromSeconds(30));

            Console.WriteLine($"\n📊 Summary: {successfulCount} successful, {failedCount} failed");
        }
    }
 
    /// <summary>
    /// JSON payload shape produced by the Event Hub producer. Property names and
    /// declaration order define the serialized JSON via System.Text.Json, so do not
    /// rename or reorder members without considering downstream consumers.
    /// </summary>
    public class MyMessage
    {
        /// <summary>Sequential, process-wide unique message identifier.</summary>
        public int Id { get; set; }

        /// <summary>Human-readable message body (e.g. "Message 42").</summary>
        public string Content { get; set; }

        /// <summary>Creation time; the producer assigns DateTime.UtcNow.</summary>
        public DateTime Timestamp { get; set; }
    }
}

📄 2. Kafka Producer Config Documentation

Here’s a table explaining each config parameter used in your producer:

| Config Key | Value | Purpose |
|---|---|---|
| `BootstrapServers` | Azure Event Hub FQDN | Event Hub Kafka endpoint |
| `SaslUsername` | `$ConnectionString` | Required for Azure Event Hubs Kafka authentication |
| `SaslPassword` | Event Hub connection string | SASL PLAIN password (your full connection string) |
| `SecurityProtocol` | `SaslSsl` | Azure Event Hub requires SASL over SSL |
| `SaslMechanism` | `Plain` | Use the PLAIN auth mechanism |
| `LingerMs` | 100 | Wait up to 100 ms before sending to allow batching |
| `BatchSize` | 100000 | Max size in bytes for a message batch (~100 KB) |
| `QueueBufferingMaxMessages` | 100000 | Max number of messages buffered in memory |
| `QueueBufferingMaxKbytes` | 102400 | Total memory used to buffer messages (in KB) |
| `CompressionType` | `Lz4` | Use LZ4 compression for speed and size efficiency |
| `Acks` | `Leader` | Wait for leader acknowledgment only; Event Hubs does not require full-quorum acks |
| `EnableIdempotence` | `false` | Not needed in the Azure Event Hubs context |
| `ClientId` | `"eventhub-high-throughput-producer"` | Useful for tracking clients in logs |

⚠️ Best Practices for Azure Event Hubs (Kafka)

  • Partitioning: Use multiple partitions in Event Hub for parallelism.
  • Message Size: Keep messages under 1 MB.
  • Compression: LZ4 balances speed and size; avoid gzip unless needed.
  • Avoid Flush in Loops: Only call Flush() once after all messages are sent.