When learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become.
While reading documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core, the fundamentals. Didn’t you ever just want to experience a simple working copy? Hence: I just want to see it work.
This series aims to minimize the chance of a missing link and encourages you to build your next innovative solution on what you learn here.
What is Kafka Streams?
According to the Kafka documentation, Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.
What do we want to achieve?
- Run Kafka with KRaft in Docker.
- Run a producer on your local machine (not in Docker) that publishes messages to a topic.
- Run a consumer on your local machine (not in Docker) that pulls a message whenever one is published to the topic.
 
Resources
- You can find the source code at my GitHub repository
- Docker image: Bitnami Kafka
 
Run Kafka in Docker
Navigate to where docker-compose.yml is located. Run the command below in a terminal to start a Kafka container in the background.
docker-compose up -d
docker-compose.yml
version: "3"
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- ALLOW_PLAINTEXT_LISTENER: Controls whether the Kafka broker accepts connections on a plaintext listener. A plaintext listener is an unencrypted, unauthenticated channel, meaning the data exchanged between the client and the broker is sent in plain text. This is fine for local development, but not for production.
- KAFKA_ENABLE_KRAFT: Enables KRaft mode, so Kafka runs without ZooKeeper.
- KAFKA_CFG_ADVERTISED_LISTENERS: The address the broker tells clients to use when connecting. Both the producer and the consumer need this to communicate with Kafka.
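
This compose file advertises localhost, which only works for clients running on the host machine. If other containers also need to reach Kafka, a common pattern is to declare one listener per audience. The sketch below follows Bitnami's documented multi-listener setup; the EXTERNAL listener name and port 9094 are illustrative assumptions, not part of this project's compose file:

```yaml
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      # CONTROLLER handles KRaft metadata; PLAINTEXT serves other containers
      # on the Docker network; EXTERNAL serves clients on the host machine.
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
```

With this layout, a containerized client would connect to kafka:9092 while the producer and consumer in this article would use localhost:9094.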
 
Let’s publish a message to Kafka
Navigate to where KafkaStream.Producer.csproj is located. Open PowerShell and run the following command.
dotnet run
When you see a message starting with “Message delivered to input-topic”, that means you did it!
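
To double-check from the broker's side, you can read the topic with the console consumer that ships in the Bitnami image. This is a sketch: replace <kafka-container> with the container name shown by docker ps.

```shell
# Read everything in input-topic from the beginning; press Ctrl+C to stop.
docker exec -it <kafka-container> \
  /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic input-topic \
  --from-beginning
```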
What is in the Producer?
- We first set up the configuration for the producer.
- Build a message.
- Produce the message.
 
using Confluent.Kafka;

var config = new ProducerConfig
{
	BootstrapServers = "localhost:9092",
	ClientId = "producer-1"
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
	var message = new Message<string, string>
	{
		Key = null, // Set the key if you want to partition the messages
		Value = "Hello, Kafka!"
	};
	try
	{
		var deliveryResult = await producer.ProduceAsync("input-topic", message);
		Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
	}
	catch (ProduceException<string, string> ex)
	{
		Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
	}
}
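
The Key above is left null, so messages are spread across partitions without any grouping. As a sketch of the alternative (the key value and loop here are illustrative assumptions), giving related messages the same key routes them all to the same partition, which preserves their relative order:

```csharp
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();
for (var i = 1; i <= 3; i++)
{
	// Same key => same partition, so these three messages stay in order.
	await producer.ProduceAsync("input-topic",
		new Message<string, string> { Key = "customer-42", Value = $"order {i}" });
}
```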
What is bootstrap-server?
It is the entry point a client uses to connect to the Kafka cluster. The client only needs to reach one of the listed brokers; from there it discovers the rest of the cluster. You can supply several addresses, comma-separated, for redundancy.
Let’s set up the consumer
Navigate to where KafkaStream.Consumer.csproj is located. Open PowerShell and run the following command.
dotnet run
The initialization will take a few seconds. Then you should see the following messages.
Consuming messages from topic: input-topic
Press any key to exit
Received message: Hello, Kafka! from partition [0] offset 0
There you go!
What is in the Consumer?
- The main thread sets up the Kafka consumer and starts a background thread to handle message consumption.
- In the background thread, the code runs in a continuous loop until cancellation is requested.
- A cancellation can be requested by pressing any key while the program is running.
 
using Confluent.Kafka;

const string Topic = "input-topic";
// Configure the consumer
var config = new ConsumerConfig
{
	BootstrapServers = "localhost:9092",
	GroupId = "consumer-group-1",
	AutoOffsetReset = AutoOffsetReset.Earliest,
	EnableAutoCommit = false // Disable auto commit to have more control over offsets
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
	// Subscribe to the topic
	consumer.Subscribe(Topic);
	Console.WriteLine($"Consuming messages from topic: {Topic}");
	// Start consuming messages in a background thread
	var cancellationTokenSource = new CancellationTokenSource();
	var cancellationToken = cancellationTokenSource.Token;
	var consumerThread = new Thread(() =>
	{
		try
		{
			while (true)
			{
				try
				{
					var consumeResult = consumer.Consume(cancellationToken);
					Console.WriteLine($"Received message: {consumeResult.Message.Value} from partition {consumeResult.Partition} offset {consumeResult.Offset}");
					// Manually commit the offset to mark the message as consumed
					consumer.Commit(consumeResult);
				}
				catch (ConsumeException ex)
				{
					Console.WriteLine($"Error occurred: {ex.Error.Reason}");
				}
			}
		}
		catch (OperationCanceledException)
		{
			// This exception is thrown when cancellation is requested.
			Console.WriteLine("Cancellation requested");
		}
		finally
		{
			consumer.Close();
		}
	});
	// Start the consumer thread
	consumerThread.Start();
	// Wait for a key press to exit
	Console.WriteLine("Press any key to exit");
	Console.ReadKey();
	// Request cancellation and wait for the consumer thread to stop
	cancellationTokenSource.Cancel();
	consumerThread.Join();
	Console.WriteLine("End Consumer");
}
What does offset do?
An offset is a message’s position within a partition. Committing it records how far the consumer has read, so after a restart the consumer resumes where it left off instead of reprocessing everything.
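
Because we commit offsets manually, we can also override them. As a sketch (Assign, TopicPartitionOffset, and Offset.Beginning are part of Confluent.Kafka; the group id here is an illustrative assumption), a consumer can replay a partition from the start regardless of what was committed:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
	BootstrapServers = "localhost:9092",
	GroupId = "replay-group-1" // illustrative group id
};
using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
// Assign the partition directly (no group subscription) and start at the beginning.
consumer.Assign(new TopicPartitionOffset("input-topic", 0, Offset.Beginning));
var result = consumer.Consume(TimeSpan.FromSeconds(5));
Console.WriteLine(result?.Message.Value);
```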
Gotcha
While developing your own producer or consumer, if you get an error message saying the hostname cannot be resolved, check KAFKA_CFG_ADVERTISED_LISTENERS: clients connect to whatever address the broker advertises there. If you still run into the same issue, consider updating the hosts file.
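
For example, if KAFKA_CFG_ADVERTISED_LISTENERS advertises PLAINTEXT://kafka:9092 rather than localhost, clients on the host machine must be able to resolve the name kafka. A hosts-file entry like the following (the path depends on your OS) maps it to the loopback address:

```
# C:\Windows\System32\drivers\etc\hosts on Windows, /etc/hosts on Linux/macOS
127.0.0.1    kafka
```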
Conclusion
In this article, we successfully ran Kafka in a Docker container, produced a message to a topic, and consumed a message from that topic. The applications of Kafka are limitless. Go ahead, play with the project, and build something interesting of your own!
