如何用MQTT协议实现消息的订阅接收?
发表于:2024-11-24 作者:热门IT资讯网编辑
编辑最后更新 2024年11月24日,MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。操作步骤:引入相关的依赖 org.springf
MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:
- 引入相关的依赖
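<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>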
- 在application.yml配置MQTT服务器信息
server:
  port: 8090
mqtt:
  host: tcp://127.0.0.1:1883
  clientinid: mqttinId
  clientoutid: mqttoutid
  topic: virus
  qoslevel: 1
  #MQTT 认证
  username: xxx
  password: xxx
  timeout: 10000
  #20s
  keepalive: 20
- 配置MQTT消息接收(订阅)
package com.favccxx.mqtt.config;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;@Slf4j@Configuration@IntegrationComponentScanpublic class MQTTReceiveConfig { @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.host}") private String hostUrl; @Value("${mqtt.clientinid}") private String clientId; @Value("${mqtt.topic}") private String defaultTopic; @Value("${mqtt.timeout}") private int completionTimeout ; //连接超时 @Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return 
mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,监听的topic @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(), defaultTopic); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload()); } }; }}