Designing event-driven microservices architectures using Apache Kafka and Kafka Streams

Dive into the world of event-driven microservices architecture with Apache Kafka and Kafka Streams. This comprehensive guide explores core concepts, implementation patterns, and best practices for building scalable distributed systems. Learn how to design event schemas, process streams effectively, and handle failures gracefully. With practical Java code examples and real-world architectural patterns, discover how companies like Netflix and LinkedIn process billions of events daily. Whether you're new to event-driven architecture or looking to optimize your existing system, this guide provides valuable insights into building robust, loosely coupled microservices.

Event-driven architectures have become essential for building scalable, loosely coupled systems. Companies like Netflix, LinkedIn, and Uber process billions of events daily using event streaming platforms. Apache Kafka stands out as the leading platform for building event-driven microservices, with the largest deployments handling trillions of events per day.

Let's explore how to design and implement event-driven microservices using Apache Kafka and Kafka Streams through practical examples and architectural patterns.

Core Concepts of Event-Driven Microservices

Event-driven microservices communicate through events rather than direct API calls. An event represents a fact that has occurred in the system - for example, "Order Created" or "Payment Processed". This approach enables:

  • Loose coupling between services
  • Independent scaling of producers and consumers
  • Event replay and audit capabilities
  • Better fault tolerance

Apache Kafka as the Event Backbone

Kafka provides a distributed commit log that stores events in topics. Topics are partitioned for parallelism and replicated for fault tolerance. Key Kafka concepts include:

  • Topics: Categories for events
  • Partitions: Units of parallelism
  • Consumer Groups: Load balancing mechanism for consumers
  • Producers: Applications that publish events
  • Consumers: Applications that subscribe to events

Here's how to set up a basic Kafka producer in Java:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", "order-123", "{\"orderId\": \"123\", \"amount\": 99.99}");

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Event published to partition " + metadata.partition());
    } else {
        exception.printStackTrace();
    }
});

// close() flushes any buffered records before shutting down
producer.close();

And a corresponding consumer:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

// Poll loop; production code would trigger shutdown via consumer.wakeup() and close()
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
            record.offset(), record.key(), record.value());
    }
}

Event Schema Design

Well-designed event schemas are crucial for maintainability. Apache Avro provides schema evolution capabilities and compact serialization. Here's an example Avro schema for an order event:

{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "items", "type": {"type": "array", "items": {
      "type": "record",
      "name": "OrderItem",
      "fields": [
        {"name": "productId", "type": "string"},
        {"name": "quantity", "type": "int"},
        {"name": "price", "type": "double"}
      ]
    }}}
  ]
}
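At runtime, schemas are typically enforced through a schema registry. Here's a minimal producer configuration sketch, assuming Confluent's Avro serializer and a Schema Registry running at localhost:8081 (the registry and the generated OrderCreated class are assumptions, not part of core Kafka):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// KafkaAvroSerializer registers the schema on first use and embeds its ID in each record
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// OrderCreated is the Java class generated from the Avro schema above
Producer<String, OrderCreated> producer = new KafkaProducer<>(props);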

Processing Events with Kafka Streams

Kafka Streams provides a powerful API for building stateful stream processing applications. Here's an example that calculates running totals per customer:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-totals");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

builder.stream("orders", Consumed.with(Serdes.String(), orderAvroSerde))
    .groupBy((key, order) -> order.getCustomerId())
    .aggregate(
        () -> 0.0,
        (customerId, order, total) -> total + order.getAmount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    )
    .toStream()
    .to("customer-totals", Produced.with(Serdes.String(), Serdes.Double()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
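In production you would also close the topology cleanly so state stores flush on shutdown; a common minimal pattern:

// Ensure state stores and internal clients shut down cleanly
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));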

Event-Driven Patterns

Command Query Responsibility Segregation (CQRS)

CQRS separates read and write operations. Write operations emit events, while read operations consume events to build optimized view models:

// Command side
@PostMapping("/orders")
public ResponseEntity<String> createOrder(@RequestBody Order order) {
    String orderId = UUID.randomUUID().toString();
    OrderCreated event = OrderCreated.newBuilder()
        .setOrderId(orderId)
        .setCustomerId(order.getCustomerId())
        .setAmount(order.getAmount())
        .build();

    kafkaTemplate.send("orders", orderId, event);
    return ResponseEntity.ok(orderId);
}

// Query side
@Service
public class OrderViewUpdater {
    @KafkaListener(topics = "orders")
    public void handleOrderCreated(OrderCreated event) {
        OrderView view = new OrderView();
        view.setOrderId(event.getOrderId());
        view.setCustomerId(event.getCustomerId());
        view.setAmount(event.getAmount());
        orderViewRepository.save(view);
    }
}
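The read side then serves queries from the view store rather than from Kafka. A sketch of the corresponding query endpoint, assuming a Spring Data repository named orderViewRepository (hypothetical, matching the listener above):

// Query side: reads hit the denormalized view, never the write path
@GetMapping("/orders/{orderId}")
public ResponseEntity<OrderView> getOrder(@PathVariable String orderId) {
    return orderViewRepository.findById(orderId)
        .map(ResponseEntity::ok)
        .orElse(ResponseEntity.notFound().build());
}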

Event Sourcing

Event sourcing stores the state of entities as a sequence of events. Here's an implementation using Kafka Streams:

public class OrderAggregate {
    private final KafkaStreams streams;
    private final String stateStoreName = "order-store";

    public OrderAggregate(Properties props) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("order-events", Consumed.with(Serdes.String(), eventAvroSerde))
            .groupByKey()
            .aggregate(
                Order::new,
                (orderId, event, order) -> order.apply(event),
                Materialized.as(stateStoreName)
            );

        streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    public Order getOrder(String orderId) {
        // Interactive query against the local state store (Kafka Streams 2.5+ API)
        ReadOnlyKeyValueStore<String, Order> store = streams.store(
            StoreQueryParameters.fromNameAndType(stateStoreName, QueryableStoreTypes.keyValueStore()));
        return store.get(orderId);
    }
}
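The aggregate's state is rebuilt purely by folding events. A sketch of what Order.apply might look like, with hypothetical OrderCreated and OrderShipped event types and fields:

public Order apply(OrderEvent event) {
    // Each event type updates one slice of state; unrecognized events are ignored
    if (event instanceof OrderCreated) {
        OrderCreated created = (OrderCreated) event;
        this.orderId = created.getOrderId();
        this.amount = created.getAmount();
        this.status = "CREATED";
    } else if (event instanceof OrderShipped) {
        this.status = "SHIPPED";
    }
    return this;
}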

Handling Failures and Retries

Reliable event processing requires proper error handling. One proven approach is the Dead Letter Queue (DLQ) pattern, shown here with Spring Kafka's DefaultErrorHandler and DeadLetterPublishingRecoverer:

@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            KafkaTemplate<String, String> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // After three 1-second retries, publish the failed record to orders-dlq
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition("orders-dlq", record.partition()));
        factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3)));
        return factory;
    }
}
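Records landing on the DLQ can then be inspected and, once the underlying issue is fixed, replayed. A minimal sketch of a DLQ listener, assuming an SLF4J logger named log:

@KafkaListener(topics = "orders-dlq", groupId = "dlq-inspector")
public void handleDeadLetter(ConsumerRecord<String, String> record) {
    // Log for inspection; a real handler might alert on-call or re-publish after a fix
    log.error("Dead-lettered event: key={}, value={}", record.key(), record.value());
}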

Monitoring and Observability

Implement metrics collection using Micrometer and Prometheus:

@Configuration
public class KafkaMetricsConfig {
    // A handle to the application's consumer is assumed to be injectable here
    private final Consumer<?, ?> consumer;

    public KafkaMetricsConfig(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Bean
    public MeterBinder kafkaConsumerMetrics() {
        return registry -> registry.gauge("kafka.consumer.lag",
            Tags.of("group", "order-processing"),
            consumer,
            this::calculateConsumerLag);
    }

    // Caution: KafkaConsumer is not thread-safe, so guard this call if the
    // consumer is polled on another thread
    private double calculateConsumerLag(Consumer<?, ?> consumer) {
        Map<TopicPartition, Long> endOffsets =
            consumer.endOffsets(consumer.assignment());

        return endOffsets.entrySet().stream()
            .mapToDouble(entry -> {
                TopicPartition partition = entry.getKey();
                long currentOffset = consumer.position(partition);
                return entry.getValue() - currentOffset;
            })
            .sum();
    }
}
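Micrometer also ships a ready-made binder that exposes the Kafka client's own metrics (including records-lag-max) without hand-rolled gauges; if those metrics cover your needs, prefer it:

// io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics
new KafkaClientMetrics(consumer).bindTo(registry);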

Performance Optimization

Optimize throughput and latency:

  1. Configure appropriate partition counts (num.partitions sets the broker default for auto-created topics; size per topic for your target consumer parallelism):

    num.partitions=24
    

  2. Tune producer settings (see the Java sketch after this list):

    batch.size=16384
    linger.ms=5
    compression.type=lz4
    

  3. Optimize consumer settings:

    fetch.min.bytes=1024
    fetch.max.wait.ms=500
    max.poll.records=500
    
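The producer settings above translate directly into client configuration; a sketch using the ProducerConfig constants:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Batch up to 16 KB per partition, waiting at most 5 ms for a batch to fill
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// lz4 trades a little CPU for a smaller network and disk footprint
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");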

Deployment Considerations

Deploy Kafka clusters on Kubernetes using the Strimzi operator:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: event-streaming-cluster
spec:
  kafka:
    version: 3.3.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false

Event-driven microservices with Kafka provide scalability and flexibility for modern applications. Netflix has reported processing trillions of events per day with architectures like these. The patterns and practices outlined here form a foundation for building robust event-driven systems.

Remember to monitor performance metrics, implement proper error handling, and maintain well-documented event schemas. These practices ensure a maintainable and reliable event-driven architecture.
