kubernetes-kafka's Introduction

Kafka for Kubernetes

This community seeks to provide:

  • Production-worthy Kafka setup for persistent (domain- and ops-) data at small scale.
  • Operational knowledge, biased towards resilience over throughput, as Kubernetes manifest.
  • A platform for event-driven (streaming!) microservices design using Kubernetes.

To quote @arthurk:

thanks for creating and maintaining this Kubernetes files, they're up-to-date (unlike the kubernetes contrib files), don't require helm and work great!

Getting started

We suggest you apply -f manifests in the following order:

That'll give you client "bootstrap" bootstrap.kafka.svc.cluster.local:9092.

Fork

Our only dependency is kubectl. Not because we dislike Helm or Operators, but because we think plain manifests make it easier to collaborate. If you begin to rely on this kafka setup we recommend you fork, for example to edit broker config.

Kustomize

With the introduction of app customization in kubectl 1.14 there's an alternative to forks. We as a community can maintain a set of overlays.

See the variants folder for different overlays. For example to scale to 1 kafka broker try kubectl apply -k variants/scale-1/. Variants also include examples of how to configure volumes for GKE, AWS and AKS with different storage classes.

Quickstart

kubectl create namespace kafka && \
kubectl apply -k github.com/Yolean/kubernetes-kafka/variants/dev-small/?ref=v6.0.3

When all pods are Ready, test with for example kafkacat -b localhost:9094 -L over kubectl -n kafka port-forward kafka-0 9094.

Maintaining your own kustomization

Start your variant as a new folder in your choice of version control, with a base kustomization.yaml pointing to a tag or revision in this repository:

bases:
- github.com/Yolean/kubernetes-kafka/rbac-namespace-default/?ref=60d01b5
- github.com/Yolean/kubernetes-kafka/kafka/?ref=60d01b5
- github.com/Yolean/kubernetes-kafka/zookeeper/?ref=60d01b5

Then pick and choose patches from our example variants to tailor your Kafka setup.
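
As a rough illustration of such a variant, the sketch below combines the bases with one strategic-merge patch. The patch file name scale-1.yaml is hypothetical, and the StatefulSet name kafka, its namespace kafka and its API version apps/v1 are assumptions that should be checked against the revision you pin.

```yaml
# kustomization.yaml (sketch; the patch file name is hypothetical)
bases:
- github.com/Yolean/kubernetes-kafka/rbac-namespace-default/?ref=60d01b5
- github.com/Yolean/kubernetes-kafka/kafka/?ref=60d01b5
- github.com/Yolean/kubernetes-kafka/zookeeper/?ref=60d01b5
patchesStrategicMerge:
- scale-1.yaml
---
# scale-1.yaml (sketch): run a single broker.
# Assumes the base defines a StatefulSet "kafka" in namespace "kafka"
# with apiVersion apps/v1; adjust if the pinned revision differs.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  replicas: 1
```

The two documents are separate files in the variant folder; kubectl apply -k on that folder would then render the bases with the patch applied.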

Version history

tag | k8s ≥ | highlights
v7.0.0 | 1.15+ | Breaking with nonroot and native bases
v6.0.x | 1.13+ | Kafka 2.4.0 + standard storage class
v6.0.0 | 1.11+ | Kafka 2.2.0 + apply -k (kubectl 1.14+) + #270
v5.1.0 | 1.11+ | Kafka 2.1.1
v5.0.3 | 1.11+ | Zookeeper fix #227 + maxClientCnxns=1
v5.0 | 1.11+ | Destabilize because in Docker we want Java 11 #197 #191
v4.3.1 | 1.9+ | Critical Zookeeper persistence fix #228
v4.3 | 1.9+ | Adds a proper shutdown hook #207
v4.2 | 1.9+ | Kafka 1.0.2 and tools upgrade
... see releases for full history ...
v1.0 | 1 | Stateful? In Kubernetes? In 2016? Yes.

Monitoring

Have a look at:

Outside (out-of-cluster) access

Available for:

Stream...

kubernetes-kafka's People

Contributors

alzabo, atamon, coderroggie, coltfred, deadmoose, double16, elm-, evie404, gjacquet, goodhoko, hermain, hoffin, kahwai1984, kmova, lamdor, lenadroid, sidps, solsson, stevekmin

kubernetes-kafka's Issues

Zookeeper pods unable to see each other

I am deploying the Zookeeper statefulset manifest and I see the following in the logs from the pods:

	at java.net.InetAddress.getByName(InetAddress.java:1076)
	at org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer.recreateSocketAddresses(QuorumPeer.java:166)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:595)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:614)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:843)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:913)
2017-06-16 02:29:03,164 [myid:1] - INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:FastLeaderElection@852] - Notification time out: 60000
2017-06-16 02:30:03,165 [myid:1] - WARN  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@588] - Cannot open channel to 2 at election address api-pers-zoo-1.capi.svc.cluster.local:3888
java.net.UnknownHostException: api-pers-zoo-1.capi.svc.cluster.local
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:562)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:614)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:843)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:913)
2017-06-16 02:30:03,169 [myid:1] - WARN  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumPeer$QuorumServer@173] - Failed to resolve address: api-pers-zoo-1.capi.svc.cluster.local
java.net.UnknownHostException: api-pers-zoo-1.capi.svc.cluster.local: Name does not resolve
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
	at java.net.InetAddress.getAllByName(InetAddress.java:1192)
	at java.net.InetAddress.getAllByName(InetAddress.java:1126)
	at java.net.InetAddress.getByName(InetAddress.java:1076)
	at org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer.recreateSocketAddresses(QuorumPeer.java:166)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:595)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:614)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:843)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:913)

I've updated the command in the statefulset manifest to provide the complete name of each zookeeper pod:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: api-pers-zoo
  namespace: capi
spec:
  serviceName: "api-pers-zoo"
  replicas: 5
  template:
    metadata:
      labels:
        app: api-pers-zookeeper
    spec:
      terminationGracePeriodSeconds: 10
      containers:
        - name: api-pers-zookeeper
          image: solsson/zookeeper-statefulset
          env:
            - name: ZOO_SERVERS
              value: server.1=api-pers-zoo-0.capi.svc.cluster.local:2888:3888:participant server.2=api-pers-zoo-1.capi.svc.cluster.local:2888:3888:participant server.3=api-pers-zoo-2.capi.svc.cluster.local:2888:3888:participant server.4=api-pers-zoo-3.capi.svc.cluster.local:2888:3888:participant server.5=api-pers-zoo-4.capi.svc.cluster.local:2888:3888:participant
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: peer
            - containerPort: 3888
              name: leader-election
          volumeMounts:
            - name: datadir
              mountPath: /data
            # There's defaults in this folder, such as logging config
            #- name: conf
            #  mountPath: /conf
      volumes:
        #- name: conf
        #  emptyDir: {}
        - name: datadir
          emptyDir: {}

The services are defined as follows:

apiVersion: v1
kind: Service
metadata:
  name: api-pers-zoo
  namespace: capi
spec:
  ports:
  - port: 2888
    name: peer
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: api-pers-zookeeper

and

# the headless service is for PetSet DNS, this one is for clients
apiVersion: v1
kind: Service
metadata:
  name: api-pers-zookeeper
  namespace: capi
spec:
  ports:
  - port: 2181
    name: client
  selector:
    app: api-pers-zookeeper

Any ideas where I am going wrong? Thanks!
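
One thing worth checking against the manifests above: Kubernetes gives each StatefulSet pod a DNS name of the form pod-name.service-name.namespace.svc.cluster.local, where service-name is the headless service named in serviceName. The ZOO_SERVERS value above leaves out that service segment, which would be consistent with the UnknownHostException in the log. A sketch of the env entry with the segment added, assuming the headless service api-pers-zoo shown above is the one governing the pods:

```yaml
env:
  - name: ZOO_SERVERS
    # Pod DNS names follow <pod>.<serviceName>.<namespace>.svc.cluster.local.
    # The folded scalar (>-) joins these lines with single spaces.
    value: >-
      server.1=api-pers-zoo-0.api-pers-zoo.capi.svc.cluster.local:2888:3888:participant
      server.2=api-pers-zoo-1.api-pers-zoo.capi.svc.cluster.local:2888:3888:participant
      server.3=api-pers-zoo-2.api-pers-zoo.capi.svc.cluster.local:2888:3888:participant
      server.4=api-pers-zoo-3.api-pers-zoo.capi.svc.cluster.local:2888:3888:participant
      server.5=api-pers-zoo-4.api-pers-zoo.capi.svc.cluster.local:2888:3888:participant
```

Whether this alone resolves the problem is not confirmed here; it only aligns the names with the documented StatefulSet naming scheme.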

Broker unresponsive due to NotEnoughReplicasException

One of our brokers was unresponsive, leading to timeouts in clients. It was busy in a loop that logged:

[2017-12-17 21:33:52,534] INFO [GroupCoordinator 1]: Preparing to rebalance group user-sessions-stream with old generation 779 (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 21:33:52,637] INFO [GroupCoordinator 1]: Stabilized group user-sessions-stream generation 780 (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 21:33:52,637] INFO [GroupCoordinator 1]: Assignment received from leader for group user-sessions-stream for generation 780 (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 21:33:52,638] ERROR [ReplicaManager broker=1] Error processing append operation on partition __consumer_offsets-32 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition __consumer_offsets-32 is [1], below required minimum [2]

Quite possibly due to the configuration change in #107.

Kafka fails to establish connection if server.properties has zookeeper.connect=zookeeper:2181

I have set up Kafka and Zookeeper. Zookeeper came up fine and is running, but the Kafka pods fail if config/server.properties has zookeeper.connect=zookeeper-svc:2181.

If I replace zookeeper:2181 with the IP addresses of the Zookeeper pods, the connection succeeds:
config/server.properties has zookeeper.connect=10.0.40.0:2181,10.0.0.32:2181

Problem:
I want to use podname:2181 or servicename:2181 in the zookeeper.connect field.
Right now zoo-pod-ip:port is hard-coded, and on reboot the IPs will be lost. Can you please help in getting this issue resolved?

Error :
[2017-02-16 05:04:57,615] INFO Initiating client connection, connectString=zookeeper-svc:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@68999068 (org.apache.zookeeper.ZooKeeper)
[2017-02-16 05:04:57,627] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2017-02-16 05:05:06,628] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-02-16 05:05:07,641] INFO **Opening socket connection to server zookeeper-svc.default.svc.cluster.local/10.98.86.77:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)**
[2017-02-16 05:05:07,685] INFO Socket connection established to zookeeper-svc.default.svc.cluster.local/10.98.86.77:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-02-16 05:05:07,743] INFO Session establishment complete on server zookeeper.default.svc.cluster.local/10.98.86.77:2181, sessionid = 0x25a454d326e0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2017-02-16 05:05:07,747] INFO **Session: 0x25a454d326e0000 closed** (org.apache.zookeeper.ZooKeeper)
[2017-02-16 05:05:07,748] INFO EventThread shut down for session: 0x25a454d326e0000 (org.apache.zookeeper.ClientCnxn)
[2017-02-16 05:05:07,748] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 9000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
        at kafka.server.KafkaServer.initZk(KafkaServer.scala:327)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:200)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
        at kafka.Kafka$.main(Kafka.scala:67)
        at kafka.Kafka.main(Kafka.scala)
[2017-02-16 05:05:07,750] INFO shutting down (kafka.server.KafkaServer)
[2017-02-16 05:05:07,753] INFO shut down completed (kafka.server.KafkaServer)
[2017-02-16 05:05:07,754] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 9000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
        at kafka.server.KafkaServer.initZk(KafkaServer.scala:327)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:200)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
        at kafka.Kafka$.main(Kafka.scala:67)
        at kafka.Kafka.main(Kafka.scala)
[2017-02-16 05:05:07,755] INFO shutting down (kafka.server.KafkaServer)

Can't deploy kafka, zookeeper Insufficient memory and cpu

What are the minimum specs?
I have already upgraded my agents to 2 cores, 7 GB RAM and a 50 GB SSD, but I still get an error for zf-kafkaname-1:

No nodes are available that match all of the following predicates:: Insufficient cpu (1), Insufficient memory (1).

How can I fix this?
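
The scheduler message compares each pod's resource requests with what is still unreserved on a node, so it can appear on nodes that look large enough if other pods have already claimed most of the capacity. A sketch of lowering the requests on a container follows; the container name and the numbers are purely illustrative assumptions, not recommendations from this repository.

```yaml
# Fragment of a StatefulSet pod template (name and values are hypothetical)
containers:
  - name: broker
    resources:
      requests:
        cpu: 100m       # what the scheduler reserves, not a usage limit
        memory: 512Mi
```

kubectl describe nodes lists the requests already allocated on each node, which helps when picking values that fit.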

Dockerfiles

Hi @solsson - thanks for the nicely organized project! Do you have the Dockerfiles located somewhere for the broker and zookeeper that are being referenced for the images? I didn't see any links for them.

Thanks!

How to do a dynamic PVC claim?

I added

  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
          volume.beta.kubernetes.io/storage-class: default
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 8Gi

to the 50kafka.yml file and commented out the 10pvc.yaml file, but I am still getting an error saying:

[SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-kafka-0", which is unexpected.]

I am unable to solve this particular issue. Do you have any idea what I am doing wrong?
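
The storage-class annotation only leads to dynamic provisioning if a StorageClass with that exact name exists in the cluster; otherwise the claim stays unbound, which would match the scheduler message above. A sketch of such a class, assuming a cluster on GCE (the provisioner and its parameters are assumptions; substitute the ones for your platform, and use storage.k8s.io/v1beta1 on clusters older than 1.6):

```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: default   # must match the annotation value in volumeClaimTemplates
provisioner: kubernetes.io/gce-pd   # assumption: GCE persistent disks
parameters:
  type: pd-standard
```

kubectl get storageclass shows which classes are currently defined.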

FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

Hi guys,
I am using the template but I have a problem under one specific condition:

On Kubernetes, if I restart the agents, my kafka pods keep crashing even after all agents are up and running again:

kubectl get pods --namespace kafka
NAME      READY     STATUS             RESTARTS   AGE
kafka-0   0/1       CrashLoopBackOff   12         2d
kafka-1   0/1       CrashLoopBackOff   11         2d
kafka-2   0/1       CrashLoopBackOff   9          2d
zoo-0     1/1       Running            1          2d
zoo-1     1/1       Running            1          2d
zoo-2     1/1       Running            2          2d
zoo-3     1/1       Running            1          2d
zoo-4     1/1       Running            2          2d

kubectl get nodes --namespace kafka
NAME                    STATUS                     AGE       VERSION
k8s-agent-fb4b805d-0    Ready                      4d        v1.6.6
k8s-agent-fb4b805d-1    Ready                      4d        v1.6.6
k8s-agent-fb4b805d-2    Ready                      4d        v1.6.6

Here is the error:


[2017-07-24 08:36:03,958] INFO Socket connection established to zookeeper.kafka.svc.cluster.local/10.0.10.156:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-07-24 08:36:03,959] INFO Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2017-07-24 08:36:04,456] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-07-24 08:36:05,151] INFO Opening socket connection to server zookeeper.kafka.svc.cluster.local/10.0.10.156:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-07-24 08:36:05,152] INFO Socket connection established to zookeeper.kafka.svc.cluster.local/10.0.10.156:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-07-24 08:36:05,255] INFO Session: 0x0 closed (org.apache.zookeeper.ZooKeeper)
[2017-07-24 08:36:05,256] INFO EventThread shut down for session: 0x0 (org.apache.zookeeper.ClientCnxn)
[2017-07-24 08:36:05,256] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)


	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
	at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
	at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
	at kafka.server.KafkaServer.initZk(KafkaServer.scala:327)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:200)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
	at kafka.Kafka$.main(Kafka.scala:67)
	at kafka.Kafka.main(Kafka.scala)
[2017-07-24 08:36:05,258] INFO shutting down (kafka.server.KafkaServer)
[2017-07-24 08:36:05,261] INFO shut down completed (kafka.server.KafkaServer)
[2017-07-24 08:36:05,262] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
	at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
	at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
	at kafka.server.KafkaServer.initZk(KafkaServer.scala:327)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:200)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
	at kafka.Kafka$.main(Kafka.scala:67)
	at kafka.Kafka.main(Kafka.scala)
[2017-07-24 08:36:05,263] INFO shutting down (kafka.server.KafkaServer)

It looks like ZK has changed its IP address :-( from .156 to .157, and kafka is still trying the previous IP address:

zookeeper 10.0.10.157 2181/TCP 2d

What about deploying ZK on one cluster and Kafka on another?

What can I do?
Thanks
Prisco

Wanted: topic management, declarative

We're a small team with a small set of kafka topics and clients, but already we need a self-documenting way to create and update topics.

Use cases:

  • Ensure that a topic exists in minikube, or docker-compose, during development.
  • Ensure that a topic exists in production during service provisioning in Kubernetes.
  • Edit retention.ms.
  • Knowing if the topic was created with key and/or value schemas in mind.

Using kafka-topics.sh or the Java API is OK for testing, but production topics, for instance, we want to create with 1 replica during dev and >=3 in a real cluster.

An option is to use helm for topic creation manifests. I've searched for best practices, like tooling other organizations use, but with no luck. I imagine most teams will want to document their kafka topics, beyond a naming convention.
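
To make the "ensure that a topic exists" use case concrete, here is a sketch of a Kubernetes Job that wraps kafka-topics.sh. Everything specific in it is an assumption for illustration: the Job and topic names, the image (any image that ships the Kafka CLI with its working directory at the Kafka root), the zookeeper address, and a Kafka version whose kafka-topics.sh still accepts --zookeeper.

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: topic-create-example   # hypothetical name
  namespace: kafka
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: topic-create
          image: solsson/kafka   # assumption: image with the Kafka CLI tools
          command:
            - ./bin/kafka-topics.sh
            - --zookeeper
            - zookeeper.kafka.svc.cluster.local:2181
            - --create
            - --if-not-exists      # makes re-running the Job harmless
            - --topic
            - example-topic        # hypothetical topic name
            - --partitions
            - "1"
            - --replication-factor
            - "1"                  # 1 during dev; >=3 in a real cluster
            - --config
            - retention.ms=86400000
```

The manifest doubles as documentation of the topic, and the replication factor could differ per environment. It only covers creation; changing retention.ms on an existing topic would need a separate step.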

data is lost after petset update

Hi, I'm playing with your solution and everything works fine, except when I want to update something in the petset other than the replica count. To update anything else (e.g. a node-affinity annotation) the petset must be deleted and recreated. This forces cluster downtime, which is not good, but it isn't even the biggest issue.

The backing storage in my cluster is hostpath (I have no other option; cinder/gluster are too slow for my cluster), and I've used your same pv and pvc setup. But a pv and pvc cannot be bound to a specific node, and in the current setup the individual petset members cannot be bound to specific nodes either. So during a petset update all containers get deleted. When they are recreated, a container that lands on the same node as before gets its data back (in datadir-$n). If container $n lands on a different node than in the previous run, you end up with two directories on each node, datadir-$n and the datadir-$n of the previous run, and the data for the whole cluster is reinitialized under the new pvc->node assignment.

A solution could be something like a node-bound pv/pvc (I don't think that's possible yet on kube 1.4.x), or a way to update the petset without deleting and recreating all pods, which I don't think is possible yet either.

Am I missing something?

Issues with provisioning PVC

Hello,

I am trying to deploy kafka on Kubernetes running on AWS, using EFS for storage. Here is what I have for defining the PVs:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-api-pers-kafka-0
  labels:
    app: api-pers-kafka
    podindex: "0"
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 100Mi
  nfs:
    path: /efsfileshare/datadir-kafka-0
    server: EFS URL
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-api-pers-kafka-1
  labels:
    app: api-pers-kafka
    podindex: "1"
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 100Mi
  nfs:
    path: /efsfileshare/datadir-kafka-1
    server: EFS URL
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-api-pers-kafka-2
  labels:
    app: api-pers-kafka
    podindex: "2"
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 100Mi
  nfs:
    path: /efsfileshare/datadir-kafka-2
    server: EFS URL

Here is what I have for the PVCs:

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-api-pers-kafka-0
  namespace: capi
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Gi
  selector:
    matchLabels:
      app: kafka
      podindex: "0"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-api-pers-kafka-1
  namespace: capi
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Gi
  selector:
    matchLabels:
      app: kafka
      podindex: "1"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-api-pers-kafka-2
  namespace: capi
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Gi
  selector:
    matchLabels:
      app: kafka
      podindex: "2"

The PVs appear to deploy fine, but the PVCs are stuck in Pending status. When I deploy the statefulset manifest, I get the following error:

[SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected., SchedulerPredicates failed due to PersistentVolumeClaim is not bound: "datadir-api-pers-kafka-0", which is unexpected.]

Here is the manifest for the statefulset:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: api-pers-kafka
  namespace: capi
spec:
  serviceName: "api-pers-broker"
  replicas: 3
  template:
    metadata:
      labels:
        app: api-pers-kafka
    spec:
      terminationGracePeriodSeconds: 10
      containers:
      - name: api-pers-broker
        image: solsson/kafka-persistent
        ports:
        - containerPort: 9092
        command:
        - sh
        - -c
        - "./bin/kafka-server-start.sh config/server.properties --override broker.id=$(hostname | awk -F'-' '{print $2}')"
        volumeMounts:
        - name: datadir
          mountPath: /opt/kafka/data
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 200Gi

Not sure what needs correcting. Thanks!
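
Comparing the manifests above, two mismatches stand out that would keep the claims pending: the PVC selectors ask for app: kafka while the PVs are labeled app: api-pers-kafka, and each claim requests 200Gi while each PV offers 100Mi. A claim only binds to a volume that satisfies its selector and has at least the requested capacity. A sketch of one aligned pair follows; the 200Gi figure is simply carried over from the claim, and EFS URL stays a placeholder as in the original.

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-api-pers-kafka-0
  labels:
    app: api-pers-kafka
    podindex: "0"
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 200Gi   # at least as large as the claim's request
  nfs:
    path: /efsfileshare/datadir-kafka-0
    server: EFS URL
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: datadir-api-pers-kafka-0
  namespace: capi
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Gi
  selector:
    matchLabels:
      app: api-pers-kafka   # same label value as on the PV
      podindex: "0"
```

The claim name already matches what the StatefulSet expects (volumeClaimTemplates name, then StatefulSet name, then ordinal), so only the labels and sizes would need to change for the other two pairs.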

Kafka svc must be bypassed when topics are not replicated to all instances

We need to investigate how to use the kafka k8s service when topics have fewer replicas than there are kafka instances.

Background: we've noticed some oddities using the no-kafka client lib, connecting to kafka:9092, if a kafka instance is lost and the topic isn't replicated to all instances. The client sometimes seems to react as if the topic does not exist.

In 0.10.1 the official java client seems to deprecate other means of connection than bootstrap.servers. This implies awareness of the FQDN or IP of each broker. A kubernetes service, on the other hand, is basically a round-robin proxy.
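
One way to give clients stable per-broker addresses is a headless service next to the StatefulSet, so that each broker pod gets its own DNS name instead of sharing one load-balanced service IP. A sketch, assuming a StatefulSet named kafka in namespace kafka whose pods carry the label app: kafka and whose serviceName is set to the hypothetical service name broker:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: broker        # hypothetical; must equal the StatefulSet's serviceName
  namespace: kafka
spec:
  clusterIP: None     # headless: DNS resolves to pod IPs, no proxying
  ports:
  - port: 9092
  selector:
    app: kafka
# Each broker is then addressable as, for example:
#   kafka-0.broker.kafka.svc.cluster.local:9092
```

A regular service can still serve as the bootstrap address, as long as the brokers advertise their individual names for the connections that follow.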

Track progress for Kubernetes 1.8 / kubernetes-kafka v3.0.0

Manifests:

  • Update to 1.8 workload API versions
  • Storage Retain (#85)
  • Retain for minikube too

Test open PRs:

Test tests:

  • kafkacat based, delete brokers etc
  • java based, delete brokers etc

Structure:

  • Addons -> master, selected using kubectl apply [folder]
    • See "1.8-" branches like 1.8-logs-streaming
    • See label v3.1

__consumer_offsets Replication and min insync replica issue

I have been running a cluster for a day or so now. I was able to write fine, but for some reason the consumer groups would not work. Eventually I found out that the __consumer_offsets topic was reporting too few in-sync replicas: 1 was in sync, but the minimum was 2. According to Kafka Manager the replication factor is only 1, so the minimum could never be reached. Setting min.insync.replicas to 1 fixed the issue and now everything is working properly.

I'm not sure where this is set during the creation of the cluster, but it may be a good idea to fix it for future users. It took a couple of hours to figure out the issue and fix it.
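
The two broker settings involved here are offsets.topic.replication.factor, which is applied when __consumer_offsets is first created, and min.insync.replicas. The minimum can only be met if it does not exceed the replication factor. A sketch of a consistent pair for a cluster with at least three brokers, written as a ConfigMap fragment; the ConfigMap name and the values are assumptions for illustration, and only the relevant lines of server.properties are shown.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: broker-config   # hypothetical name
  namespace: kafka
data:
  server.properties: |
    # Replication factor used when __consumer_offsets is auto-created
    offsets.topic.replication.factor=3
    # Must not exceed the replication factor of the topics it applies to
    min.insync.replicas=2
```

Changing offsets.topic.replication.factor does not alter an offsets topic that already exists; that would require a partition reassignment.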

Zookeeper Readiness Probe Constantly Failing / Why no TCP Check

Hello,

I could not get the zookeeper part running consistently. At random (about 1-2 times per minute) a pod goes not ready for about 2-3 minutes, then comes back for a minute, then drops off again. I could not find the exact reason in the Kubernetes logs, just that the readiness probe failed. When I attach to the pod and run the nc command manually I always get a successful output, even while Kubernetes has marked the pod as not ready. I tried increasing the timeout, but that was not the issue. Any other ideas on how to debug this further?

I'm running the latest version of the repo on 1.7.6 on GKE.

I changed it to a TCP readiness check and it works flawlessly. The only downside is that I get warnings in the logs because the client isn't sending proper commands, but I can live with that. Is there any reason not to use a TCP probe as the default?

Cheers,
Elmar
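
For reference, the TCP check described above could look roughly like this. It only verifies that the client port accepts connections, so unlike an exec probe that sends a command it does not confirm that the server actually answers. The timing values are assumptions.

```yaml
# Fragment of the zookeeper container spec (timings are hypothetical)
readinessProbe:
  tcpSocket:
    port: 2181
  initialDelaySeconds: 10
  periodSeconds: 10
  timeoutSeconds: 1
```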

Use of pzoo and zoo as persistent/ephemeral storage nodes

Hi,

In Zookeeper we have the notion of persistent/ephemeral nodes, but I'm struggling to understand why these concepts have been used here in terms of persistent volumes in K8s.

Can someone elaborate a bit further on what the objectives are for this intentional configuration?

Thanks.

server properties not being changed

I edited the 10broker-config.yaml to set auto.create.topics.enable=true but the config file on the server seems to be unchanged. Any idea what I am missing?

CrashLoopBackOff caused by Exiting because log truncation is not allowed

After testing #95 on minikube I shut down the VM and then started it again to test #97 ...

Now kafka-2 goes crashlooping with:

[2017-11-10 13:17:57,563] INFO Truncating test-kafkacat-0 to 2334 has no effect as the largest offset in the log is 2333. (kafka.log.Log)
[2017-11-10 13:17:57,565] INFO Truncating test-produce-consume-0 to 2391 has no effect as the largest offset in the log is 2390. (kafka.log.Log)
[2017-11-10 13:17:57,571] INFO [Partition kafka-monitor-topic-0 broker=2] kafka-monitor-topic-0 starts at Leader Epoch 4 from offset 1. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2017-11-10 13:17:57,590] FATAL [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test-produce-consume-0, current leader's latest offset 2051 is less than replica's latest offset 2391 (kafka.server.ReplicaFetcherThread)
[2017-11-10 13:17:57,590] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2017-11-10 13:17:57,606] INFO [KafkaServer id=2] shutting down (kafka.server.KafkaServer)
[2017-11-10 13:17:57,607] INFO [KafkaServer id=2] Starting controlled shutdown (kafka.server.KafkaServer)
[2017-11-10 13:17:57,631] WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. This implies messages have arrived out of order. New: {epoch:15, offset:2334}, Current: {epoch:18, offset2304} for Partition: test-kafkacat-0 (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-10 13:17:57,632] WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. This implies messages have arrived out of order. New: {epoch:15, offset:2335}, Current: {epoch:18, offset2304} for Partition: test-kafkacat-0 (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-10 13:17:57,632] WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. This implies messages have arrived out of order. New: {epoch:15, offset:2336}, Current: {epoch:18, offset2304} for Partition: test-kafkacat-0 (kafka.server.epoch.LeaderEpochFileCache)

Further restarts fail immediately with FATAL [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test-produce-consume-0, current leader's latest offset 2117 is less than replica's latest offset 2391 (kafka.server.ReplicaFetcherThread).

Zookeeper readiness probes fail occasionally

This is a follow-up on #63. A couple of times per day the probe fails on some zookeeper pods.

These failing ruok calls are the reason we did 1f4321e.

As 2c4b6cd explains I found this sort of probe in a different zookeeper k8s setup. Maybe it's not reliable enough, or maybe zookeeper is actually unstable.

If we solve this we can reinstate the liveness probes.

External service for kafka not working

Hi,
I tried to use a NodePort type to expose the kafka service externally.
My service yml file is attached.

And here's the service description:
$ kubectl describe svc kafka -n kafka
Name: kafka
Namespace: kafka
Labels:
Selector: app=kafka
Type: NodePort
IP: 100.73.225.207
Port: 9092/TCP
NodePort: 32093/TCP
Endpoints: 10.44.0.10:9092,10.44.0.11:9092,10.44.0.12:9092
Session Affinity: None

But when I connect to the kafka service externally on port 32093, it does not seem to work.

$ ./bin/kafka-console-producer.sh --broker-list ${master-ip}:32093 --topic test2
abc
[2016-11-18 15:26:58,157] ERROR Error when sending message to topic test2 with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test2-0 due to 1512 ms has passed since batch creation plus linger time

I'm pretty sure that the connection to the master-ip is working, and that port 32093 is listening in the cluster.

It works for zookeeper, but I'm not sure why kafka is not working.

No Route to Host on StatefulSet Update

Hi there,

I've noticed that when I update the kafka StatefulSet configuration and do a rolling deploy, some or all of the brokers get into a bad state in which they appear to be unable to reach the network. These are the errors I'm seeing:

[2017-10-13 19:24:36,928] INFO Opening socket connection to server zookeeper.kafka.svc.cluster.local/100.64.135.230:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-10-13 19:24:38,051] WARN Session 0x15f1678eb8a0001 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.NoRouteToHostException: No route to host
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

Anyone have an idea of what's going on here?

Thanks!

kube-events goes into Completed state without RBAC

Remaining issue from #92.

There's no meaningful error message. Pod start looks like:

%7|1513582363.494|BROKER|rdkafka#producer-1| [thrd:app]: bootstrap.kafka:9092/bootstrap: Added new broker with NodeId -1
%7|1513582363.494|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: ops.kube-events-all.stream.json.001
%7|1513582363.494|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW ops.kube-events-all.stream.json.001 [-1] 0x56108fa42840 (at rd_kafka_topic_new0:282)
%7|1513582363.498|BRKMAIN|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: Enter main broker thread
%7|1513582363.498|CONNECT|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: broker in state INIT connecting
%7|1513582363.499|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1513582363.499|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1513582363.514|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance
%7|1513582363.514|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1513582363.514|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1513582363.514|TOPPARREMOVE|rdkafka#producer-1| [thrd:main]: Removing toppar ops.kube-events-all.stream.json.001 [-1] 0x56108fa42840
%7|1513582363.514|DESTROY|rdkafka#producer-1| [thrd:main]: ops.kube-events-all.stream.json.001 [-1]: 0x56108fa42840 DESTROY_FINAL
%7|1513582363.514|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state UP: 1 refcnts, 0 toppar(s), 0 fetch toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1513582363.561|CONNECT|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: Connecting to ipv4#10.3.252.81:9092 (plaintext) with socket 7
%7|1513582363.561|STATE|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1513582363.561|TERMINATE|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: Handle is terminating: failed 0 request(s) in retry+outbuf
%7|1513582363.561|BROKERFAIL|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)
%7|1513582363.561|STATE|rdkafka#producer-1| [thrd:bootstrap.kafka:9092/bootstrap]: bootstrap.kafka:9092/bootstrap: Broker changed state CONNECT -> DOWN
%7|1513582363.614|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating: failed 0 request(s) in retry+outbuf
%7|1513582363.614|BROKERFAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Success)
%7|1513582363.614|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state UP -> DOWN

To add RBAC, apply recursively: kubectl apply -R -f events-kube/

I found no obvious way to print curl's errors. Do they end up in the topic?

Link to example of Kafka Streams usage is not working.

In section Start Kafka of README:

That's it. Just add business value 😉. For clients we tend to use librdkafka-based drivers like node-rdkafka. To use Kafka Connect and Kafka Streams you may want to take a look at our sample Dockerfiles.

The Dockerfiles link is not working. Based on the link target (github.com/solsson/dockerfiles/tree/master/streams-logfilter), I suppose it pointed to the streams example? I have not found a folder with "stream" in its name in that repo.

Where should I look for the streams example?

Thanks in advance,

Liwei

Try 5 zookeeper instances

From Kafka: The Definitive Guide (Narkhede, Shapira, Palino 2016):

Consider running Zookeeper in a 5-node ensemble. In order to make configuration changes to the ensemble, including swapping a node, you will need to reload nodes one at a time.
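
The reasoning behind the 5-node recommendation is majority quorum: the ensemble keeps serving as long as more than half of its members are up. A small sketch of that arithmetic follows; the function names are made up for illustration.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest majority of an ensemble: more than half of its members."""
    if ensemble_size < 1:
        raise ValueError("ensemble_size must be at least 1")
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """How many members can be down while a majority is still available."""
    return ensemble_size - quorum_size(ensemble_size)


for n in (3, 5):
    print(f"{n} nodes: quorum {quorum_size(n)}, tolerates {tolerated_failures(n)} down")
```

With 3 nodes, taking one down for a configuration change leaves no margin for an unplanned failure. With 5 nodes, one can be reloaded while the ensemble still tolerates one more failure.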

Using local storage on SAN

Hi,
I'm new to Kubernetes, so I hope this doesn't cause a problem for you; if it does, please let me know.
I am using your template to set up Zookeeper/Kafka on our own Kubernetes environment and modified it so that data and config are written to local disk on the host (a LUN presented to all 6 hosts).
When I 'apply' the StatefulSets for pzoo, zoo and kafka, I see that the directories are created correctly, but no config files (ConfigMap) are generated, so the containers fail to start with the error: Back-off restarting failed container.

The error in the docker logs : /bin/bash: /etc/kafka/init.sh: No such file or directory

Any clue on what I might be doing wrong? This is what I modified:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-zookeeper-config
  labels:
    type: local
spec:
  storageClassName: zookeeper-config
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  hostPath:
    path: "/kubernetes/zk/config"

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: zookeeper-config-claim
spec:
  storageClassName: zookeeper-config
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 3Gi

in 50pzoo.yml:

      volumes:
      - name: config
        configMap:
          name: zookeeper-config
        persistentVolumeClaim:
          claimName: zookeeper-config-claim
      - name: data
        persistentVolumeClaim:
          claimName: zookeeper-data-claim
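
One detail that stands out in the volumes list above is that the `config` entry names two sources, a configMap and a persistentVolumeClaim. A Kubernetes volume entry is expected to carry exactly one volume source, so this may be related to the missing init.sh, although that link is an assumption and not confirmed here. The sketch below only illustrates the check; the helper names are hypothetical and the data is the snippet above rewritten as Python dictionaries.

```python
# Keys of a volume entry that are not volume sources.
NON_SOURCE_KEYS = {"name"}


def volume_sources(volume: dict) -> list:
    """Return the volume-source keys (configMap, persistentVolumeClaim, ...) of one entry."""
    return sorted(key for key in volume if key not in NON_SOURCE_KEYS)


def check_volumes(volumes: list) -> list:
    """Return a message for every volume that does not have exactly one source."""
    problems = []
    for volume in volumes:
        sources = volume_sources(volume)
        if len(sources) != 1:
            problems.append(f"{volume.get('name')}: expected 1 source, found {sources}")
    return problems


# The volumes list from the modified 50pzoo.yml, written as Python data.
volumes = [
    {
        "name": "config",
        "configMap": {"name": "zookeeper-config"},
        "persistentVolumeClaim": {"claimName": "zookeeper-config-claim"},
    },
    {"name": "data", "persistentVolumeClaim": {"claimName": "zookeeper-data-claim"}},
]

for problem in check_volumes(volumes):
    print(problem)
```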

Any help is highly appreciated.

Kind regards,

Eric V.

Alternate namespaces + StatefulSet names

I'm attempting to run this in an alternate namespace. However, if I apply the 50kafka StatefulSet in any namespace other than kafka, I get the following error:

PersistentVolumeClaim is not bound: "data-kafka-0" (repeated 7 times)

Zookeeper however runs just fine. I can't spot anything causing this in the kafka StatefulSet - am I missing something?

JVM Settings

Since running a JVM in containers is already somewhat scary, setting limits on the pods is always a good idea. However, the docs do not mention this.

Adding a:

        env:
        - name: "KAFKA_HEAP_OPTS"
          value: "-Xmx6G -Xms6G"
        resources:
          limits:
            cpu: 4
            memory: 8G

In the docs may be a good idea. JVMs tend to consume memory unless told otherwise.
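
When picking such values, note that the two settings use different unit systems: JVM size suffixes are 1024-based, while a Kubernetes `G` means 10^9 bytes (`Gi` is the 1024-based form). The sketch below compares the heap with the container limit from the snippet above. It is an illustration only; the helper names are hypothetical and only plain integer quantities are handled.

```python
import re

# JVM size suffixes are binary (1024-based).
JVM_UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3}
# Kubernetes quantities: decimal (k, M, G) and binary (Ki, Mi, Gi) suffixes differ.
K8S_UNITS = {"k": 10**3, "M": 10**6, "G": 10**9, "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}


def jvm_max_heap_bytes(heap_opts: str) -> int:
    """Extract -Xmx from a KAFKA_HEAP_OPTS style string and convert it to bytes."""
    match = re.search(r"-Xmx(\d+)([kKmMgG]?)", heap_opts)
    if match is None:
        raise ValueError("no -Xmx option found")
    number, unit = match.groups()
    return int(number) * JVM_UNITS.get(unit.lower(), 1)


def k8s_memory_bytes(quantity: str) -> int:
    """Convert a plain Kubernetes memory quantity such as '8G' or '8Gi' to bytes."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi|k|M|G)?", quantity)
    if match is None:
        raise ValueError(f"unsupported quantity: {quantity}")
    number, unit = match.groups()
    return int(number) * K8S_UNITS.get(unit, 1)


heap = jvm_max_heap_bytes("-Xmx6G -Xms6G")
limit = k8s_memory_bytes("8G")
print(f"heap: {heap} bytes, limit: {limit} bytes, headroom: {limit - heap} bytes")
```

The headroom matters because the JVM also uses memory outside the heap (metaspace, thread stacks, direct buffers), all of which counts against the container limit.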

Kafkacat doesn't work with bootstrap service

kubectl run -it --rm kafkacat --image=solsson/kafkacat --restart=Never -- -C -b kafka-bootstrap:9200 -t mytopic -p 0 -J -e
% KC_ERROR: Failed to query metadata for topic mytopic: Local: Broker transport failure

It works with the full list of service DNS names and the bootstrap service works just fine with Kafka libraries we're using in code (Java client).

pod ends up in Completed state when it fails to mount its volume

We had a pod end up as "Completed" in production after it was rescheduled due to spikes in our resource usage (caused by careless overloading from other deployments).

The effect is that Kubernetes won't remove the pod and create a new one, as it would have if the pod had ended up as "Error", which is what I would have expected.

The below output from describe pod explains what we believe was the cause/reason. It failed to mount its volume (which was being moved between gcloud instances).

> kubectl -n kafka describe pod kafka-1
Name:		kafka-1
Namespace:	kafka
Node:		gke-eu-west-2-default-pool-29350486-ndvt/10.132.0.7
Start Time:	Mon, 26 Dec 2016 02:00:59 +0100
Labels:		app=kafka
Status:		Running
IP:
Controllers:	StatefulSet/kafka
Containers:
  broker:
    Container ID:	docker://649e62ca52cb4f2f0fd8b26dbb83777e6a3f99bc63247b27131d47a536103e6f
    Image:		solsson/kafka-persistent:0.10.1@sha256:110f9e866acd4fb9e059b45884c34a210b2f40d6e2f8afe98ded616f43b599f9
    Image ID:		docker-pullable://solsson/kafka-persistent@sha256:110f9e866acd4fb9e059b45884c34a210b2f40d6e2f8afe98ded616f43b599f9
    Port:		9092/TCP
    Command:
      sh
      -c
      ./bin/kafka-server-start.sh config/server.properties --override broker.id=$(hostname | awk -F'-' '{print $2}')
    State:		Terminated
      Reason:		Completed
      Exit Code:	0
      Started:		Thu, 29 Dec 2016 16:44:10 +0100
      Finished:		Thu, 05 Jan 2017 09:25:33 +0100
    Ready:		False
    Restart Count:	1
    Volume Mounts:
      /opt/kafka/data from datadir (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-x85zh (ro)
    Environment Variables:	<none>
Conditions:
  Type		Status
  Initialized 	True
  Ready 	False
  PodScheduled 	True
Volumes:
  datadir:
    Type:	PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:	datadir-kafka-1
    ReadOnly:	false
  default-token-x85zh:
    Type:	Secret (a volume populated by a Secret)
    SecretName:	default-token-x85zh
QoS Class:	BestEffort
Tolerations:	<none>
Events:
  FirstSeen	LastSeen	Count	From						SubObjectPath	Type		Reason		Message
  ---------	--------	-----	----						-------------	--------	------		-------
  4h		28s		119	{kubelet gke-eu-west-2-default-pool-29350486-ndvt}			Warning		FailedMount	Unable to mount volumes for pod "kafka-1_kafka(c37db470-cb06-11e6-882c-42010a84014e)": timeout expired waiting for volumes to attach/mount for pod "kafka-1"/"kafka". list of unattached/unmounted volumes=[datadir]
  4h		28s		119	{kubelet gke-eu-west-2-default-pool-29350486-ndvt}			Warning		FailedSync	Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod "kafka-1"/"kafka". list of unattached/unmounted volumes=[datadir]

Error and Completed state on fresh pods with negligible load

While working on #35 I suddenly had a depressing status report:

NAME                               READY     STATUS      RESTARTS   AGE
consumer-test1-1588418567-r245l    1/1       Running     0          2h
kafka-0                            2/2       Running     3          3h
kafka-1                            2/2       Running     0          3h
kafka-2                            0/2       Completed   0          3h
monitoring-test-3979390023-c5nwm   1/1       Running     0          7m
pzoo-0                             0/2       Error       0          21h
pzoo-1                             2/2       Running     0          21h
pzoo-2                             2/2       Running     2          21h
testclient                         1/1       Running     1          2h
zoo-0                              2/2       Running     0          20h
zoo-1                              2/2       Running     0          20h

Logs show nothing unusual, but kubectl describe says:

  FirstSeen	LastSeen	Count	From							SubObjectPath	Type		Reason		Message
  ---------	--------	-----	----							-------------	--------	------		-------
  2h		36s		64	kubelet, gke-eu-west-3-default-pool-c36cf3c6-nfgg			Warning		FailedMount	Unable to mount volumes for pod "pzoo-0_kafka(bd1a380f-5a60-11e7-93e7-42010a84002d)": timeout expired waiting for volumes to attach/mount for pod "kafka"/"pzoo-0". list of unattached/unmounted volumes=[data]
  2h		36s		64	kubelet, gke-eu-west-3-default-pool-c36cf3c6-nfgg			Warning		FailedSync	Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod "kafka"/"pzoo-0". list of unattached/unmounted volumes=[data]
  FirstSeen	LastSeen	Count	From							SubObjectPath	Type		Reason		Message
  ---------	--------	-----	----							-------------	--------	------		-------
  2h		24s		65	kubelet, gke-eu-west-3-default-pool-c36cf3c6-nfgg			Warning		FailedMount	Unable to mount volumes for pod "kafka-2_kafka(247ecd2f-5afa-11e7-93e7-42010a84002d)": timeout expired waiting for volumes to attach/mount for pod "kafka"/"kafka-2". list of unattached/unmounted volumes=[data]
  2h		24s		65	kubelet, gke-eu-west-3-default-pool-c36cf3c6-nfgg			Warning		FailedSync	Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod "kafka"/"kafka-2". list of unattached/unmounted volumes=[data]

All PVCs are bound.

Persistence setup was changed in #33.

PodDisruptionBudget and AntiAffinity

I'm just wondering if you've looked into using PodDisruptionBudgets and AntiAffinity rules at all for this example?

I was playing around with a new cluster on gke, deployed zookeeper and Kafka and then upgraded my cluster to a different version of kubernetes. I noticed, while the nodes were draining/coming back up, near the end all my zookeeper pods ended up being on the same node, which when preempted caused a brief zookeeper outage.

The standard zookeeper k8s example (https://kubernetes.io/docs/tutorials/stateful-application/zookeeper/) does contain both of them. Is there a particular reason why it's not used, or just not something that you've looked into yet?
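
The outage described above comes down to one node holding enough zookeeper pods that losing it breaks the majority. The sketch below checks a pod placement for that condition. It is only an illustration of what anti-affinity is meant to prevent; the pod and node names are made up.

```python
from collections import Counter


def nodes_breaking_quorum(pod_to_node: dict) -> list:
    """Return the nodes whose loss would leave fewer than a majority of pods running."""
    total = len(pod_to_node)
    majority = total // 2 + 1
    pods_per_node = Counter(pod_to_node.values())
    return sorted(node for node, count in pods_per_node.items() if total - count < majority)


# Hypothetical placements; the node names are made up for illustration.
spread = {"zoo-0": "node-a", "zoo-1": "node-b", "zoo-2": "node-c"}
stacked = {"zoo-0": "node-a", "zoo-1": "node-a", "zoo-2": "node-a"}

print(nodes_breaking_quorum(spread))   # placement where no single node holds a majority
print(nodes_breaking_quorum(stacked))  # placement where every pod is on one node
```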

externalIp of zk and kf

Hello, can you please describe how can I expose zk and kf to be available from outside of k8s cluster?

advertised hostname issues

Works fine until I connect a pub/sub client. It seems the broker is returning the wrong value; shouldn't this be the IP of the broker?

socket.gaierror: getaddrinfo failed for kafka:9092, exception was [Errno -5] No address associated with hostname. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?

Zookeeper persistent data?

Hi,

from the Readme:

Zookeeper runs as a Deployment without persistent storage:

but looking at the zookeeper manifests, it looks like you are using a StatefulSet with local storage. Maybe I'm missing something, but I could probably use the same persistent storage approach as with kafka, e.g.:

<snip>
          volumeMounts:
            - name: zookeeper-data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: zookeeper-data
        annotations:
          volume.beta.kubernetes.io/storage-class: slow
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 500m
<snip>

Haven't tested much, but seems to be working?

Producer and Consumer for external access of kafka service

Can anyone help me with the producer and consumer commands for accessing the kafka service externally?

I tried this command from outside the Kubernetes cluster to create a topic:
bin/kafka-topics.sh --create --zookeeper zookeeper.kafka.svc.cluster.local:2181 --replication-factor 3 --partitions 3 --topic test

But no success. I got this error:

Exception in thread "main" org.I0Itec.zkclient.exception.ZkException: Unable to connect to zookeeper.kafka.svc.cluster.local:2181
	at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:72)
	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1228)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:157)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:131)
	at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:79)
	at kafka.utils.ZkUtils$.apply(ZkUtils.scala:61)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: java.net.UnknownHostException: zookeeper.kafka.svc.cluster.local: Name or service not known
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
	at java.net.InetAddress.getAllByName(InetAddress.java:1192)
	at java.net.InetAddress.getAllByName(InetAddress.java:1126)
	at org.apache.zookeeper.client.StaticHostProvider.<init>(StaticHostProvider.java:61)
	at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:445)
	at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:380)
	at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:70)

Running on kubernetes cluster 1.6.2 fails

I'm running on an already established cluster (1.6.2), but kubectl apply -f ./zookeeper fails, and afterwards kubectl apply -f ./ fails as well.

It seems the initContainer isn't running, so I changed it to the following for zookeeper:

annotations:
        pod.beta.kubernetes.io/init-containers: '[
            {
                "name": "init-config",
                "image": "solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce",
                "command": ["/bin/bash", "/etc/kafka/init.sh"],
                "volumeMounts": [
                    {
                      "name": "config",
                      "mountPath": "/etc/kafka"
                    },
                    {
                      "name": "data",
                      "mountPath": "/var/lib/zookeeper/data"
                    }
                ]
            }
        ]'

But for 50kafka.yml because of spec.nodeName

initContainers:
      - name: init-config
        image: solsson/kafka-initutils@sha256:c275d681019a0d8f01295dbd4a5bae3cfa945c8d0f7f685ae1f00f2579f08c7d
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        command: ['/bin/bash', '/etc/kafka/init.sh']

I can't use the same approach.

Is there something missing?

Cheers

GKE Kafkacat Error: Name or service not known

I am currently trying this setup on GCE. The Kubernetes cluster node's internal IP address is the same as the GCE VM's, and everything is tested in the default namespace.

I created the topic internally and can produce and consume messages internally. But when I try to consume a message externally via kafkacat, I receive a "Name or service not known" error (an advertised.listeners issue, perhaps?):

on@kubernetes:~/kubernetes-kafka$ kafkacat -C -b [kubernetes-cluster-IP]:32400 -t kube
%3|1510858507.273|FAIL|rdkafka#consumer-0| kafka-0.broker.default.svc.cluster.local:9094/0: Failed to resolve 'kafka-0.broker.default.svc.cluster.local:9094': Name or service not known
%3|1510858507.273|ERROR|rdkafka#consumer-0| kafka-0.broker.default.svc.cluster.local:9094/0: Failed to resolve 'kafka-0.broker.default.svc.cluster.local:9094': Name or service not known

kafkacat is able to find the broker and list the topic:

on@kubernetes:~/kubernetes-kafka$ kafkacat -C -b [kubernetes-cluster-IP]:32400 -L
Metadata for all topics (from broker -1: [kubernetes-cluster-IP]:32400/bootstrap):
 1 brokers:
  broker 0 at kafka-0.broker.default.svc.cluster.local:9094
 2 topics:
  topic "kube" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "__consumer_offsets" with 50 partitions:

Using grep, I get:

on@kubernetes:~/kubernetes-kafka$ kubectl -n default logs kafka-0 | grep "Registered broker"
[2017-11-16 16:50:41,754] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(kafka-0.broker.default.svc.cluster.local,9094,ListenerName(OUTSIDE),PLAINTEXT),EndPoint(kafka-0.broker.default.svc.cluster.local,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)

Below is my server.properties, taken over SSH from the kafka-0 pod (everything from the Log Basics section onward is omitted).
I notice that init.sh only set broker.id; broker.rack failed, but I don't need it, and advertised.listeners was not changed at all. I manually set advertised.listeners to [kubernetes-cluster-IP]:32400.

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

#init#broker.rack=# zone lookup failed, see -c init-config logs

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=OUTSIDE://:9094,PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=OUTSIDE://[kubernetes-cluster-IP]:32400,PLAINTEXT://:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:PLAINTEXT
inter.broker.listener.name=PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
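
To see what an external client will be told to connect to, the listener strings can be taken apart and checked for names that only resolve inside the cluster. The sketch below does this for the endpoints from the "Registered broker" log line above. The helper names and the IP 203.0.113.10 are placeholders, and treating `.svc.cluster.local` as cluster-internal assumes the default cluster domain.

```python
def parse_listeners(value: str) -> dict:
    """Split 'NAME://host:port,...' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def cluster_internal(host: str) -> bool:
    """Names under svc.cluster.local normally resolve only inside the cluster."""
    return host.endswith(".svc.cluster.local")


# What the broker registered, according to the log line above.
registered = parse_listeners(
    "OUTSIDE://kafka-0.broker.default.svc.cluster.local:9094,"
    "PLAINTEXT://kafka-0.broker.default.svc.cluster.local:9092"
)
# What the edited server.properties intends (placeholder IP).
intended = parse_listeners("OUTSIDE://203.0.113.10:32400,PLAINTEXT://:9092")

for label, listeners in (("registered", registered), ("intended", intended)):
    host, port = listeners["OUTSIDE"]
    print(f"{label}: OUTSIDE -> {host}:{port}, cluster-internal: {cluster_internal(host)}")
```

If the registered OUTSIDE endpoint is still the cluster-internal name, as kafkacat -L reports above, the manual edit has probably not taken effect in the running broker.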

Using the outside-0 service:

kind: Service
apiVersion: v1
metadata:
  name: outside-0
  namespace: default
spec:
  selector:
    app: kafka
    kafka-broker-id: "0"
  ports:
  - protocol: TCP
    targetPort: 9094
    port: 32400
    nodePort: 32400
  type: NodePort

Related to issue #13 @solsson
Any help would be much appreciated, thanks.

License

Please would you add a public License agreement link on your repo, so that we can reference it for our internal security team?

Test failures for console producer/consumer with min.insync.replicas=2

When running the Kafka tests, this one does not complete:

NAME                    READY     STATUS    RESTARTS   AGE
kafkacat-4kvj4          3/3       Running   0          1h
produce-consume-kdd92   2/3       Running   0          1h

Logs from Testcase

Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
Nothing consumed yet
....

Could it be that the producer does not generate enough data? There are no errors in the producer or consumer.

Using kubernetes 1.8.5 on GKE

Real-World Tests as Integration Tests

I've seen there are some tests already. What is their goal at the moment? I'm asking to find out whether it makes sense to contribute the following:

I'm now testing this setup for a production rollout. The key parts are rolling updates and upgrades of Kafka, node failures due to Kubernetes upgrades, etc., and that everything recovers without issues. Internally I already have a test setup with services that communicate via Kafka queues and compute checksums at the end to make sure that everything worked. In between I play chaos monkey to see whether the services are affected by the failures I inject into the Kafka nodes.

I was thinking of building some automated integration tests that test different scenarios like this with the Kubernetes cluster. It's actually dead simple to simulate this kind of things. Has anyone already considered this or some thoughts on it?
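
As a rough idea of the checksum part, the comparison between what was produced and what was consumed could look like the sketch below. It does not talk to Kafka; the function name and the sample payloads are hypothetical, and the checksum is deliberately independent of message order because ordering across partitions is not guaranteed.

```python
import hashlib


def stream_checksum(messages) -> tuple:
    """Order-independent (count, checksum) over an iterable of message payloads (bytes).

    Digests are summed modulo 2**256, so lost and duplicated messages both change the result.
    """
    total = 0
    count = 0
    for payload in messages:
        digest = hashlib.sha256(payload).digest()
        total = (total + int.from_bytes(digest, "big")) % (1 << 256)
        count += 1
    return count, f"{total:064x}"


produced = [b"order-1", b"order-2", b"order-3"]
consumed_reordered = [b"order-3", b"order-1", b"order-2"]
consumed_with_loss = [b"order-1", b"order-3"]

print(stream_checksum(produced) == stream_checksum(consumed_reordered))
print(stream_checksum(produced) == stream_checksum(consumed_with_loss))
```

A test run would compute the checksum on the producing side, inject failures while the services run, and compare against the consuming side at the end.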

Readiness probe failed: zookeeper nc check error. Does the base image have a network problem, and how can it be fixed?

The k8s version is 1.7 and the network is a flanneld overlay.
[root@master1 nginx]# kubectl get nodes -o wide
NAME                           STATUS    AGE       VERSION   EXTERNAL-IP   OS-IMAGE                KERNEL-VERSION
db.k8s.novalocal               Ready     14d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
elk-k8sdata.novalocal          Ready     18d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
jenkins-master.k8s.novalocal   Ready     12d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
k8s-monitor.novalocal          Ready     18d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
lb1.k8s.novalocal              Ready     14d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
master1.k8s.novalocal          Ready     18d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
node1.k8s.novalocal            Ready     19d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
node2.k8s.novalocal            Ready     19d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
node3.k8s.novalocal            Ready     19d       v1.7.1                  CentOS Linux 7 (Core)   4.4.77-1.el7.elrepo.x86_64
[root@master1 nginx]#
The zookeeper nc check fails intermittently; this looks like a network bug in the image!
readinessProbe:
  exec:
    command:
    - /bin/sh
    - -c
    - '[ "imok" = "$(echo ruok | nc -w 1 127.0.0.1 2181)" ]'

[root@master1 ~]# kubectl get pod,svc -n kafka -o wide
NAME         READY     STATUS    RESTARTS   AGE       IP            NODE
po/kafka-0   1/1       Running   0          5d        172.16.47.7   elk-k8sdata.novalocal
po/kafka-1   1/1       Running   0          5d        172.16.1.3    node3.k8s.novalocal
po/kafka-2   1/1       Running   0          5d        172.16.38.6   node1.k8s.novalocal
po/pzoo-0    1/1       Running   0          16h       172.16.71.5   k8s-monitor.novalocal
po/pzoo-1    1/1       Running   0          16h       172.16.47.6   elk-k8sdata.novalocal
po/pzoo-2    1/1       Running   0          16h       172.16.5.6    db.k8s.novalocal
po/zoo-0     1/1       Running   0          16h       172.16.18.3   node2.k8s.novalocal
po/zoo-1     1/1       Running   0          16h       172.16.19.5   jenkins-master.k8s.novalocal

NAME            CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE       SELECTOR
svc/broker      None                           9092/TCP            5d        app=kafka
svc/pzoo        None                           2888/TCP,3888/TCP   5d        app=zookeeper,storage=persistent
svc/zoo         None                           2888/TCP,3888/TCP   5d        app=zookeeper,storage=ephemeral
svc/zookeeper   172.17.200.109                 2181/TCP            5d        app=zookeeper
[root@master1 ~]#
[root@master1 ~]# kubectl exec -it pzoo-0 /bin/bash -n kafka
root@pzoo-0:/opt/kafka# sh

while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 pzoo-2.pzoo.kafka.svc.cluster.local 2181 );echo $imok ;done

imok

imok

imok

^C

while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 pzoo-0.pzoo.kafka.svc.cluster.local 2181 );echo $imok ;done

imok
imok

imok
imok

imok

imok
imok

while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 127.0.0.1 2181 );echo $imok ;done

imok

imok
imok
imok
imok
imok
imok

imok

imok

Then I ran the same nc check from an nginx pod and saw no problem, so the cluster network does not seem to be at fault; the problem appears to be in the base image. How can I fix it?

root@nginx-0:/# while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 nginx-1.web 80 );echo $imok ;done

nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d>
^C
root@nginx-0:/# while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 nginx-0.web 80 );echo $imok ;done
nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d> nter>nginx/1.13.3/center>d>
^C
root@nginx-0:/# while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 zookeeper.kafka.svc.cluster.local 2181 );echo $imok ;done
imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok
^C
root@nginx-0:/# while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 pzoo-0.pzoo.kafka.svc.cluster.local 2181 );echo $imok ;done
imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok
^C
root@nginx-0:/# while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 pzoo-1.pzoo.kafka.svc.cluster.local 2181 );echo $imok ;done
imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok imok
^C
root@nginx-0:/# while true ;do sleep 1 ;imok=$(echo ruok | nc -w 1 pzoo-2.pzoo.kafka.svc.cluster.local 2181 );echo $imok ;done
imok imok imok imok imok imok imok imok imok imok imok imok imok imok
