SpringCloud整合Kafka

环境请自行搭建,本文仅适用于SpringCloud框架整合Kafka实现消息收发。

application.yml新增Kafka配置信息

spring:
  #==============================================================
  #spring-cloud-stream-Kafka配置 开始
  #==============================================================
  cloud:
    stream:
      default-binder: kafka #Default binder
      bindings:
        #缺省的输入、输出通道(配置自己定义的通道与哪个中间件交互)
        es_default_input:
          destination: es_default_topic
          binder: kafka
          group: es_default_group
          consumer:
            concurrency: 2 #入站消费者的并发性
        es_default_output:
          destination: es_default_topic
          binder: kafka
          content-type: text/plain
        #告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
        es_alarm_input:
          destination: es_alarm_topic
          binder: kafka
          group: es_alarm_group
        es_alarm_output:
          destination: es_alarm_topic
          binder: kafka
          content-type: text/plain
        #kafka配置
      kafka:
        binder:
          autoCreateTopics: true   # 自动创建topics
          autoAddPartitions: true 
          replicationFactor: 1
          brokers: cm02:9092,cm03:9092,cm04:9092  #Kafka的服务端列表,cm02为集群节点,在本地host配置,这里替换成你的IP即可
          zkNodes: cm01:2181,cm02:2181,cm03:2181  #Kafka服务端连接的ZooKeeper节点列表
          requiredAcks: 1
  #==============================================================
  #spring-cloud-stream-Kafka配置 结束
  #==============================================================

pom.xml引入依赖包。

在pom.xml的dependenies节点中进行新增,具体如下:

<!--kafka-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

自定义input和output通道

@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。

package com.github.market.admin.common.kafka;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * kafka自定义信息通道
 * @author sfy
 * @date 2020/9/22 10:00 上午
 */
public interface EsChannel {
    /**
     * 缺省发送消息通道名称
     */
    String ES_DEFAULT_OUTPUT = "es_default_output";

    /**
     * 缺省接收消息通道名称
     */
    String ES_DEFAULT_INPUT = "es_default_input";

    /**
     * 告警发送消息通道名称
     */
    String ES_ALARM_OUTPUT = "es_alarm_output";

    /**
     * 告警接收消息通道名称
     */
    String ES_ALARM_INPUT = "es_alarm_input";

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(ES_DEFAULT_OUTPUT)
    MessageChannel sendEsDefaultMessage();

    /**
     * 告警发送消息通道
     * @return channel 返回告警信息发送通道
     */
    @Output(ES_ALARM_OUTPUT)
    MessageChannel sendEsAlarmMessage();

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(ES_DEFAULT_INPUT)
    MessageChannel recieveEsDefaultMessage();

    /**
     * 告警接收消息通道
     * @return channel 返回告警信息接收通道
     */
    @Input(ES_ALARM_INPUT)
    MessageChannel recieveEsAlarmMessage();
}


定义消息发送器-生产者

package com.github.market.admin.common.kafka.sender;

import com.github.market.admin.common.kafka.EsChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * kafka消息发送器
 *      注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别
 *      为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。
 * @author sfy
 * @date 2020/9/22 10:45 上午
 */
@Slf4j
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 消息发送到告警通道:告警通道对应告警主题
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}


定义消息订阅器-消费者

package com.github.market.admin.listener.kafka;

import com.github.market.admin.common.kafka.EsChannel;
import com.xiaoleilu.hutool.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

/**
 * kafka从不同的通道实现消息的订阅。
 * @author sfy
 * @date 2020/9/22 10:52 上午
 */
@Slf4j
@EnableBinding(value = EsChannel.class)
public class EsKafkaMessageReceiveListener {

    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        log.info("{}订阅告警消息:通道 = es_default_input,data = {}", DateUtil.now(), message);
    }

    /**
     * 从告警通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("订阅告警消息:" + message);
        log.info("{}订阅告警消息:通道 = es_alarm_input,data = {}", DateUtil.now(), message);
    }
}


新增EnableBinding

@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称。

@EnableAsync
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@ComponentScan(basePackages = {"com.github.market.admin", "com.github.collection.common.bean"})
@EnableBinding(EsChannel.class)  //加这个就好
public class MarketAdminServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MarketAdminServiceApplication.class, args);
    }
}

定义Controller调用

package com.github.market.admin.controller;

import com.github.collection.common.util.Response;
import com.github.market.admin.common.kafka.sender.EsKafkaMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * @author sfy
 * @date 2020/9/21 10:51 上午
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
    @Autowired
    private EsKafkaMessageSender sender

    /**
     * kafka发送消息
     * @author sfy
     * @date 2020/9/22 3:50 下午
     */
    @PostMapping("/testKafkaMessageSend")
    public void testKafkaMessageSend(String message) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }

    /**
     * kafka发送消息
     * @author sfy
     * @date 2020/9/22 3:50 下午
     */
    @PostMapping("/testKafkaMessageSend2")
    public void testKafkaMessageSend2(String message) {
        sender.sendToAlarmChannel(message);
    }

}

这里是分割线···

在这里插入图片描述
传送链

  • 4
    点赞
  • 32
    收藏
    觉得还不错? 一键收藏
  • 1
    评论
### 回答1: Spring Cloud可以很方便地与Kafka集成,实现消息的异步处理和分布式架构。具体步骤如下: 1. 引入Kafka依赖 在pom.xml文件中添加以下依赖: ``` <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.7.RELEASE</version> </dependency> ``` 2. 配置Kafka 在application.yml文件中添加以下配置: ``` spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer ``` 3. 创建Kafka生产者 使用Spring Kafka提供的KafkaTemplate类创建生产者,发送消息到Kafka: ``` @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("my-topic", message); } ``` 4. 创建Kafka消费者 使用Spring Kafka提供的@KafkaListener注解创建消费者,监听Kafka消息: ``` @KafkaListener(topics = "my-topic", groupId = "my-group") public void receiveMessage(String message) { System.out.println("Received message: " + message); } ``` 以上就是Spring Cloud整合Kafka的基本步骤。通过这种方式,我们可以很方便地实现消息的异步处理和分布式架构。 ### 回答2: SpringCloud是一个全面的解决方案,包括多个子项目,专门解决微服务开发的挑战。Kafka是一个分布式的消息队列系统,被广泛应用于数据处理、日志收集和实时数据分析等场景。在分布式应用中,SpringCloudKafka整合可以提供灵活、可靠和高效的数据通信机制,帮助开发人员构建高性能的微服务应用。 下面,我们将介绍一些在SpringCloud整合Kafka的最佳实践,以帮助开发人员轻松应对各种数据通信需求。 1. 集成Kafka客户端 要在SpringCloud中使用Kafka,首先需要将Kafka客户端集成到应用程序中。可以使用Kafka提供的各种Java客户端,例如KafkaConsumer和KafkaProducer。在应用程序的pom.xml文件中添加以下依赖项: ``` <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version> </dependency> ``` 2. 配置Kafka生产者和消费者 在使用Kafka之前,必须对Kafka进行正确的配置。为此,需要在SpringCloud应用程序的配置文件中添加以下属性: ``` spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=group1 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer ``` 在上述配置中,我们指定了Kafka生产者和消费者的序列化器和反序列化器类型。必须在编写生产者和消费者代码之前对此进行配置。 3. 消息生产者 创建一个Kafka消息生产者,需要实现KafkaProducer接口,并可以使用以下代码创建Producer实例: ``` @Bean public Producer<String, String> kafkaProducer(){ Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new KafkaProducer<>(configs); } ``` 其中,bootstrapServers是Kafka集群的初始连接地址,可以在SpringCloud配置文件中进行配置。 4. 消息消费者 创建一个Kafka消息消费者,需要实现KafkaConsumer接口,并可以使用以下代码创建Consumer实例: ``` @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return new DefaultKafkaConsumerFactory<>(props); } ``` 在上述代码中,我们创建了一个包含多个线程的消息监听容器,以同时处理Kafka主题的多个分区。需要确保在应用程序中为分区设置正确数量的并发线程,以最大程度地利用SpringCloudKafka的分布式处理能力。 5. 使用注解实现消息监听 通过配置@KafkaListener注释,可以使方法成为一个Kafka消息监听器,并且自动处理所有传入的消息,如下所示: ``` @KafkaListener(topics = "test-topic") public void receive(String message) { logger.info("Received message: {}", message); } ``` 在上述代码中,我们使用@KafkaListener注释,指定要监听的主题名称,并在接收到新消息时调用“接收”方法。 总结 SpringCloudKafka整合可以为分布式应用提供高效、可靠的数据通信机制。开发人员可以使用Kafka提供的强大消息队列功能,将消息传递到应用程序中,从而实现高性能、高可用性的微服务架构。尽管整合KafkaSpringCloud需要一些技巧和经验,但一旦掌握了这些技能,就可以将它们应用于各种分布式应用场景。 ### 回答3: Spring Cloud是一种基于Spring Framework的微服务框架,它是一个开放源代码的软件框架,用于开发和管理云应用程序。而Kafka是一个开源发布-订阅计算系统,具有高吞吐量、低延迟和高扩展性等优点。下面,我们将谈论如何使用Spring Cloud整合Kafka。 首先,我们需要在Spring Boot应用程序中添加Spring Kafka依赖项。在pom.xml文件的依赖项中,我们需要添加以下Maven坐标: ```xml <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.1</version> </dependency> ``` 在应用程序中使用Spring Kafka生产者,我们需要创建一个KafkaTemplate实例并使用kafkaTemplate.send()方法发送消息。下面是一个示例: ```java @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } ``` 在使用Spring Kafka消费者时,我们需要实现KafkaListener接口,并在方法上添加@KafkaListener注释。在注释中设置消费者监听的Topic名称。以下是一个示例: ```java @KafkaListener(topics = "sampleTopic") public void consumeMessage(String message) { System.out.println("Received message: " + message); } ``` 在Spring Cloud中,我们可以使用Spring Cloud StreamKafka与其他消息中间件集成。使用Spring Cloud Stream,我们可以将Kafka配置指定为应用程序参数,使用统一的API从Kafka读取和写入数据。以下是一个示例: ```yaml spring: cloud: stream: bindings: input: destination: sampleTopic output: destination: sampleTopic kafka: binder: brokers: localhost:9092 ``` 在这里,我们定义了输入(input)和输出(output)绑定,并将它们都指定为使用名为sampleTopic的Topic。然后,我们定义了Kafka服务器的位置,其中该应用程序访问Kafka服务器的IP地址和端口。 综上所述,使用Spring Cloud整合Kafka需要以下步骤:添加Spring Kafka依赖项、编写Kafka生产者和消费者代码、使用Spring Cloud StreamKafka集成到Spring Cloud应用程序中。Spring Cloud使我们能够轻松地使用Kafka打造高可伸缩、高性能的实时数据管道。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值