Code Monkey home page Code Monkey logo

mqtt-demo's Introduction

mqtt-demo

function

mqtt-demo基于Spring Integration和SpringBoot框架实现了java平台上的消息订阅和转发

Introduce

Spring Integration是一种轻量级消息传递模块,并支持通过声明式适配器与外部系统集成。 简单来说,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供了很多企业级的中间件的集成。 支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

MQTT通信方式默认是发布/订阅模式的。 通信系统中有发布者和订阅者。发布者发布消息而订阅者接收消息。我们把发布者和订阅者统称为客户端。客户端可以同时是发布者和订阅者。 在系统中有另外一个角色,它接收发布者的消息并且将消息派发给订阅者。我们一般称这个角色为消息Broker。 在MQTT中默认是广播的,也就是说订阅了相同topic的订阅者都能收到发布者发送的消息。

Integration

本项目使用的MQTT协议功能基于Spring Integration,使用前先在pom文件中引入maven依赖

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>

Running

  1. 在配置文件application.yml中修改配置:
mqtt:
  protocol: tcp
  host: mqtt broker ip # 192.168.1.1
  port: 1883
  username: JavaServer
  password: 123456
  client-id: 123456
  # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
  inbound-topic: sys/+/available

主题

MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:

chat/room/1
sensor/10/temperature
sensor/+/temperature

主题 (Topic) 通过'/'分割层级,支持'+', '#'通配符:

'+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
'#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d
订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

QoS

为了满足不同的场景,MQTT 支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

0:At most once。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
1:At least once。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
2:Exactly onces。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别 2 是最合适的。

订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS

  1. 搭建或使用一个现成的mqtt服务器 Docker - 在容器中搭建运行EMQ服务器(MQTT服务器)

Configuration

  1. Inbound(消息驱动)通道适配器: 入站通道适配器由MqttPahoMessageDrivenChannelAdapter实现,通常会在里面配置:
  • 客户端ID
  • MQTT Broker URL
  • 待订阅的主题列表
  • 待订阅的主题QoS值列表
public MessageProducerSupport mqttInbound() {
     MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
             mqttConfiguration.getClientId() + "_consumer_" + Instant.now().toEpochMilli(),
             mqttClientFactory, mqttConfiguration.getInboundTopic().split(","));
     DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
     // use byte types uniformly
     converter.setPayloadAsBytes(true);
     adapter.setConverter(converter);
     adapter.setQos(1);
     adapter.setOutputChannel(inboundChannel);
     return adapter;
 }

其中mqttClientFactory/mqttConfiguration配置了MQTT Broker URL、账户密码、主题列表等信息

Essential Component

  1. Message

Message包含Header和Payload两部分

  1. Message Channel

消息通过消息网关发送出去,由MessageChannel的实例来实现DirectChannel处理发送的细节。MqttMessageChannel.java中实现了定义:

@Configuration
public class MqttMessageChannel {
    @Autowired
    private Executor threadPool;

    @Bean(name = ChannelName.INBOUND)
    public MessageChannel inboundChannel() {
        return new ExecutorChannel(threadPool);
    }

    @Bean(name = ChannelName.OUTBOUND)
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }
}

MessageChannel用于解耦生产者和消费者,实现消息发送。

  1. Message Router
@Router(inputChannel = ChannelName.INBOUND_STATUS_ROUTER)
public String resolveStatusRouter(CommonTopicReceiver<SampleData> receiver) {
     // 根据状态内容判断是上线还是离线.
     if(receiver.getData().getStatus().equals(1))
         return ChannelName.INBOUND_STATUS_ONLINE;
     else if(receiver.getData().getStatus().equals(0))
         return ChannelName.INBOUND_STATUS_OFFLINE;
     else
         return ChannelName.DEFAULT;
 }

MessageRouter用来控制消息转发的Channel

  1. Service Activator
@ServiceActivator(inputChannel = ChannelName.DEFAULT)
public MessageHandler defaultInboundHandler() {
     return message -> {
         log.info("默认通道不处理消息." +
                 "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                 "\nPayload: " + message.getPayload());
     };
}

Service Activator用来绑定MessageHandler和用于消费消息的MessageChannel
5. Channel Adapter

ChannelAdapter用来连接MessageChannel和具体的消息端口,例如通信的topic

Interactive logic

  1. 对于发布者: 消息通过消息网关发送出去,由MessageChannel的实例DirectChannel处理发送的细节。 DirectChannel收到消息后,内部通过MessageHandler的实例MqttPahoMessageHandler发送到指定的Topic

  2. 对于订阅者: 通过注入MessageProducerSupport的实例MqttPahoMessageDrivenChannelAdapter,实现订阅Topic和绑定消息消费的MessageChannel。 同样由MessageChannel的实例DirectChannel处理消费细节。Channel消息后会发送给我们自定义的MqttInboundMessageHandler实例进行消费。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.