spring boot2 kafka
一.软件版本
1.linux:centos6
2.zookeeper:zookeeper-3.4.14
3.kafka:kafka_2.12-2.2.0
4.jdk:1.8
5.IntelliJ IDEA
二.环境准备
1.关闭防火墙:service iptables stop
2.zookeeper:复制conf目录下的zoo_sample.cfg配置文件并改名为zoo.cfg
3.kafka:修改配置文件config/server.properties中的以下配置项
①broker.id=123(broker.id每个kafka的都不一样,必须唯一)
②advertised.listeners=PLAINTEXT://192.168.56.101:9092(对应自己的linux的ip地址)
③zookeeper.connect=127.0.0.1:2181(本机默认端口是2181,有需要到zookeeper的zoo.cfg文件中修改)
三.软件setup&start
1.zookeeper:zookeeper-3.4.14/bin目录下执行
①./zkServer.sh start ../conf/zoo.cfg
2.kafka:kafka_2.12-2.2.0/bin目录下执行
①./kafka-server-start.sh ../config/server.properties
四.创建topic&发送消息&消费消息
1.创建topic “test”,kafka_2.12-2.2.0/bin目录下执行
①./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
②验证是否创建成功:./kafka-topics.sh --list --bootstrap-server localhost:9092
③开启消息生产者的console,kafka_2.12-2.2.0/bin目录下执行,并发送几条信息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
④开启消息消费者的console,kafka_2.12-2.2.0/bin目录下执行
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
以上步骤没出现问题表示kafka服务器端已经搭建完毕。
五.Spring boot2 kafka通信
1.创建topic:TopicConfiguration
package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the Kafka admin client and the topic used by this example.
 */
@Configuration
public class TopicConfiguration {

    /** Address of the Kafka broker; must match advertised.listeners in server.properties. */
    private static final String BOOTSTRAP_SERVERS = "192.168.56.101:9092";

    /**
     * Builds the {@link KafkaAdmin} that connects to the broker.
     *
     * @return an admin configured with the broker's bootstrap address
     */
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new KafkaAdmin(configs);
    }

    /**
     * Declares the "test" topic with one partition.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic1() {
        // Replication factor must not exceed the number of brokers. This setup runs a
        // single broker, so the factor is 1 (the original value 2 cannot be satisfied).
        return new NewTopic("test", 1, (short) 1);
    }
}
2.spring boot2中建立kafka的消息生产者:KafkaProducerConfiguration
package com.aaron.kafka.springboot.configuration;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side configuration: exposes the {@link KafkaTemplate} used to send messages.
 *
 * <p>The snippet originally shown at this step was a copy of the consumer configuration;
 * this is the producer configuration the step describes.
 */
@Configuration
public class KafkaProducerConfiguration {

    /**
     * Creates the factory that builds Kafka producers from {@link #producerProps()}.
     *
     * @return a producer factory for String keys and String values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps());
    }

    /**
     * Creates the template used by application code to send messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Builds the producer properties.
     *
     * @return broker address plus String serializers for key and value
     */
    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
3.建立测试代码:KafkaTest
package com.aaron.kafka.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * Sends one message to the "test" topic through the application's {@link KafkaTemplate}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends key "hello" / value "world" and waits for the send to complete.
     *
     * @throws Exception if the send fails or does not complete within the timeout
     */
    @Test
    public void sendMsg() throws Exception {
        // send() is asynchronous; waiting on the returned future makes the test fail
        // when delivery fails instead of passing without checking the outcome.
        kafkaTemplate.send("test", "hello", "world").get(10, TimeUnit.SECONDS);
    }
}
运行测试结果如下:
4.Spring boot2中建立消费者和消息监听器:kafkaConsumerConfiguration和MessageListenerImpl
package com.aaron.kafka.springboot.configuration;

import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer-side configuration: a listener container subscribed to the "test" topic
 * that hands each record to {@link MessageListenerImpl}.
 */
@Configuration
public class kafkaConsumerConfiguration {

    /**
     * Creates the listener that processes received records.
     *
     * @return the message listener bean
     */
    @Bean
    public MessageListenerImpl messageListener() {
        return new MessageListenerImpl();
    }

    /**
     * Creates the listener container for the "test" topic.
     *
     * @return a container wired with {@link #messageListener()} and the consumer properties
     */
    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties("test");
        containerProps.setMessageListener(messageListener());
        // Key and value are both read with StringDeserializer, so the type parameters
        // are <String, String> (the original declared the key as Integer).
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps());
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    }

    /**
     * Builds the consumer properties.
     *
     * @return broker address, group id, auto-commit settings and String deserializers
     */
    private Map<String, Object> consumerProps() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "123");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return consumerProps;
    }
}
package com.aaron.kafka.springboot.listener; import org.springframework.kafka.listener.MessageListener; public class MessageListenerImpl implements MessageListener { @Override public void onMessage(Object o) { System.out.println(o.toString()); } }
关闭服务器上的消费者然后启动springBoot
在服务器上的producer中输入文本 Hello World
Idea 中的console显示如下:
