Skip to main content

Command Palette

Search for a command to run...

A Practical Journey from Application to Distributed Systems - Part 5

Introducing Kafka/Redpanda + move to event-driven workflow

Published
25 min read
A Practical Journey from Application to Distributed Systems - Part 5
D
I'm a mobile/web developer 👨‍💻 who loves to build projects and share valuable tips for programmers Follow me for Flutter, React/Next.js, and other awesome tech-related stuff 😉

In Part 4, we made Orders call Inventory directly using gRPC. That gave us a working flow, but it also showed a common problem like Orders has to wait for Inventory every time. If Inventory is slow, Orders becomes slow. If Inventory is down, Orders fails too.

In this part, we improve that design by introducing Kafka. We will use Redpanda locally since it is Kafka-compatible and simple to run. We will understand what Kafka and Redpanda is.

Instead of making Orders wait for Inventory, we will switch to an event-driven flow. So, Orders will publish an event, Inventory will consume it, and then Inventory will publish the result.

We keep the gRPC server running in Inventory because it is still useful for learning and future steps, but in Part 5 the main workflow is Kafka-based.S

This is the point where the project starts to feel like a real distributed system.


What we are building in this part

In this part, we will change the order flow from synchronous to asynchronous.

Before (Part 4)

The flow looked like this:

  • Orders creates the order as PENDING

  • Orders calls Inventory directly using gRPC

  • Orders immediately updates the order to CONFIRMED or CANCELLED

That means Orders has to wait for Inventory before it can finish the request.

After (Part 5)

Now the flow will look like this:

  • Orders creates the order as PENDING

  • Orders publishes an event to Kafka: OrderCreated

  • Inventory consumes that event and makes a stock decision

  • Inventory publishes a result event: InventoryReserved or InventoryRejected

  • Orders consumes the result event and updates the order status

So instead of waiting directly for Inventory, Orders hands off the work through Kafka.


What Kafka is

Kafka is a system for sending and storing messages, often called events.

Instead of one service calling another service directly and waiting for a response, a service can publish an event to Kafka. Other services can then read that event and react to it.

You can think of Kafka as a shared event pipeline between services.

Some simple terms to understand

Topic

A topic is a named stream of events.

Example:

orders.v1

If the Orders service publishes an OrderCreated event, it might send it to the orders.v1 topic.

You can think of a topic like a category or channel where related messages are stored.

Producer

A producer is a service that publishes messages to a topic.

For example:

  • Orders can be a producer when it publishes OrderCreated

Consumer

A consumer is a service that reads messages from a topic.

For example:

  • Inventory can be a consumer when it reads OrderCreated

Consumer group

A consumer group allows multiple consumers to share the work of reading messages.

This is useful when you want to scale processing across multiple instances of the same service.

You can think of a consumer group as a team of workers splitting the same queue.

Partition

A topic is split into partitions.

Partitions are what allow Kafka to scale. Instead of one big single stream, a topic can be divided into multiple smaller streams that can be processed in parallel.

So:

  • one topic

  • multiple partitions

  • more throughput and more scaling

Offset

An offset is the position of a message inside a partition.

You can think of it like a message number.

So if a consumer reads up to offset 42, it means it has processed messages up to that point in that partition.

Why Kafka is useful

One of Kafka’s biggest strengths is that consumers can crash, restart, and continue from where they left off. That means the system is more resilient. A service does not have to stay alive all the time to avoid losing work.

This is one of the reasons Kafka is so useful in distributed systems. In our case, Kafka lets us stop making Orders wait directly on Inventory.

Instead:

  • Orders publishes an event

  • Inventory reads it and processes it

  • Inventory publishes the result

  • Orders reads that result later

That makes the system more asynchronous and less tightly coupled.


Why Redpanda?

For this project, we will use Redpanda locally.

Redpanda is Kafka-compatible, which means our code can talk to it the same way it would talk to Kafka. The big advantage is that Redpanda is much easier to run in local development. We can start it as a single container, without needing a more complex setup.

It also comes with a command-line tool called rpk, which is very useful for:

  • creating topics

  • checking cluster state

  • debugging message flow

That makes it a great choice for learning and local experimentation.

So for this project, Redpanda gives us the Kafka model and APIs we want, but with a much simpler developer experience.


Step 1: Add Redpanda to Docker Compose

In your root docker-compose.yml, add this service:

  redpanda:
    image: redpandadata/redpanda:latest
    healthcheck:
      test: ["CMD-SHELL", "curl -fsS http://127.0.0.1:9644/v1/status/ready >/dev/null"]
      interval: 5s
      timeout: 5s
      retries: 12
      start_period: 30s
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp
      - "1"
      - --memory
      - 1G
      - --reserve-memory
      - 0M
      - --node-id
      - "0"
      - --check=false
      - --kafka-addr
      - internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr
      - internal://redpanda:9092,external://localhost:19092
    ports:
      - "19092:19092"
      - "9644:9644"

This adds Redpanda as a local Kafka-compatible broker for our project.

What this service does

This container runs Redpanda, which will act as our event system.

It will store topics such as:

  • orders.v1

  • inventory.v1

and later our services will publish and consume events through it.

You might ask Why there are so many command options?

Don't worry, most of these options are there to make Redpanda easier to run on a local machine.

For example:

  • --overprovisioned

  • --smp "1"

  • --memory 1G

  • --reserve-memory 0M

  • --check=false

These settings reduce the resource requirements and make Redpanda friendlier for local development.

You do not need to memorize all of them. The main thing to understand is that they help us run a lightweight single-node Redpanda setup on our laptop.

Why we configure both internal and external addresses

This is the most important part of the setup. Redpanda needs to be reachable from two different places:

1. From inside Docker Compose

Other containers, like Orders and Inventory, need to connect to Redpanda using the Docker service name:

redpanda:9092

That is the internal address.

2. From your laptop

If you want to connect from your machine using tools like rpk, you cannot use the Docker service name redpanda.

From your laptop, you need to connect using:

localhost:19092

That is the external address.

So here we are saying:

  • containers should use redpanda:9092

  • your laptop should use localhost:19092

It was all configuration part, so you don't need to memorize it, just understand what it is doing and why. That's it.


Step 2: Create topics

First, start the containers:

docker compose up -d --build

Once everything is running, create the Kafka topics:

docker compose exec redpanda rpk topic create orders.v1 -p 3
docker compose exec redpanda rpk topic create inventory.v1 -p 3

These commands create two topics:

  • orders.v1

    • This topic will carry events produced by the Orders service.

    • For example:

      • OrderCreated

      Inventory will later consume events from this topic.

  • inventory.v1

    • This topic will carry events produced by the Inventory service.

    • For example:

      • InventoryReserved

      • InventoryRejected

      Orders will later consume these result events.

To check if the topics are created successfully run below command:

What are partitions?

A Kafka topic is split into partitions. You can think of a partition as one separate ordered lane of messages inside a topic.

So when we create a topic with:

rpk topic create orders.v1 -p 3

we are saying:

create the orders.v1 topic with 3 partitions

That gives us a few benefits.

Parallel processing: Multiple consumers can process different partitions at the same time, which helps the system scale.

Better throughput: Instead of all messages going through one single lane, Kafka can spread work across multiple partitions.

Ordering by key: If we publish messages with the same key, such as orderId, Kafka will consistently send those messages to the same partition.

That is useful because Kafka guarantees ordering within a partition. So if all events for order 123 use the same key, Kafka keeps those events in order for that order.


Step 3: Add Kafka library in Go services

To talk to Redpanda from Go, we need a Kafka client library.

For this project, we will use kafka-go, which is a popular Go library for working with Kafka-compatible systems.

We need to add it to both services, because:

  • Orders will publish events

  • Inventory will consume events and publish results

Add it to Orders:

cd services/orders
go get github.com/segmentio/kafka-go
go mod tidy
cd ../..

Add it to Inventory:

cd services/inventory
go get github.com/segmentio/kafka-go
go mod tidy
cd ../..

This adds the kafka-go library as a dependency in the service’s go.mod file. That means the service can now use Kafka producers and consumers in Go code.

You might ask why to add it for both the service. At first, it might seem like only one service needs Kafka, but both do.

Orders will use Kafka to:

  • publish OrderCreated events

  • later consume inventory result events

Inventory will use Kafka to:

  • consume OrderCreated

  • publish InventoryReserved or InventoryRejected

So both services need Kafka support.


Step 4: Define event formats (simple JSON for now)

In a production system, you could also use protobuf for Kafka events. But while learning, JSON is easier to work with because you can print the payload directly and understand what is being sent.

So for now, we will keep the event format simple and use JSON.

We will use two main kinds of events:

  • OrderCreated

  • InventoryReserved / InventoryRejected

OrderCreated

This event will be published by the Orders service when a new order is created.

It contains:

  • orderId - which order this event belongs to

  • userId - which user placed the order

  • sku - which product is being ordered, such as burger

  • quantity - how many items are requested

  • eventId - a unique ID for this event

  • time - when the event was created

This event tells the rest of the system:

“A new order was created, and inventory should now decide whether stock can be reserved.”

InventoryReserved / InventoryRejected

These events will be published by the Inventory service after it processes an order.

They contain:

  • orderId → which order this result belongs to

  • reservedtrue if stock was reserved, false if it was rejected

  • reason → an optional explanation, usually used when reservation fails

  • eventId → a unique ID for this event

  • time → when the result event was created

So these events tell Orders:

  • whether stock reservation succeeded

  • and, if it failed, why

The eventId is important because later we will use it for idempotency and de-duplication. In event-driven systems, the same message can sometimes be delivered more than once. A unique event ID helps the service recognize whether it has already processed that event before.


Step 5: Orders publishes OrderCreated instead of calling gRPC

This is the first major design change in this part. Until now, Orders was calling Inventory directly using gRPC. Now we are changing that flow. Instead of waiting for Inventory immediately, Orders will publish an event to Kafka.

That means Orders will no longer ask:

“Can you reserve stock right now?”

Instead, it will say:

“An order was created. Someone should process it.”

That “someone” will be the Inventory service.

5.1 Create a Kafka “bus” in Orders

To keep Kafka setup in one place, we will create a small helper package.

Create: services/orders/internal/kafkabus/bus.go

package kafkabus

import (
	"strings"

	"github.com/segmentio/kafka-go"
)

// This struct holds the Kafka clients that Orders needs.
type Bus struct {
    // This is the Kafka producer for Orders.
	OrdersWriter    *kafka.Writer
    // This is the Kafka consumer for Orders.
	InventoryReader *kafka.Reader
}

// This creates the Kafka writer and reader.
func New(brokers []string) *Bus {
	return &Bus{
        /*
            1. connect to the given Kafka brokers
            2. publish messages to orders.v1
            3. use kafka.Hash{} to choose partitions based on message key
        */
		OrdersWriter: &kafka.Writer{
			Addr:                   kafka.TCP(brokers...),
			Topic:                  "orders.v1",
			Balancer:               &kafka.Hash{},
			AllowAutoTopicCreation: true,
		},
        
        // This creates a Kafka consumer for inventory.v1.
		InventoryReader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:  brokers,
			Topic:    "inventory.v1",
            // This is the most important part:
            // This means the Orders service reads inventory results as part of the consumer group named orders-service.
			GroupID:  "orders-service",
			MinBytes: 1,
			MaxBytes: 10e6,
		}),
	}
}

// This closes both the reader and the writer when the Orders service shuts down.
func (b *Bus) Close() error {
	_ = b.InventoryReader.Close()
	return b.OrdersWriter.Close()
}

// This helper takes a comma-separated string of broker addresses and turns it into a Go slice.
func BrokersFromEnv(s string) []string {
	parts := strings.Split(s, ",")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		p = strings.TrimSpace(p)
		if p != "" {
			out = append(out, p)
		}
	}
	return out
}

What this file is doing

This file creates a small Kafka wrapper for the Orders service.

It gives Orders two things:

  • a writer for publishing order events

  • a reader for consuming inventory result events

So instead of putting Kafka setup directly inside main.go or inside HTTP handlers, we keep it in one dedicated place.

That makes the code cleaner and easier to extend later.

5.2 Define events and publish function

Now we need to define the actual event structures that Orders will use when talking through Kafka.

Create: services/orders/internal/kafkabus/events.go

package kafkabus

import (
	"context"
	"encoding/json"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

// This is the event that the Orders service will publish when a new order is created.
type OrderCreated struct {
	EventID  string    `json:"eventId"`
	Type     string    `json:"type"`
	Time     time.Time `json:"time"`
	OrderID  int64     `json:"orderId"`
	UserID   string    `json:"userId"`
	Sku      string    `json:"sku"`
	Quantity int32     `json:"quantity"`
}

// This is the event that Inventory will publish after processing the order.
type InventoryResult struct {
	EventID   string    `json:"eventId"`
	Type      string    `json:"type"`
	Time      time.Time `json:"time"`
	OrderID   int64     `json:"orderId"`
	Reserved  bool      `json:"reserved"`
    // The omitempty tag on Reason means that if the reason is empty, it will be left out of the JSON.
	Reason    string    `json:"reason,omitempty"`
}

// This helper takes an OrderCreated struct and publishes it to Kafka.
func PublishOrderCreated(ctx context.Context, w *kafka.Writer, evt OrderCreated) error {
    // Since we are using JSON for events right now, this turns the Go struct into a JSON byte payload.
	val, err := json.Marshal(evt)
	if err != nil {
		return err
	}

    // This writes the event into Kafka.
	return w.WriteMessages(ctx, kafka.Message{
		Key:   []byte(strconv.FormatInt(evt.OrderID, 10)),
		Value: val,
	})
}

This file gives us two things:

  • the Go structs that represent our Kafka events

  • a helper function PublishOrderCreated to publish an OrderCreated event

So instead of building Kafka messages manually all over the codebase, we define the event format once and reuse it.

5.3 Update Orders HTTP handler: create order + publish event

In this part of the blog series, we are keeping the flow simple.

For now, when a new order is created, we will:

  • create the order in Postgres as PENDING

  • publish an OrderCreated event to Kafka

  • return 202 Accepted

Later, when we introduce the Outbox pattern, we will make this flow safer even if Kafka is temporarily down. But for now, this version is enough to teach the event-driven flow clearly.

Open:services/orders/internal/http/http.go

Since the HTTP handler now needs to publish Kafka events, the Orders HTTP server needs access to the Kafka bus.

Update the Server struct like this:

type Server struct {
	store *store.OrdersStore
	inv   *inv.Client
	bus   *kafkabus.Bus
}

func NewServer(store *store.OrdersStore, inv *inv.Client, bus *kafkabus.Bus) *Server {
	return &Server{store: store, inv: inv, bus: bus}
}

Earlier, the HTTP server only needed access to the database store. Now it also needs access to Kafka, because after creating an order it must publish an OrderCreated event.

So the server now depends on two things:

  • the store, for database operations

  • the bus, for Kafka operations

Now inside the handleCreateOrder function in http.go After creating the order as PENDING, build the event like this:

// Create order as PENDING
// ...

evt := kafkabus.OrderCreated{
	EventID:  uuid.New().String(),
	Type:     "OrderCreated",
	Time:     time.Now().UTC(),
	OrderID:  id,
	UserID:   req.UserID,
	Sku:      req.Sku,
	Quantity: int32(req.Quantity),
}

This creates the event payload that will be sent to Kafka.

It tells the rest of the system:

“A new order was created, and Inventory should now process it.”

Now publish it using the Kafka bus:


if err := kafkabus.PublishOrderCreated(ctx, s.bus.OrdersWriter, evt); err != nil {
	http.Error(w, "kafka publish failed", http.StatusServiceUnavailable)
	return
}

writeJSON(w, http.StatusAccepted, createOrderResponse{ID: id})

If publishing fails, we return:

  • 503 Service Unavailable

That means the order was created in the database, but the event could not be published yet. Again, this is a simplification for now. Later, the Outbox pattern will solve this more safely.

Also, We use 202 Accepted instead of 201 Created because the full order processing is no longer finished immediately.

One important detail: publishing an event does not automatically update the order status. The status changes only if Inventory consumes the event and sends a result back, and if Orders is also consuming those result events.

That means we must start two background consumer loops in the next steps.

IMPORTANT: Make sure you comment out the previous code that we wrote in the Part 4 because no we are no longer updating status directly:

Step 6: Inventory consumes OrderCreated and publishes result

Now Inventory stops being just a gRPC server and starts acting as a Kafka consumer too.

That means Inventory will now:

  • read OrderCreated events from Kafka

  • try to reserve stock

  • publish a result event back to Kafka

So Inventory becomes the service that reacts to order events and decides whether stock can actually be reserved.

Let's now also create a Kafka bus helper for Inventory service.

6.1 Create a Kafka bus helper for Inventory

To keep the Kafka setup clean, we will create a small bus helper for the Inventory service.

Create: services/inventory/internal/kafkabus/bus.go

package kafkabus

import (
	"strings"

	"github.com/segmentio/kafka-go"
)

type Bus struct {
    // This reads from: order.v1
	OrdersReader     *kafka.Reader
    // This writes to: inventory.v1
	InventoryWriter  *kafka.Writer
}

func New(brokers []string) *Bus {
	return &Bus{
		OrdersReader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:  brokers,
			Topic:    "orders.v1",
			GroupID:  "inventory-service",
			MinBytes: 1,
			MaxBytes: 10e6,
		}),
		InventoryWriter: &kafka.Writer{
			Addr:                   kafka.TCP(brokers...),
			Topic:                  "inventory.v1",
			Balancer:               &kafka.Hash{},
			AllowAutoTopicCreation: true,
		},
	}
}

func (b *Bus) Close() error {
	_ = b.OrdersReader.Close()
	return b.InventoryWriter.Close()
}

func BrokersFromEnv(s string) []string {
	parts := strings.Split(s, ",")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		p = strings.TrimSpace(p)
		if p != "" {
			out = append(out, p)
		}
	}
	return out
}

What this bus means

This Kafka bus gives Inventory two things:

  • a reader for consuming order events

  • a writer for publishing inventory result events

The OrdersReader reads from: orders.v1 . That is where the Orders service publishes OrderCreated events. The InventoryWriter writes to: inventory.v1. That is where Inventory will publish the result after checking stock.

6.2 Start the Inventory Kafka consumer loop

Inventory will run a background goroutine that continuously reads from orders.v1.

For each OrderCreated event, it will:

  1. decode the event

  2. try to reserve stock

  3. publish an inventory result event to inventory.v1

Create: services/inventory/internal/kafkabus/consume.go

package kafkabus

import (
	"context"
	"encoding/json"
	"log"

	"github.com/segmentio/kafka-go"
)

func ConsumeOrders(ctx context.Context, r *kafka.Reader, handle func(OrderCreated) error) {
	for {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Printf("inventory consumer stopped: %v", err)
			return
		}

		var evt OrderCreated
		if err := json.Unmarshal(msg.Value, &evt); err != nil {
			log.Printf("bad order event json: %v", err)
			_ = r.CommitMessages(ctx, msg) // poison => commit and move on
			continue
		}

		if err := handle(evt); err != nil {
			log.Printf("handle order event failed: %v", err)
			// no commit => retry later
			continue
		}

		_ = r.CommitMessages(ctx, msg)
	}
}

What this is doing? This is the core Kafka consumer loop for Inventory. It keeps reading messages from orders.v1. If the JSON is bad, It logs the error, commits the message, and moves on. And if the handling succeed, It commits the message, which means Inventory is done with it.

6.3 Define the event structs and publish helper

Now we define the event structures Inventory will use.

Create services/inventory/internal/kafkabus/events.go

package kafkabus

import (
	"context"
	"encoding/json"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

type OrderCreated struct {
	EventID   string    `json:"eventId"`
	Type      string    `json:"type"`
	Time      time.Time `json:"time"`
	OrderID   int64     `json:"orderId"`
	UserID    string    `json:"userId"`
	Sku       string    `json:"sku"`
	Quantity  int32     `json:"quantity"`
}

type InventoryResult struct {
	EventID  string    `json:"eventId"`
	Type     string    `json:"type"` // InventoryReserved | InventoryRejected
	Time     time.Time `json:"time"`
	OrderID  int64     `json:"orderId"`
	Reserved bool      `json:"reserved"`
	Reason   string    `json:"reason,omitempty"`
}

func PublishInventoryResult(ctx context.Context, w *kafka.Writer, evt InventoryResult) error {
	val, err := json.Marshal(evt)
	if err != nil {
		return err
	}
	return w.WriteMessages(ctx, kafka.Message{
		Key:   []byte(strconv.FormatInt(evt.OrderID, 10)),
		Value: val,
	})
}

These structs define the JSON payloads Inventory reads and writes.

OrderCreated

This is the event Inventory receives from Orders.

InventoryResult

This is the event Inventory publishes after checking stock.

Using orderId as the Kafka key is important because it helps keep all events for the same order in the same partition, which preserves ordering for that order.

6.4 Update Inventory main.go

Now wire Kafka into the Inventory service.

First, add a helper method outside main():

// outside of main
func (s *inventoryServer) tryReserve(sku string, qty int32) error {
	if sku == "" {
		return errors.New("sku is required")
	}
	if qty <= 0 {
		return errors.New("quantity must be > 0")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	available := s.stock[sku]
	if available < qty {
		return errors.New("insufficient stock")
	}

	s.stock[sku] = available - qty
	return nil
}

This reuses the same stock-reservation logic, but now for Kafka events instead of only gRPC requests.

Then inside main():

brokers := kafkabus.BrokersFromEnv(getenv("KAFKA_BROKERS", "localhost:19092"))
bus := kafkabus.New(brokers)
defer bus.Close()

srv := newInventoryServer()
inventoryv1.RegisterInventoryServiceServer(grpcServer, srv)

ctx := context.Background()

go kafkabus.ConsumeOrders(ctx, bus.OrdersReader, func(evt kafkabus.OrderCreated) error {
	err := srv.tryReserve(evt.Sku, evt.Quantity)

	out := kafkabus.InventoryResult{
		EventID: uuid.New().String(),
		Time:    time.Now().UTC(),
		OrderID: evt.OrderID,
	}

	if err != nil {
		out.Type = "InventoryRejected"
		out.Reason = err.Error()
		out.Reserved = false
	} else {
		out.Type = "InventoryReserved"
		out.Reserved = true
	}

	return kafkabus.PublishInventoryResult(ctx, bus.InventoryWriter, out)
})

This is where Inventory becomes part of the async workflow. First, it connects to Kafka It reads broker addresses from KAFKA_BROKERS, creates the Kafka bus, and keeps it open for the lifetime of the service.

Then, it starts a background goroutine. That goroutine continuously consumes OrderCreated events from orders.v1.

For each event, Inventory:

  • checks stock using tryReserve

  • creates an InventoryResult

  • publishes that result to inventory.v1

If stock is available:

  • type = InventoryReserved

  • reserved = true

If stock is not available:

  • type = InventoryRejected

  • reserved = false

  • reason = error message

Step 7: Orders consumes Inventory results and updates status

Now we complete the async loop.

Inventory is already publishing result events into inventory.v1, but that alone does not change anything in the Orders database. Orders must also read those result events and update the stored order status.

So Orders will now start a background goroutine that continuously reads messages from inventory.v1

These messages are the result events published by Inventory after it processes an order.

For each InventoryResult, Orders will update the order status in the database:

  • if reserved is true → set status to CONFIRMED

  • if reserved is false → set status to CANCELLED

At this point, the flow is split into two stages.

Stage 1

Orders accepts the request, stores the order as PENDING, and publishes an event.

Stage 2

Later, after Inventory processes the event, Orders receives the result and updates the final order status.

So the final state change now happens in the background, not during the original HTTP request.

This is the async update pattern in action.

Create services/orders/internal/kafkabus/consume.go to consume inventory.v1 continuously.

package kafkabus

import (
	"context"
	"encoding/json"
	"log"

	"github.com/segmentio/kafka-go"
)

// ConsumeInventoryResults reads messages from inventory.v1 and passes them to handle().
// If handle returns error, we do not commit, so Kafka will retry.
func ConsumeInventoryResults(ctx context.Context, r *kafka.Reader, handle func(InventoryResult) error) {
	for {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Printf("orders: inventory consumer stopped: %v", err)
			return
		}

		var evt InventoryResult
		if err := json.Unmarshal(msg.Value, &evt); err != nil {
			log.Printf("orders: bad inventory event json: %v", err)
			_ = r.CommitMessages(ctx, msg) // poison => commit and move on
			continue
		}

		if err := handle(evt); err != nil {
			log.Printf("orders: handle inventory event failed: %v", err)
			continue // no commit => retry
		}

		_ = r.CommitMessages(ctx, msg)
	}
}

This loop keeps reading messages from inventory.v1. If the JSON is invalid It logs the error, commits the message, and moves on. That is because a poison message will not become valid just by retrying. If handling fails It does not commit the message. That means Kafka can deliver it again later.

If handling succeeds, It commits the message, which means Orders has processed it successfully. This gives us an at-least-once processing model.

Start the background consumer in main.go

Now inside services/orders/cmd/orders/main.go 's main() function, after creating the HTTP server, start the consumer goroutine:

ctx := context.Background()

go kafkabus.ConsumeInventoryResults(ctx, bus.InventoryReader, func(evt kafkabus.InventoryResult) error {
  switch evt.Type {
  case "InventoryReserved":
    return orderStore.UpdateStatus(ctx, evt.OrderID, store.StatusConfirmed)
  case "InventoryRejected":
    return orderStore.UpdateStatus(ctx, evt.OrderID, store.StatusCancelled)
  default:
    return nil
  }
})

This starts a background worker inside Orders. Whenever Inventory publishes a result event, Orders reads it and applies the final state change. If the result is InventoryReserved Orders updates the order to CONFIRMED. If the result is InventoryRejected Orders updates the order to CANCELLED.

So this is the final step that closes the async workflow.

Step 8: Update docker-compose

Now both services need to know how to reach Kafka.

Add KAFKA_BROKERS to the orders and inventory services in docker-compose.yml:

  orders:
    depends_on:
        ...
     redpanda:
       condition: service_healthy
    environment:
      ...
      KAFKA_BROKERS: "redpanda:9092"

  inventory:
    environment:
      ...
      KAFKA_BROKERS: "redpanda:9092"

Both services now use Kafka.


How to test

Now let’s test the new Kafka-based flow end to end.

Start fresh with:

docker compose down -v
docker compose up --build

This removes old containers and volumes, then rebuilds and starts the full system again.

Create an order:

Send a request to the Orders API:

curl -s -X POST localhost:8080/v1/orders \
  -H "Content-Type: application/json" \
  -d '{"userId":"u1","sku":"burger","quantity":2,"note":"kafka test"}'

If the request is accepted, you should get back an order ID.

Because the flow is now asynchronous, this does not mean the order is fully processed yet. It only means Orders accepted the request, stored the order, and published an event.

Now fetch the order:

curl -s localhost:8080/v1/orders/1

At first, you should see the order in: PENDING. Then, after Inventory consumes the event and publishes the result, the order should become: CONFIRMED or CANCELLED.

To see the flow happening across both services, run:

docker compose logs -f orders inventory

What we learned in Part 5

In this part, we introduced Kafka and changed the order flow from synchronous processing to an event-driven design.

We learned:

  • how Kafka topics and consumer groups work

  • how to publish events with a key

  • how to consume messages and commit offsets

  • why asynchronous processing leads to a PENDING state before the final result

  • how services become more independent and resilient when they communicate through events

This part is important because it changes how the system behaves. Orders no longer has to wait directly for Inventory. Instead, the two services communicate through Kafka, which makes the system more decoupled and better suited for scaling and failure handling.

It also introduces an important idea that shows up in many real systems: work is often accepted first, then completed later. That is why a PENDING state becomes a normal part of the design.


Conclusion

We now have our first event-driven order flow.

Orders no longer waits directly for Inventory. Instead, it accepts the request quickly and lets inventory processing happen asynchronously through Kafka. That makes the system feel much closer to how real distributed backends are designed.

At the same time, this version is not fully safe yet.

There is still an important failure case: if Kafka is down at the moment we create an order, the event can be lost. That can leave an order stuck in PENDING with no way to finish the workflow.

So in the next part, we will focus on reliability. We will move toward a Saga-style workflow and then introduce the Outbox pattern, which will make sure important events are not lost.