admin管理员组

文章数量:1345398

just starting learning Kafka. Im trying to setup a small kafka cluster including 2 brokers. I was successfull in sending messages to my topic when both brokers are up. I want to test the behavior of my cluster when one of 2 brokers goes done. I stopped my primary broker (Kafka1) using docker stop kafka1, and i tried then to send a message to my cluster to see if my producer is able to understand that he need to send to kafka2 as kafka1 is down.

However i constantly receiving the below errors:

{"level":"ERROR","timestamp":"2022-07-19T18:59:46.891Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:39092","clientId":"my-app","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":144}

below is my producer code:

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })

await producer.connect()

await producer.send({
  topic: 'coverageEvolved',
  messages: [
    { value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
  ],
})

await producer.disconnect()

and below is my docker-pose-file:

version: '2'
services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
    kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
            - 29092:29092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka- 
    1:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka1/data:/var/lib/kafka/data
      kafka-2:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 39092:39092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka2/data:/var/lib/kafka/data

just starting learning Kafka. Im trying to setup a small kafka cluster including 2 brokers. I was successfull in sending messages to my topic when both brokers are up. I want to test the behavior of my cluster when one of 2 brokers goes done. I stopped my primary broker (Kafka1) using docker stop kafka1, and i tried then to send a message to my cluster to see if my producer is able to understand that he need to send to kafka2 as kafka1 is down.

However i constantly receiving the below errors:

{"level":"ERROR","timestamp":"2022-07-19T18:59:46.891Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:39092","clientId":"my-app","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":144}

below is my producer code:

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })

await producer.connect()

await producer.send({
  topic: 'coverageEvolved',
  messages: [
    { value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
  ],
})

await producer.disconnect()

and below is my docker-pose-file:

version: '2'
services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
    kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
            - 29092:29092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka- 
    1:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka1/data:/var/lib/kafka/data
      kafka-2:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 39092:39092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka2/data:/var/lib/kafka/data
Share Improve this question edited Jul 20, 2022 at 13:14 OneCricketeer 192k20 gold badges142 silver badges267 bronze badges asked Jul 19, 2022 at 19:23 roamsmartroamsmart 1451 gold badge2 silver badges9 bronze badges 3
  • Please never use an even number of zookeeper servers. You can use one Zookeeper with two brokers.. Also, why do you have localhost:49092 in the code? – OneCricketeer Commented Jul 19, 2022 at 19:43
  • Hi, just updated my docker-pose file by keeping only 1 zookeeper. I also removed localhost:49092 from the code, I was using 3 brokers in the beginning and forgot to remove it. i made the test again but still the same issue appear, there is no leader for this topic partition as we are in the middle of leadership election... – roamsmart Commented Jul 20, 2022 at 10:29
  • 1 Well, the logs aren't wrong... In the middle of election, there will be no leader. You're only sending one record here, so it's unclear what order of events you're running the code in – OneCricketeer Commented Jul 20, 2022 at 13:12
Add a ment  | 

1 Answer 1

Reset to default 5

If you've not created your topic some other way, Kafka will default to create your coverageEvolved topic used in the code with only one replica, and only one partition.

If you kill the broker hosting that one replica, there will be no in sync replica leader that can be produced to.

You can use Kafkajs to create topics.

Also worth mentioning, there's a transactions topic that only has one replica (you're missing an environment variable for it). This is mainly only relevant for Java clients since transactional producers are enabled by default as of Kafka 3.0

本文标签: