admin管理员组文章数量:1122847
异步消息服务器,Spring
Kafka Producer默认是异步发送。
在初始化producer实例时,会创建一个sender线程负责批量发送消息;
producer将消息暂存在缓冲区,消息根据topic-partition分类缓存;
消息达到batch.size或者时间达到了linger.ms,sender线程将该批量的消息发送到topic-partition所在的broker
一、异步发送消息
KafkaTemplate默认是异步发送。可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息。
代码:kafkaTemplate.send(topic,message);
异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。
注:batch.size默认是16384,linger.ms默认是0。
下面是一段示例代码,异步发送消息,并打印返回结果:
public void publishAndResult(String topic, Object msg){ ProducerRecord pr = new ProducerRecord<>(topic, msg); pr.headers().add("tpye", msg.getClass().getName().getBytes(StandardCharsets.UTF_8)); ListenableFuture> future = kafkaTemplate.send(pr); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable ex) { ex.printStackTrace(); } @Override public void onSuccess(SendResult result) { System.out.println(result.getProducerRecord()); System.out.println(result.getRecordMetadata()); } });}
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息。
异步发送相比同步发送能处理更多的消息耗时更少,大多数情况下我们都不需要等待响应,所以大多数情况下我们都会使用异步发送。但是在遇到消息发送失败时我们不能准确地处理,所以我们需要一个异步发送消息失败的回调。
打印输出结果如下所示:
ProducerRecord(topic=CARGO_BOOKED_EVENT_TOPIC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = tpye, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=CargoBookedEvent(bookingId=40967167), timestamp=null) CARGO_BOOKED_EVENT_TOPIC-0@1
最后部分输出的格式为:消息主题(Topic)-分区(Partition)@偏移量(Offset)
二、同步发送消息
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。
代码:kafkaTemplate.send(topic,message).get();
同步发送消息时,需要在每次send()方法调用get()方法,因为每次send()方法会返回一个Future类型的值,Future的get()方法会一直阻塞,直到该线程的任务获取到返回值,即当消息发送成功。
下面是一段示例代码,同步发送消息,并打印返回结果:
public void PublishSync(String topic, Object msg) { ProducerRecord pr = new ProducerRecord<>(topic, msg); pr.headers().add("tpye", msg.getClass().getName().getBytes(StandardCharsets.UTF_8)); try { SendResult sendResult = kafkaTemplate.send(pr).get(); System.out.println("消息发送成功:" + sendResult.toString()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }}
其中,get 方法还有一个重载方法 get(long timeout, TimeUnit unit),当 send 方法耗时大于 get 方法所设定的参数时会抛出一个超时异常。比如下面我们设置了超时时长为 1 微秒(肯定超时):
SendResult sendResult = kafkaTemplate.send("topic1", "消息回调测试").get(1, TimeUnit.MICROSECONDS);
在这里,send()方法先返回了一个Future对象,然后调用Future对象的get()方法等待kafka响应。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到ProducerRecord对象和RecordMetadata对象,通过RecordMetadata对象可以获取消息的偏移量。同步发送需要等待kafka服务器的响应吞吐相对较低。
消息发送成功:SendResult [producerRecord=ProducerRecord(topic=CARGO_BOOKED_EVENT_TOPIC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = tpye, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=CargoBookedEvent(bookingId=E2110319), timestamp=null), recordMetadata=CARGO_BOOKED_EVENT_TOPIC-0@2]
版权声明:本文标题:异步消息服务器,Spring 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/biancheng/1705136492a639867.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论