admin管理员组

文章数量:1122847

异步消息服务器,Spring

Kafka Producer默认是异步发送。

在初始化producer实例时,会创建一个sender线程负责批量发送消息;

producer将消息暂存在缓冲区,消息根据topic-partition分类缓存;

消息达到batch.size或者时间达到了linger.ms,sender线程将该批量的消息发送到topic-partition所在的broker

一、异步发送消息

KafkaTemplate默认是异步发送。可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息。

代码:kafkaTemplate.send(topic,message);

异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。

注:batch.size默认是16384,linger.ms默认是0。

下面是一段示例代码,异步发送消息,并打印返回结果:

public void publishAndResult(String topic, Object msg){ ProducerRecord pr = new ProducerRecord<>(topic, msg); pr.headers().add("tpye", msg.getClass().getName().getBytes(StandardCharsets.UTF_8)); ListenableFuture> future = kafkaTemplate.send(pr); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable ex) { ex.printStackTrace(); } @Override public void onSuccess(SendResult result) { System.out.println(result.getProducerRecord()); System.out.println(result.getRecordMetadata()); } });}

可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息。

异步发送相比同步发送能处理更多的消息耗时更少,大多数情况下我们都不需要等待响应,所以大多数情况下我们都会使用异步发送。但是在遇到消息发送失败时我们不能准确地处理,所以我们需要一个异步发送消息失败的回调。

打印输出结果如下所示:

ProducerRecord(topic=CARGO_BOOKED_EVENT_TOPIC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = tpye, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=CargoBookedEvent(bookingId=40967167), timestamp=null) CARGO_BOOKED_EVENT_TOPIC-0@1

最后部分输出的格式为:消息主题(Topic)-分区(Partition)@偏移量(Offset)

二、同步发送消息

如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

代码:kafkaTemplate.send(topic,message).get();

同步发送消息时,需要在每次send()方法调用get()方法,因为每次send()方法会返回一个Future类型的值,Future的get()方法会一直阻塞,直到该线程的任务获取到返回值,即当消息发送成功。

下面是一段示例代码,同步发送消息,并打印返回结果:

public void PublishSync(String topic, Object msg) { ProducerRecord pr = new ProducerRecord<>(topic, msg); pr.headers().add("tpye", msg.getClass().getName().getBytes(StandardCharsets.UTF_8)); try { SendResult sendResult = kafkaTemplate.send(pr).get(); System.out.println("消息发送成功:" + sendResult.toString()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }}

其中,get 方法还有一个重载方法 get(long timeout, TimeUnit unit),当 send 方法耗时大于 get 方法所设定的参数时会抛出一个超时异常。比如下面我们设置了超时时长为 1 微秒(肯定超时):

SendResult sendResult = kafkaTemplate.send("topic1", "消息回调测试").get(1, TimeUnit.MICROSECONDS);

在这里,send()方法先返回了一个Future对象,然后调用Future对象的get()方法等待kafka响应。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到ProducerRecord对象和RecordMetadata对象,通过RecordMetadata对象可以获取消息的偏移量。同步发送需要等待kafka服务器的响应吞吐相对较低。

消息发送成功:SendResult [producerRecord=ProducerRecord(topic=CARGO_BOOKED_EVENT_TOPIC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = tpye, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116]), RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 114, 105, 99, 107, 105, 101, 46, 100, 116, 111, 46, 101, 118, 101, 110, 116, 46, 67, 97, 114, 103, 111, 66, 111, 111, 107, 101, 100, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=CargoBookedEvent(bookingId=E2110319), timestamp=null), recordMetadata=CARGO_BOOKED_EVENT_TOPIC-0@2]

本文标签: 异步消息服务器Spring