I need to use EmbeddedKafkaBroker for integration testing in a Spring Boot app that does not use the Spring Kafka templates; the app depends on the Apache kafka-clients jar directly. My configuration is:

    @Service
    public class MyPublishService {

        @Autowired
        MyEventsProducer eventProducer; // wrapper around KafkaProducer and AdminClient

        @PostConstruct
        void init() {
            eventProducer.createTopic("myTopic"); // calls AdminClient to create the topic
        }

        public void publish(String topic, Object payload) { // sends the payload to Kafka
            eventProducer.send(topic, payload);
        }
    } // end of service

    @Configuration
    class MyKafkaConfig {

        String server; // expected to be localhost:9092 in integration tests

        @Bean
        public MyEventsProducer myEventsProducer() {
            if (localEnv()) { // true when the setup is a local Kafka
                server = "localhost:9092";
            }
            return new MyEventsProducer(server);
        }
    }
My integration test setup is:

    @ExtendWith(SpringExtension.class)
    @SpringBootTest(
        classes = Application.class,
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
    )
    @ActiveProfiles("test")
    @AutoConfigureMockMvc
    @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
    public class MyPublishServiceTest {

        @Autowired
        MyPublishService myPublishService;

        @Autowired
        EmbeddedKafkaBroker embeddedKafkaBroker;

        @Test
        public void publishMessageTest() {
            System.out.println(embeddedKafkaBroker.getBrokersAsString());
            myPublishService.publishCollaboration(collaboration);
        }
    }
I have two questions:
- Why do I always get a random port from embeddedKafkaBroker.getBrokersAsString()? I also tried setting spring.kafka.bootstrap-servers: localhost:9092 in application-test.yaml, but it did not help. What should I do to get a fixed host:port?
- I am creating the topic in the @PostConstruct method of MyPublishService. How do I make sure the embedded Kafka broker is up before the code reaches this point, so that it does not fail with a TimeoutException because the broker is not yet running?
Note: Spring Boot 3.3.2, spring-kafka-test 3.1.1, testcontainers:kafka:1.19.3
Asked Mar 12 at 9:29 by user2206366 (edited Mar 12 at 9:34).
1 Answer
See the @EmbeddedKafka JavaDocs:
/**
* Set explicit ports on which the kafka brokers will listen. Useful when running an
* embedded broker that you want to access from other processes.
* A port must be provided for each instance, which means the number of ports must match the value of the count attribute.
* This property is not valid when using KRaft mode.
* @return ports for brokers.
* @since 2.2.4
*/
int[] ports() default { 0 };
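The port entry in brokerProperties is not involved in the creation of the broker instance. Set the ports attribute of the annotation instead, for example @EmbeddedKafka(partitions = 1, ports = 9092), keeping in mind that, per the JavaDoc, this attribute is not valid in KRaft mode.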
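If a fixed port is not strictly required, another option is to let the broker pick its own port and have the application read it. EmbeddedKafkaBroker publishes its address list in the system property spring.embedded.kafka.brokers once it has started. The sketch below is a hypothetical helper (BootstrapServers and resolve are names invented here, not part of the original post or of Spring Kafka) that prefers that property and otherwise falls back to a configured value. It assumes the embedded broker has already started by the time the producer bean is created.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of the original post or of Spring Kafka. */
final class BootstrapServers {

    /** System property that EmbeddedKafkaBroker sets to its host:port list after startup. */
    static final String EMBEDDED_BROKERS_PROPERTY = "spring.embedded.kafka.brokers";

    private BootstrapServers() {
    }

    /**
     * Returns the embedded broker address when the property is set and non-blank,
     * otherwise the supplied fallback (for example "localhost:9092").
     */
    static String resolve(String fallback) {
        return Optional.ofNullable(System.getProperty(EMBEDDED_BROKERS_PROPERTY))
                .map(String::trim)
                .filter(value -> !value.isEmpty())
                .orElse(fallback);
    }
}
```

In MyKafkaConfig this could be used as new MyEventsProducer(BootstrapServers.resolve(server)), so the same bean definition works against both the embedded broker in tests and a local Kafka on localhost:9092.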