跳到主要内容

使用 Pulsar

添加 pulsar starter

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar</artifactId>
</dependency>

配置 pulsar

spring:
  pulsar:
    client:
      operation-timeout: 30m
      service-url: pulsar://127.0.0.1:6650
      # 如果 Pulsar 没有开启 Token 认证,则不需要下面的 authentication 配置
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: GeneratedTokenValue
    consumer:
      name: consumer-server

开启 Pulsar

启动类添加注解:@EnablePulsar

简单封装下 PulsarTemplate

MessageTemplate.java
/**
 * Thin convenience wrapper around {@link PulsarTemplate}: sends a keyed message to a topic and
 * logs the id of the sent message at INFO level.
 *
 * @param <T> payload type accepted by the wrapped template
 */
@Slf4j
public class MessageTemplate<T> {

  /** Template that every send is delegated to. */
  private final PulsarTemplate<T> pulsarTemplate;

  /**
   * Creates a wrapper around the given template.
   *
   * @param pulsarTemplate template used to build and send messages
   */
  public MessageTemplate(PulsarTemplate<T> pulsarTemplate) {
    this.pulsarTemplate = pulsarTemplate;
  }

  /**
   * Sends {@code message} to {@code topic}, using {@code Objects.toString(message)} as the
   * message key (so a {@code null} message yields the key {@code "null"}).
   *
   * @param topic destination topic
   * @param message payload to send
   */
  @SneakyThrows
  public void sendMsg(String topic, T message) {
    String derivedKey = Objects.toString(message);
    sendMsg(topic, message, derivedKey);
  }

  /**
   * Sends {@code message} to {@code topic} with an explicit message key and logs the outcome.
   *
   * <p>Exceptions raised while sending are propagated without being declared (Lombok
   * {@code @SneakyThrows}).
   *
   * @param topic destination topic
   * @param message payload to send
   * @param key key set on the outgoing message
   */
  @SneakyThrows
  public void sendMsg(String topic, T message, String key) {
    MessageId sentId =
        pulsarTemplate
            .newMessage(message)
            .withTopic(topic)
            .withMessageCustomizer(messageBuilder -> messageBuilder.key(key))
            .send();
    log.info("消息发送成功,Topic={},message={},messageId={}", topic, message, sentId);
  }
}
PulsarConfig.java
/**
 * Spring configuration that exposes a {@link MessageTemplate} bean built on top of the
 * application's {@link PulsarTemplate}.
 */
@Configuration
public class PulsarConfig {

  /**
   * Wraps the injected {@link PulsarTemplate} in a {@link MessageTemplate}.
   *
   * @param pulsarTemplate template supplied by the application context
   * @return a {@link MessageTemplate} delegating to {@code pulsarTemplate}
   */
  @Bean
  public MessageTemplate<?> messageTemplate(PulsarTemplate<?> pulsarTemplate) {
    return new MessageTemplate<>(pulsarTemplate);
  }
}

生产者发送消息

Producer.java
/**
 * Example producer service: publishes a {@code Long} id through {@link MessageTemplate}.
 *
 * <p>NOTE(review): this class publishes to {@code Topic.MESSAGE_TOPIC}, whereas
 * {@code TopicConsumer} subscribes to {@code Topic.PULSAR_TOPIC}. Confirm whether the two
 * constants are meant to name the same topic.
 */
@Service
public class Producer {

  /** Wrapper used to send the id messages. */
  private final MessageTemplate<Long> messageTemplate;

  /**
   * Constructor injection of the message wrapper.
   *
   * @param messageTemplate wrapper used to send messages
   */
  public Producer(MessageTemplate<Long> messageTemplate) {
    this.messageTemplate = messageTemplate;
  }

  /**
   * Sends {@code id} to {@code Topic.MESSAGE_TOPIC}.
   *
   * @param id value to publish
   */
  public void produce(Long id) {
    messageTemplate.sendMsg(Topic.MESSAGE_TOPIC, id);
  }
}

消费者消费消息

/**
 * Example listener that receives {@code Long} payloads from {@code Topic.PULSAR_TOPIC} and
 * acknowledges each message manually.
 */
@Slf4j
@Service
public class TopicConsumer {

  /**
   * Logs the payload of the received message and then acknowledges it on the consumer.
   *
   * <p>The listener is configured with {@code AckMode.MANUAL}, so the explicit
   * {@code acknowledge} call below is what marks the message as processed. Exceptions raised by
   * the acknowledgement are propagated without being declared (Lombok {@code @SneakyThrows}).
   *
   * @param message received message carrying a {@code Long} value
   * @param consumer consumer the message was received on; its type parameter is the payload
   *     type ({@code Long}), not the {@code Message} wrapper
   */
  @SneakyThrows
  @PulsarListener(
      ackMode = AckMode.MANUAL,
      subscriptionType = SubscriptionType.Shared,
      subscriptionName = Subscription.PULSAR_CONSUMER,
      topics = Topic.PULSAR_TOPIC)
  public void consume(Message<Long> message, Consumer<Long> consumer) {
    log.info("消费消息: {}", message.getValue());
    consumer.acknowledge(message);
  }
}