By following this post, you will be able to easily deploy, configure, and use an Apache Kafka event streaming platform with Apache ZooKeeper in your integration and development environments.

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

1- Stack

  • Kubelet : v1.17.2 / v1.18.5
  • Kubectl : v1.17.1
  • Docker : 19.03.5 / 19.03.8
  • Zookeeper : 3.4.10
  • Kafka : 2.7.0 (Scala 2.13 / Glibc 2.31-r0)
  • Kube namespace : kafka (if you use a different namespace, it must be changed in service and pod hostnames)
  • Architecture : AMD64 / ARM64
  • Python (optional, for client testing) : 3.8

2- Zookeeper deployment

First, deploy a small ZooKeeper cluster (2 pods) using a StatefulSet and expose it with 2 Services: one for client communication and another for intra-cluster communication (leader election).

kubectl apply -f zookeeper/statefulset.yaml
kubectl apply -f zookeeper/service.yaml

Next, you can test your deployment:

kubectl exec zk-0 -- zkCli.sh create /hello world
kubectl exec zk-1 -- zkCli.sh get /hello

For more information, take a tour of the Kubernetes blog.

3- Consumer/Producer application case

You need to deploy Kafka brokers with ZooKeeper as their coordination service:

  1. Create 2 Kafka brokers with a StatefulSet
  2. Create the first topic (k8s for example); you can use one of the available broker hostnames or the broker Service hostname:

- kafka-0.kafka-broker.kafka.svc.cluster.local

- kafka-1.kafka-broker.kafka.svc.cluster.local

- kafka-broker.kafka.svc.cluster.local
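These names follow the standard StatefulSet DNS pattern `<pod>.<headless-service>.<namespace>.svc.cluster.local`, so if you deploy in a namespace other than kafka, adjust them accordingly. A quick sketch of how the parts compose (names taken from this post):

```shell
# Compose a broker FQDN from its parts.
NAMESPACE="kafka"          # change this if you use another namespace
SERVICE="kafka-broker"     # the headless Service name
POD="kafka-0"              # StatefulSet pod name: <statefulset-name>-<ordinal>
BROKER_FQDN="${POD}.${SERVICE}.${NAMESPACE}.svc.cluster.local"
echo "$BROKER_FQDN"
# → kafka-0.kafka-broker.kafka.svc.cluster.local
```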

Next, create the first topic and run the first consumer client to check the configuration.

kubectl apply -f service.yaml
kubectl apply -f statefulset.yaml
kubectl exec -ti kafka-0 -- kafka-topics.sh --create --topic=k8s --bootstrap-server kafka-0.kafka-broker.kafka.svc.cluster.local:9092
kubectl apply -f consumer.yaml
kubectl logs consumer

4- Development case (from Workstation with kubectl)

You need to create a custom broker (for host binding), activate port forwarding to your workstation, and finally create a development topic:

kubectl apply -f dev-brocker.yaml
kubectl port-forward pod/dev-brocker 9092:9092
kubectl exec -ti dev-brocker -- kafka-topics.sh --create --topic dev-k8s --bootstrap-server 127.0.0.1:9092
  • Running the Python consumer and producer:
pip install kafka-python
python ../client/Consumer.py
python ../client/Producer.py
  • Using the Kafka helper script clients:
kubectl exec -ti dev-brocker -- kafka-console-producer.sh --topic=dev-k8s --bootstrap-server 127.0.0.1:9092 
>> Hello World!
>> I'm a Producer
kubectl exec -ti dev-brocker -- kafka-console-consumer.sh --topic=dev-k8s --from-beginning --bootstrap-server 127.0.0.1:9092
<< Hello World!
<< I'm a Producer

5- Secure your Kafka

With a standard Kafka setup, any user or application can write any message to any topic, and the same goes for ZooKeeper. So we need to add a DIGEST authentication layer to ZooKeeper to authorize only the Kafka brokers (ZooKeeper 3.4 does not support TLS, but since the Kafka brokers are its only clients, DIGEST is sufficient). On the Kafka side, we need to add SSL authentication so that only valid clients can use its services.
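As a minimal sketch of the ZooKeeper DIGEST side (the user name and password are placeholders, not the project's real credentials): the ZooKeeper server declares the allowed user in a JAAS file, and the Kafka broker presents matching credentials as a ZooKeeper client.

```shell
# Placeholder credentials; in a real deployment, mount these from a Secret.
ZK_JAAS=$(mktemp)
cat > "$ZK_JAAS" <<'EOF'
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_kafka="kafka-secret";
};
EOF

KAFKA_JAAS=$(mktemp)
cat > "$KAFKA_JAAS" <<'EOF'
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka-secret";
};
EOF
```

Each file is then passed to its JVM with -Djava.security.auth.login.config=&lt;path&gt; (for ZooKeeper, this can go through the JVMFLAGS injection shown in section 6).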

Follow the security section documentation.

6- Sourcing

- Add JVM flags to be injected into the Java environment file @see start-zookeeper.sh

echo "JVMFLAGS=\"-Xmx$HEAP -Xms$HEAP $JVMFLAGS\"" >> $JAVA_ENV_FILE
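As a quick local illustration (the values below are illustrative; in start-zookeeper.sh, HEAP is derived from the pod's memory settings):

```shell
# Illustrative values standing in for the script's real variables.
JAVA_ENV_FILE=$(mktemp)
HEAP="512M"
JVMFLAGS="-XX:+UseG1GC"

# Same injection line as in start-zookeeper.sh:
echo "JVMFLAGS=\"-Xmx$HEAP -Xms$HEAP $JVMFLAGS\"" >> $JAVA_ENV_FILE

cat "$JAVA_ENV_FILE"
# → JVMFLAGS="-Xmx512M -Xms512M -XX:+UseG1GC"
```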

- Extra configuration file path, to be appended to the configuration file @see start-zookeeper.sh

# Add extra configuration from file (file path in env: EXTRA_CONFIG_FILE)
if [[ -n "$EXTRA_CONFIG_FILE" ]]; then
    echo "#Start extra-section" >> "$CONFIG_FILE"
    cat "$EXTRA_CONFIG_FILE" >> "$CONFIG_FILE"
    echo "#End of extra-section" >> "$CONFIG_FILE"
fi
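To see this snippet in action locally (the file contents below are placeholders standing in for the real zoo.cfg and extra configuration):

```shell
# Placeholder files standing in for the script's real config files.
CONFIG_FILE=$(mktemp)
EXTRA_CONFIG_FILE=$(mktemp)
echo "tickTime=2000" > "$CONFIG_FILE"
echo "4lw.commands.whitelist=*" > "$EXTRA_CONFIG_FILE"

# Same logic as in start-zookeeper.sh:
if [ -n "$EXTRA_CONFIG_FILE" ]; then
    echo "#Start extra-section" >> "$CONFIG_FILE"
    cat "$EXTRA_CONFIG_FILE" >> "$CONFIG_FILE"
    echo "#End of extra-section" >> "$CONFIG_FILE"
fi

cat "$CONFIG_FILE"
```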

- For the ARM64 arch, switch the base image from 'openjdk:8u212-jre-alpine' to 'openjdk:8u201-jre-alpine' to prevent a container core dump @see issue.

- For the K8S deployment, add a 'KAFKA_LISTENERS_COMMAND' environment parameter to build 'KAFKA_LISTENERS' on the fly (to use the pod hostname once the container has started) @see start-kafka.sh

if [[ -n "$KAFKA_LISTENERS_COMMAND" ]]; then
    KAFKA_LISTENERS=$(eval "$KAFKA_LISTENERS_COMMAND")
    export KAFKA_LISTENERS
    unset KAFKA_LISTENERS_COMMAND
fi
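For instance, with a command that builds the listener from the container's hostname (the value below is hypothetical, not necessarily the one set in the manifests):

```shell
# Hypothetical value for the KAFKA_LISTENERS_COMMAND env var:
KAFKA_LISTENERS_COMMAND='echo "PLAINTEXT://$(hostname -f):9092"'

# Same logic as in start-kafka.sh:
if [ -n "$KAFKA_LISTENERS_COMMAND" ]; then
    KAFKA_LISTENERS=$(eval "$KAFKA_LISTENERS_COMMAND")
    export KAFKA_LISTENERS
    unset KAFKA_LISTENERS_COMMAND
fi

echo "$KAFKA_LISTENERS"
```

Inside the kafka-0 pod, this would resolve to something like PLAINTEXT://kafka-0.kafka-broker.kafka.svc.cluster.local:9092.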

7- Tips

  • For debugging, you can bypass the Kafka broker for topic management (Kafka and ZooKeeper helper scripts):
kubectl exec -ti kafka-0 -- kafka-topics.sh --create --topic k8s --zookeeper zk-cs.kafka.svc.cluster.local:2181 
kubectl exec -ti kafka-0 -- kafka-topics.sh --describe --topic k8s --zookeeper zk-cs.kafka.svc.cluster.local:2181 
kubectl exec zk-1 -- zkCli.sh ls /brokers/topics
  • Building multi-architecture Docker images:
docker buildx build --push --platform linux/arm64/v8,linux/amd64 --tag [medinvention]/kubernetes-zookeeper:latest .
docker buildx build --push --platform linux/arm64/v8,linux/amd64 --tag [medinvention]/kafka:latest .

@GitHub source