Enterprise Streaming: Building Real-Time Event Pipelines with Apache Kafka Streams & Java
Introduction
Modern enterprise architectures demand real-time telemetry and instantaneous data processing. Overnight batch jobs are no longer sufficient when customer experience and fraud detection require sub-second reactions.
Before diving into the code, let's present our case study. We need to model the systems of bitVizor, a global real-estate exchange platform. bitVizor is migrating its rigid, point-to-point IT infrastructure to an Enterprise Service Bus (ESB) built on Apache Kafka. Their goal is to decouple microservices and unify the event backbone across the organization, supporting worldwide web and mobile clients where real-time response is fundamental.
A critical requirement from bitVizor's marketing department is to feed real-time user tracking data into their analytics engine. However, the raw clickstream data only contains IP addresses. The data pipeline must enrich this stream with geographic location data using third-party databases before it reaches the data lake.
Target Architecture
The stream processing topology we will build consumes raw tracking events, extracts the IP address, queries a local MaxMind GeoIP database, and publishes the enriched event to a downstream topic.
1. Setting Up the Modern Java Project
We will build our project using modern standards: Java 21, Gradle 8.x, and Kafka 3.7.x.
First, ensure you have Java 21 installed. Then, set up Gradle. If you don't have it, you can install it via SDKMAN (sdk install gradle) or Homebrew on macOS (brew install gradle).
Create your project directory:
mkdir eventprocessor && cd eventprocessor
gradle init --type java-library --dsl groovy --test-framework junit-jupiter
Gradle generates a skeleton project. You can safely delete the default Library.java and LibraryTest.java files generated in the src/main/java/... directories to make way for our streaming code.
Configuring Dependencies
Open the generated build.gradle file. We need the Kafka Streams API, Jackson for JSON parsing, and the MaxMind GeoIP2 library. Update the dependencies block:
dependencies {
    // Apache Kafka Core & Streams API (Modern 3.7.x)
    implementation 'org.apache.kafka:kafka-clients:3.7.0'
    implementation 'org.apache.kafka:kafka-streams:3.7.0'

    // JSON Serialization
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.1'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1'

    // MaxMind GeoIP
    implementation 'com.maxmind.geoip2:geoip2:4.0.1'

    // SLF4J for Kafka Streams logging
    implementation 'org.slf4j:slf4j-simple:2.0.12'
}
Run gradle build to pull down the dependencies and verify your build configuration.
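Since we will later run the app from the command line, it is also convenient (though optional) to apply Gradle's application plugin in the same build.gradle. The mainClass value below assumes the package and class name used later in this article:

```groovy
plugins {
    id 'java'
    id 'application'
}

application {
    // Entry point of the stream processor we create in section 5
    mainClass = 'eventprocessor.StreamProcessorApp'
}
```

With this in place, gradle run compiles and launches the processor in one step.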
2. Defining the Data Models
Our pipeline transforms a raw TrackingEvent into an EnrichedTrackingEvent. Let's create these POJOs.
In src/main/java/eventprocessor/models/, create TrackingEvent.java:
package eventprocessor.models;
public class TrackingEvent {
    private String userId;
    private String eventType;
    private String ipAddress;
    private Long timestamp;

    // Getters and Setters
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getIpAddress() { return ipAddress; }
    public void setIpAddress(String ipAddress) { this.ipAddress = ipAddress; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}
Next, create EnrichedTrackingEvent.java which extends the base event with location data:
package eventprocessor.models;
public class EnrichedTrackingEvent extends TrackingEvent {
    private String country;
    private String city;

    // Getters and Setters
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
}
3. Creating Custom JSON SerDes
Kafka transmits data as raw byte arrays. We need a Serializer and Deserializer (SerDe) to convert our Java objects to bytes (JSON) and back.
In src/main/java/eventprocessor/serdes/, create JsonSerializer.java:
package eventprocessor.serdes;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) return null;
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}
Create JsonDeserializer.java:
package eventprocessor.serdes;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetClass;

    public JsonDeserializer(Class<T> targetClass) {
        this.targetClass = targetClass;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            return objectMapper.readValue(data, targetClass);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }
}
4. The Geographic Enrichment Service
We will extract the ipAddress field from the incoming payload and query a local MaxMind database to map it to a physical city and country.
Note: Download the free MaxMind GeoLite2-City.mmdb database and place it in /tmp/, or update the path in the code below.
Create GeoIPService.java in src/main/java/eventprocessor/services/:
package eventprocessor.services;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
public class GeoIPService {
    private final DatabaseReader reader;

    public GeoIPService(String dbPath) throws IOException {
        File database = new File(dbPath);
        this.reader = new DatabaseReader.Builder(database).build();
    }

    public CityResponse getLocation(String ipAddress) {
        if (ipAddress == null || ipAddress.isEmpty()) return null;
        try {
            InetAddress inetAddress = InetAddress.getByName(ipAddress);
            return reader.city(inetAddress);
        } catch (Exception ex) {
            // Ignore unresolvable or internal IPs
            return null;
        }
    }
}
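The catch-all in getLocation quietly swallows bad input, which is fine for a demo, but lookups against private or loopback addresses can never match a public GeoIP record and just burn a database query. If you want to short-circuit obviously non-routable traffic, a JDK-only pre-filter is enough. The helper below is our own sketch (the IpFilter name is illustrative, not part of the article's pipeline):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class IpFilter {
    // True when the string is a valid literal IP that is publicly routable,
    // i.e. worth sending to the GeoIP database at all. Note: getByName on a
    // non-literal hostname would trigger a DNS lookup, so pass literal IPs.
    public static boolean isLookupWorthy(String ip) {
        if (ip == null || ip.isEmpty()) return false;
        try {
            InetAddress addr = InetAddress.getByName(ip);
            return !(addr.isLoopbackAddress()      // 127/8, ::1
                    || addr.isSiteLocalAddress()   // 10/8, 172.16/12, 192.168/16
                    || addr.isLinkLocalAddress()   // 169.254/16
                    || addr.isAnyLocalAddress());  // 0.0.0.0
        } catch (UnknownHostException e) {
            return false;
        }
    }
}
```

Dropping whole events is usually wrong for analytics, so rather than a .filter() in the topology, you would more likely call this inside the enrichment step to skip the database query while keeping the event.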
5. Building the Stream Topology
Now, we bring everything together into a Kafka Streams application. We define the input topic, apply a .mapValues() operator to perform the enrichment, and write the result to the output topic.
Create StreamProcessorApp.java in src/main/java/eventprocessor/:
package eventprocessor;
import eventprocessor.models.EnrichedTrackingEvent;
import eventprocessor.models.TrackingEvent;
import eventprocessor.serdes.JsonDeserializer;
import eventprocessor.serdes.JsonSerializer;
import eventprocessor.services.GeoIPService;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import com.maxmind.geoip2.model.CityResponse;
import java.util.Properties;
public class StreamProcessorApp {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bitvizor-enrichment-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        GeoIPService geoService = new GeoIPService("/tmp/GeoLite2-City.mmdb");

        // Define SerDes
        Serde<String> stringSerde = Serdes.String();
        Serde<TrackingEvent> inputSerde = Serdes.serdeFrom(
                new JsonSerializer<>(), new JsonDeserializer<>(TrackingEvent.class)
        );
        Serde<EnrichedTrackingEvent> outputSerde = Serdes.serdeFrom(
                new JsonSerializer<>(), new JsonDeserializer<>(EnrichedTrackingEvent.class)
        );

        StreamsBuilder builder = new StreamsBuilder();

        // 1. Consume from source topic
        KStream<String, TrackingEvent> sourceStream = builder.stream(
                "raw_tracking_events",
                Consumed.with(stringSerde, inputSerde)
        );

        // 2. Process and Enrich
        KStream<String, EnrichedTrackingEvent> enrichedStream = sourceStream.mapValues(event -> {
            EnrichedTrackingEvent enriched = new EnrichedTrackingEvent();
            enriched.setUserId(event.getUserId());
            enriched.setEventType(event.getEventType());
            enriched.setIpAddress(event.getIpAddress());
            enriched.setTimestamp(event.getTimestamp());

            // Lookup Geography
            CityResponse location = geoService.getLocation(event.getIpAddress());
            if (location != null) {
                enriched.setCountry(location.getCountry().getName());
                enriched.setCity(location.getCity().getName());
            } else {
                enriched.setCountry("Unknown");
                enriched.setCity("Unknown");
            }
            return enriched;
        });

        // 3. Produce to sink topic
        enrichedStream.to(
                "enriched_tracking_events",
                Produced.with(stringSerde, outputSerde)
        );

        // Build and Start Topology
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        System.out.println("Starting Streams App...");
        System.out.println(topology.describe());
        streams.start();

        // Add shutdown hook for graceful termination
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
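Before reaching for a real broker, you can exercise a topology in-process with Kafka's TopologyTestDriver. The sketch below assumes you add the org.apache.kafka:kafka-streams-test-utils:3.7.0 test dependency; it uses plain String values and an uppercase stand-in for the GeoIP step so it runs without the MaxMind database. Swapping in the real SerDes and enrichment lambda from the app above is left as an exercise.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologySmokeTest {

    // Pushes a single record through a simplified copy of the topology
    // (no broker needed) and returns whatever arrives on the output topic.
    public static KeyValue<String, String> runPipe(String key, String value) {
        StreamsBuilder builder = new StreamsBuilder();
        // Stand-in for the GeoIP enrichment step: uppercase the value.
        builder.stream("raw_tracking_events", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("enriched_tracking_events", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-smoke-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "raw_tracking_events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "enriched_tracking_events", new StringDeserializer(), new StringDeserializer());
            in.pipeInput(key, value);
            return out.readKeyValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPipe("user123", "page_view"));
    }
}
```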
6. Testing the Pipeline
To run this, you need a local Kafka cluster. If you have Docker, you can spin one up quickly using the official Confluent images.
1. Create the Topics
Use the Kafka command-line tools to create your input and output topics:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic raw_tracking_events --partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic enriched_tracking_events --partitions 3 --replication-factor 1
2. Start the Application
Compile and run the Java application we just built. It will connect to localhost:9092 and sit idle waiting for data.
3. Produce Test Data
Open a terminal and start the console producer:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic raw_tracking_events --property "parse.key=true" --property "key.separator=:"
Paste in a sample event (using a known public IP, such as Google's DNS address 8.8.8.8, which maps to the US):
user123:{"userId":"user123","eventType":"page_view","ipAddress":"8.8.8.8","timestamp":1706784000000}
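As a side note, the timestamp field is epoch milliseconds. If you ever need to sanity-check what instant a sample value represents, the JDK's java.time API does it in one line (this helper is illustrative, not part of the pipeline):

```java
import java.time.Instant;

public class TimestampCheck {
    // Epoch milliseconds -> ISO-8601 UTC instant string
    public static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        System.out.println(toIso(1706784000000L)); // the sample event's timestamp
    }
}
```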
4. Consume the Results
In another terminal, watch the output topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic enriched_tracking_events --from-beginning --property "print.key=true" --property "key.separator=:"
You should see the enriched event flow through the pipeline almost instantly, looking something like this:
user123:{"userId":"user123","eventType":"page_view","ipAddress":"8.8.8.8","timestamp":1706784000000,"country":"United States","city":null}
(GeoLite2 carries no city-level record for anycast addresses like 8.8.8.8, so getCity().getName() returns null here even though the country lookup succeeds.)
Conclusion
We have built a fault-tolerant, horizontally scalable stream processing node. By leveraging Kafka Streams' .mapValues() operator, we enriched raw telemetry with geographic data in real time, completely decoupling the ingestion layer from the downstream analytics engine.
This is the foundation of modern, event-driven enterprise service buses. In upcoming articles, we will explore stateful stream processing, windowing aggregations, and deploying these topologies to Kubernetes.