热门IT资讯网

如何用MQTT协议实现消息的订阅接收?

发表于:2024-11-24 作者:热门IT资讯网编辑
编辑最后更新 2024年11月24日,MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。操作步骤:引入相关的依赖 org.springf

MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:

  1. 引入相关的依赖
    org.springframework.boot    spring-boot-starter-integration    org.springframework.integration    spring-integration-mqtt    org.projectlombok    lombok    true
  1. 在application.yml配置MQTT服务器信息
server:  port: 8090mqtt:  host: tcp://127.0.0.1:1883  clientinid: mqttinId  clientoutid: mqttoutid  topic: virus  qoslevel: 1  #MQTT 认证  username: xxx  password: xxx  timeout: 10000  #20s  keepalive: 20
  1. 配置MQTT消息推送配置
package com.favccxx.mqtt.config;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;@Slf4j@Configuration@IntegrationComponentScanpublic class MQTTReceiveConfig {    @Value("${mqtt.username}")    private String username;    @Value("${mqtt.password}")    private String password;    @Value("${mqtt.host}")    private String hostUrl;    @Value("${mqtt.clientinid}")    private String clientId;    @Value("${mqtt.topic}")    private String defaultTopic;    @Value("${mqtt.timeout}")    private int completionTimeout ;   //连接超时    @Bean    public MqttConnectOptions getReceiverMqttConnectOptions(){        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();        mqttConnectOptions.setCleanSession(true);        mqttConnectOptions.setConnectionTimeout(10);        mqttConnectOptions.setKeepAliveInterval(90);        mqttConnectOptions.setAutomaticReconnect(true);        mqttConnectOptions.setUserName(username);        mqttConnectOptions.setPassword(password.toCharArray());        mqttConnectOptions.setServerURIs(new String[]{hostUrl});        mqttConnectOptions.setKeepAliveInterval(2);        return mqttConnectOptions;    }    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        factory.setConnectionOptions(getReceiverMqttConnectOptions());        return factory;    }    //接收通道    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    //配置client,监听的topic    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),                        defaultTopic);        adapter.setCompletionTimeout(completionTimeout);        adapter.setConverter(new DefaultPahoMessageConverter());        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    //通过通道获取数据    @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message message) throws MessagingException {                log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());            }        };    }}
0