I want to consume a stream from Kafka using Bytewax to perform aggregations. Unfortunately, I'm not able to connect to Kafka; the connection is always refused. I assume something in the port setup is incorrect, but I could not figure out what. It's even more confusing to me that another KafkaConsumer (consumer.py), running in another container, consumes and prints the stream without any errors.
docker-compose.yml
services:
  kafka:
    image: apache/kafka
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD", "bash", "-c", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network
  kafka-ui:
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka
    networks:
      - app-network
  consumer:
    build:
      context: ./kafka_consumer
      dockerfile: Dockerfile
    container_name: consumer
    depends_on:
      factory-service:
        condition: service_started
      kafka:
        condition: service_healthy
    ports:
      - "8099:80"
    networks:
      - app-network
  bytewax:
    build:
      context: ./consumer
      dockerfile: Dockerfile
    container_name: bytewax
    depends_on:
      - kafka
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
consumer.py (prints datastream)
import json  # needed by value_deserializer below

from kafka import KafkaConsumer

KAFKA_BROKER = "kafka:9093"
KAFKA_TOPIC = ["factory_001", "factory_002"]

consumer = KafkaConsumer(
    *KAFKA_TOPIC,
    group_id="my-group",
    bootstrap_servers=KAFKA_BROKER,
    # Decode each message value from UTF-8 JSON bytes
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
Stream_process.py (not working)
from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
KAFKA_BROKER = ["kafka:9093"]
KAFKA_TOPIC = ["factory_001"]
flow = Dataflow("Average Aggregation")
stream = op.input("kafka-in", flow, KafkaSource(KAFKA_BROKER, KAFKA_TOPIC))
op.output("out", stream, StdOutSink())
Log message:
%3|1742203770.748|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 1ms in state CONNECT)
%3|1742203771.749|FAIL|rdkafka#producer-1| [thrd:kafka:9093/bootstrap]: kafka:9093/bootstrap: Connect to ipv4#172.19.0.2:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
thread '<unnamed>' panicked at src/run.rs:128:17:
Box<dyn Any>
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 387, in list_parts
return list(_list_parts(client, self._topics))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/bytewax/connectors/kafka/__init__.py", line 172, in _list_parts
raise RuntimeError(msg)
RuntimeError: error listing partitions for Kafka topic `'factory_001'`: Broker: Unknown topic or partition
The above exception was the direct cause of the following exception:
bytewax.errors.BytewaxRuntimeError: (src/inputs.rs:252:47): error calling `FixedPartitionSource.list_parts` in step "Average Aggregation.kafka-in"
The above exception was the direct cause of the following exception:
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:354:34): error building FixedPartitionedSource
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/usr/local/lib/python3.11/site-packages/bytewax/run.py", line 355, in <module>
cli_main(**kwargs)
bytewax.errors.BytewaxRuntimeError: (src/worker.rs:149:10): error building production dataflow
asked Mar 17 at 9:51 by LeXXan
1 Answer
In the docker-compose.yml you are forgetting to map port 9093:
ports:
- "9092:9092"
- "9093:9093" # add this
This should map a port on your localhost to the advertised listener you defined for Docker as kafka:9093:
KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
You should then be able to connect to kafka:9093 from localhost, and the connection will be forwarded to that listener.
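To see at a glance which address each listener hands back to clients, the KAFKA_ADVERTISED_LISTENERS value can be split into its parts. This is only a minimal sketch: parse_listeners is a hypothetical helper written for illustration, not part of Kafka or Bytewax, and it assumes the simple NAME://host:port form used in the compose file above.

```python
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Map each listener name to the (host, port) it advertises."""
    listeners = {}
    for entry in value.split(","):
        # "DOCKER://kafka:9093" -> name "DOCKER", address "kafka:9093"
        name, _, address = entry.strip().partition("://")
        # Split on the last colon so the port is always the final field
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = parse_listeners("HOST://localhost:9092,DOCKER://kafka:9093")
for name, (host, port) in advertised.items():
    print(f"{name}: clients are told to connect to {host}:{port}")
```

A client must bootstrap against the listener whose advertised address it can actually reach, because the broker answers the first request with that advertised address and the client uses it for all later connections.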
Also, although I believe you have already done this (otherwise you would get a DNS error instead), make sure the kafka hostname is listed in /etc/hosts so that it can be resolved.
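To tell a name-resolution failure apart from a refused connection, a plain TCP probe run from inside the client container can help. The following is a minimal sketch using only the Python standard library; wait_for_port is a hypothetical helper name, and the attempt count and delay are arbitrary defaults rather than recommended values.

```python
import socket
import time


def wait_for_port(host: str, port: int, attempts: int = 5, delay: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, else False."""
    for attempt in range(1, attempts + 1):
        try:
            # create_connection resolves the hostname and opens a TCP socket
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError as exc:
            # Covers DNS errors (socket.gaierror), refused connections and timeouts
            print(f"attempt {attempt}/{attempts}: {host}:{port} not reachable ({exc})")
            if attempt < attempts:
                time.sleep(delay)
    return False


# Example, to be run inside the bytewax container (host and port taken
# from the DOCKER listener in the compose file):
#     wait_for_port("kafka", 9093)
```

The printed exception shows which case applies: a resolution error means the hostname is unknown, whereas "Connection refused" means the name resolved but nothing was listening on that port at that moment.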
Tags: Docker connection between Kafka and Bytewax refused (Stack Overflow)