环境请自行搭建,本文仅适用于SpringCloud框架整合Kafka实现消息收发。
application.yml新增Kafka配置信息
spring:
#==============================================================
#spring-cloud-stream-Kafka配置 开始
#==============================================================
cloud:
stream:
default-binder: kafka #Default binder
bindings:
#缺省的输入、输出通道(配置自己定义的通道与哪个中间件交互)
es_default_input:
destination: es_default_topic
binder: kafka
group: es_default_group
consumer:
concurrency: 2 #入站消费者的并发性
es_default_output:
destination: es_default_topic
binder: kafka
content-type: text/plain
#告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
es_alarm_input:
destination: es_alarm_topic
binder: kafka
group: es_alarm_group
es_alarm_output:
destination: es_alarm_topic
binder: kafka
content-type: text/plain
#kafka配置
kafka:
binder:
autoCreateTopics: true # 自动创建topics
autoAddPartitions: true
replicationFactor: 1
brokers: cm02:9092,cm03:9092,cm04:9092 #Kafka的服务端列表,cm02为集群节点,在本地host配置,这里替换成你的IP即可
zkNodes: cm01:2181,cm02:2181,cm03:2181 #Kafka服务端连接的ZooKeeper节点列表
requiredAcks: 1
#==============================================================
#spring-cloud-stream-Kafka配置 结束
#==============================================================
pom.xml引入依赖包。
在pom.xml的dependenies节点中进行新增,具体如下:
<!--kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
自定义input和output通道
@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
package com.github.market.admin.common.kafka;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* kafka自定义信息通道
* @author sfy
* @date 2020/9/22 10:00 上午
*/
public interface EsChannel {
/**
* 缺省发送消息通道名称
*/
String ES_DEFAULT_OUTPUT = "es_default_output";
/**
* 缺省接收消息通道名称
*/
String ES_DEFAULT_INPUT = "es_default_input";
/**
* 告警发送消息通道名称
*/
String ES_ALARM_OUTPUT = "es_alarm_output";
/**
* 告警接收消息通道名称
*/
String ES_ALARM_INPUT = "es_alarm_input";
/**
* 缺省发送消息通道
* @return channel 返回缺省信息发送通道
*/
@Output(ES_DEFAULT_OUTPUT)
MessageChannel sendEsDefaultMessage();
/**
* 告警发送消息通道
* @return channel 返回告警信息发送通道
*/
@Output(ES_ALARM_OUTPUT)
MessageChannel sendEsAlarmMessage();
/**
* 缺省接收消息通道
* @return channel 返回缺省信息接收通道
*/
@Input(ES_DEFAULT_INPUT)
MessageChannel recieveEsDefaultMessage();
/**
* 告警接收消息通道
* @return channel 返回告警信息接收通道
*/
@Input(ES_ALARM_INPUT)
MessageChannel recieveEsAlarmMessage();
}
定义消息发送器-生产者
package com.github.market.admin.common.kafka.sender;
import com.github.market.admin.common.kafka.EsChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* kafka消息发送器
* 注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别
* 为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。
* @author sfy
* @date 2020/9/22 10:45 上午
*/
@Slf4j
@Component
public class EsKafkaMessageSender {
@Autowired
private EsChannel channel;
/**
* 消息发送到默认通道:缺省通道对应缺省主题
* @param message
*/
public void sendToDefaultChannel(String message){
channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
}
/**
* 消息发送到告警通道:告警通道对应告警主题
* @param message
*/
public void sendToAlarmChannel(String message){
channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
}
}
定义消息订阅器-消费者
package com.github.market.admin.listener.kafka;
import com.github.market.admin.common.kafka.EsChannel;
import com.xiaoleilu.hutool.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
/**
* kafka从不同的通道实现消息的订阅。
* @author sfy
* @date 2020/9/22 10:52 上午
*/
@Slf4j
@EnableBinding(value = EsChannel.class)
public class EsKafkaMessageReceiveListener {
/**
* 从缺省通道接收消息
* @param message
*/
@StreamListener(EsChannel.ES_DEFAULT_INPUT)
public void receive(Message<String> message){
log.info("{}订阅告警消息:通道 = es_default_input,data = {}", DateUtil.now(), message);
}
/**
* 从告警通道接收消息
* @param message
*/
@StreamListener(EsChannel.ES_ALARM_INPUT)
public void receiveAlarm(Message<String> message){
System.out.println("订阅告警消息:" + message);
log.info("{}订阅告警消息:通道 = es_alarm_input,data = {}", DateUtil.now(), message);
}
}
新增EnableBinding
@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称。
@EnableAsync
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@ComponentScan(basePackages = {"com.github.market.admin", "com.github.collection.common.bean"})
@EnableBinding(EsChannel.class) //加这个就好
public class MarketAdminServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MarketAdminServiceApplication.class, args);
}
}
定义Controller调用
package com.github.market.admin.controller;
import com.github.collection.common.util.Response;
import com.github.market.admin.common.kafka.sender.EsKafkaMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
* @author sfy
* @date 2020/9/21 10:51 上午
*/
@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
@Autowired
private EsKafkaMessageSender sender
/**
* kafka发送消息
* @author sfy
* @date 2020/9/22 3:50 下午
*/
@PostMapping("/testKafkaMessageSend")
public void testKafkaMessageSend(String message) {
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
}
/**
* kafka发送消息
* @author sfy
* @date 2020/9/22 3:50 下午
*/
@PostMapping("/testKafkaMessageSend2")
public void testKafkaMessageSend2(String message) {
sender.sendToAlarmChannel(message);
}
}
这里是分割线···