SpringCloud-Stream-Kafka

SpringCloud Stream Kafka

基础知识


消息服务器作用:

一对多通知

并行转串行,削峰,削流


JMS - Java Message Service

Java定义的消息模型或者规范,有一套标准的Java API访问消息服务器,类似于jdbc访问数据库

消息服务器产品:

ActiveMQ/RabbitMQ/RocketMQ/Kafka

ActiveMQ/RabbitMQ/RocketMQ实现了JMS

Kafka没有实现JMS


SpringCloud Stream:SpringCloud的消息服务接口,可以使用它访问:

RabbitMQ/Kafka


Kafka需要Zookeeper作为数据存储。


安装Kafka

官网下载:kafka.apache.org/downlo

安装和使用:kafka.apache.org/quicks

课程使用:kafka_2.12-2.2.0.zip

链接:pan.baidu.com/s/1Yy0Big

提取码:q4o3


解压



启动Kafka

点击start.bat

会启动2个cmd窗口:



使用,请参考zip包中的windows安装指南.txt


创建主题

启动一个cmd窗口,切换目录到kafka解压目录:



执行命令:

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myTopic



执行如下命令查询新建的主题:

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092



发送消息

输入命令:bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic

输入消息内容:abc




接收消息

另外启动一个新的cmd窗口,切换目录到kafka解压目录:



执行命令:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myTopic --from-beginning



启动之后,可以看到上面生产者发送的abc消息。

生产者继续发送abcd消息,消费者可以立即收到。


SpringCloud-Stream消息模型

Source:发射器

Channel:频道

Binder:绑定器

Sink:接收器




消费者分组:

假设下图中服务A的3个实例为一个分组,服务B的2个实例为一个分组,则消息过来之后,只会有一个服务A的实例和服务B的实例处理该消息,其他实例不会收到消息。



使用默认频道进行消息发送和接收

创建项目

父项目springcloud-stream

\springcloud-stream\pom.xml



创建子项目

\springcloud-stream\springcloud-stream-producer

\springcloud-stream\springcloud-stream-consumer

配置:

/springcloud-stream-producer/pom.xml



/springcloud-stream-consumer/pom.xml



发送消息

创建消息发送者类

/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/StreamProducer.java



配置频道及kafka

/springcloud-stream-producer/src/main/resources/application.yml



接收消息

创建消息接收者类

/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/StreamConsumer.java



配置频道及kafka

测试

参照第2节,启动kafka服务器

启动生产者

启动消费者

访问如下路径,发送消息qfedu2:



消息接收:



自定义频道发送和接收消息

消息生产者

自定义消息输出频道

/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/QfOutputChannel.java



配置spring

映射输出频道到kafka主题

/springcloud-stream-producer/src/main/resources/application.yml



启动类

绑定发射器,并增加发送消息代码

/springcloud-stream-producer/src/main/java/com/qfedu/demo/springcloud/stream/StreamProducer.java



消息消费者

自定义消息输入频道

/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/QfInputChannel.java



配置spring

/springcloud-stream-consumer/src/main/resources/application.yml



启动类

/springcloud-stream-consumer/src/main/java/com/qfedu/demo/springcloud/stream/StreamConsumer.java



测试




发布于 2019-05-22 17:09