0
点赞
收藏
分享

微信扫一扫

flutter mqtt的使用看这里

尤克乔乔 2021-09-21 阅读 196

mqtt网络协议,相信跟物联网相关的公司都会遇到,在Android,iOS原生开发是可以很好的实现,相关的资料也是很多!但是在flutter里面还算比较尝鲜的一个领域吧!

幸亏flutter里面 已经有一个还不错的第三库mqtt_client,我在项目中用的版本是比较低一点的版本,如下图

此版本在项目中,已经运行了有一段时间了,相关的app,也已经上线了,目前没有发现有什么问题.但是,秉承我一贯的原则,干货,干货,干货;当然是要为大家带来最新版本的mqtt的示例啦~~此时应该有掌声

这篇文章介绍 怎么封装一个属于自己的工具,解决在使用过程 中文乱码等问题。

首先我希望用单例模式,这样整个项目随处都可以调用,使用起来比较方便

然后希望支持加密与不加密的情况!

最后希望支持中文

话不多说,新建一个工程flutter_app_mqtt,在pubspec.yaml文件中,添加依赖库mqtt_client,然后pub get一下,下载库

mqtt_client: ^7.3.0

准备工作好了,我们准备封装工具类MqttTool


核心代码

MqttTool工具类代码如下:

import 'dart:async';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:typed_data/typed_buffers.dart';

typedef ConnectedCallback = void Function();

class MqttTool {
  MqttQos qos = MqttQos.atLeastOnce;
  MqttServerClient mqttClient;
  static MqttTool _instance;
  static MqttTool getInstance() {
    if (_instance == null) {
      _instance = MqttTool();
    }
    return _instance;
  }

  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false}) {
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);

    mqttClient.onConnected = onConnected;

    mqttClient.onSubscribed = _onSubscribed;

    mqttClient.onSubscribeFail = _onSubscribeFail;

    mqttClient.onUnsubscribed = _onUnSubscribed;

    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("_正在连接中...");
    return mqttClient.connect(username, password);
  }

  disconnect() {
    mqttClient.disconnect();
    _log("_disconnect");
  }

  int publishMessage(String pTopic, String msg) {

    _log("_发送数据-topic:$pTopic,playLoad:$msg");
    Uint8Buffer uint8buffer = Uint8Buffer();
    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(codeUnits);

    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  int publishRawMessage(String pTopic, List<int> list) {
    _log("_发送数据-topic:$pTopic,playLoad:$list");
    Uint8Buffer uint8buffer = Uint8Buffer();
//    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(list);
    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  Subscription subscribeMessage(String subtopic) {
    return mqttClient.subscribe(subtopic, qos);
  }

  unsubscribeMessage(String unSubtopic) {
    mqttClient.unsubscribe(unSubtopic);
  }

  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient.connectionStatus;
  }

  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_监听成功!");
    return mqttClient.updates;
  }

  onConnected() {
//    mqttClient.onConnected = callback;
    _log("_onConnected");
  }

  onDisConnected(ConnectedCallback callback) {
    mqttClient.onDisconnected = callback;
  }

  _onDisconnected() {
    _log("_onDisconnected");
  }

  _onSubscribed(String topic) {
    _log("_订阅主题成功---topic:$topic");
  }

  _onUnSubscribed(String topic) {
    _log("_取消订阅主题成功---topic:$topic");
  }

  _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  _log(String msg) {
    print("MQTT-->$msg");
  }
}

工具类封装完成了,现在就需要去项目中应用了,因此,我们改造一下HomePage的布局,来完成测试验证工作

Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            Container(
              child: RaisedButton(
                onPressed: _connect,
                child: Text(
                  "connect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _subscribeTopic,
                child: Text(
                  "subscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _unSubscribeTopic,
                child: Text(
                  "unSubscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _publishTopic,
                child: Text(
                  "publish topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _startListen,
                child: Text(
                  "start listen",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _disconnect,
                child: Text(
                  "disconnect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            )
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ), // This trailing comma makes auto-formatting nicer for build methods.
    );
  }

对应的UI图是这样的


RaiseButton对应的点击事件函数如下

//  建立连接
  _connect() async {
    String server = "your server name";
    int port = 1883;
    String clientId = "86-1885999fuehxz5f3ced1e";
    String userName = "86-18859995315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt连接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt连接失败 --密码错误!!!");
      } else {
        print("有事做了~ ====mqtt连接失败!!!");
      }
    });
  }

//  订阅主题
  _subscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";

    String topic = "device/F4CFA26F1E43/#";

    String topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }
  
//  取消订阅
  _unSubscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";
    String topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }
  
//  发布消息
  _publishTopic() {
    String topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    String str1 = "2950";

    String topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    String str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }
  
//  监听消息的具体实现
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    final MqttPublishMessage recMess = data[0].payload;
    final String topic = data[0].topic;
    final String pt = Utf8Decoder().convert(recMess.payload.message);
    String desString = "topic is <$topic>, payload is <-- $pt -->";
    print("string =$desString");
    Map p = Map();
    p["topic"] = topic;
    p["type"] = "string";
    p["payload"] = pt;
    ListEventBus.getDefault().post(p);
  }

//  开启监听消息
  _startListen() {
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }
  
//  断开连接
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }

点击RaiseButton顺序 connect--->subscribe topic---->start listen---->publish topic----> disconnect

应的控制台log 如下:


至此,Mqtt协议的封装,加应用完成得差不多了.

还有二点需要补充一下,

一个是加密问题,
另一个是 整个工程在监听mqtt回来的数据该如何处理

加密问题,其实MqttTool工具类代码,已经处理好了,相关代码如下

    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }

所以用加密端口时,_connect()方法里面要多传一个参数 isSsl:true

//  建立连接
  _connect() async {
    String server = "connect.owon.com";
    int port = 8883;
    String clientId = "86-1885999fuehxz5f3ced1e";
    String userName = "86-188599895315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password,isSsl: true)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt连接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt连接失败 --密码错误!!!");
      } else {
        print("有事做了~ ====mqtt连接失败!!!");
      }
    });
  }

另一个是 整个工程在监听mqtt回来的数据该如何处理
我这边是把mqtt工具类接收到数据用evenbus发送出去了, 其他需要监听数据的page,就去用evenbus去监听接收的数据.

ListEventBus.getDefault().post(p);

evenbus类的代码如下

import 'dart:async';

class ListEventBus {
  static ListEventBus _instance;
  StreamController _streamController;
  factory ListEventBus.getDefault() {
    if (_instance == null) {
      _instance = ListEventBus._init();
    }
    return _instance;
  }

  ListEventBus._init() {
    _streamController = StreamController.broadcast();
  }

  StreamSubscription<T> register<T>(void onData(T event)) {
    ///需要返回订阅者,所以不能使用下面这种形式
//   return _streamController.stream.listen((event) {
//      if (event is T) {
//        onData(event);
//      }
//    });
    ///没有指定类型,全类型注册
    if (T == dynamic) {
      return _streamController.stream.listen(onData);
    } else {
      ///筛选出 类型为 T 的数据,获得只包含T的Stream
      Stream<T> stream =
          _streamController.stream.where((type) => type is T).cast<T>();
      return stream.listen(onData);
    }
  }

  void post(event) {
    _streamController.add(event);
  }

  void unregister() {
    _streamController.close();
  }

  void pause() {
    _streamController.onPause();
  }

  void resume() {
    _streamController.onResume();
  }
}

在需要监听数据的page里面添加以下代码就可以啦

  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;
@override
  void initState() {
    // TODO: implement initState
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
          String topic = msg["topic"];
          Map<String, dynamic> payload = msg["payload"];

        });
  }

main.dart文件里面的完整代码

import 'dart:async';
import 'dart:convert';
import 'live_even_bus.dart';
import 'package:flutter/material.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'mqtt_tool.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        primarySwatch: Colors.blue,
        visualDensity: VisualDensity.adaptivePlatformDensity,
      ),
      home: MyHomePage(title: 'Flutter Demo Home Page'),
    );
  }
}

class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);
  final String title;
  @override
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>
      _listenSubscription;

  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;

  void _incrementCounter() {
    setState(() {
      _counter++;
    });
  }

@override
  void initState() {
    // TODO: implement initState
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
          String topic = msg["topic"];
          Map<String, dynamic> payload = msg["payload"];
          print("监听的 topc= $topic, payload= $payload");
        });
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            Container(
              child: RaisedButton(
                onPressed: _connect,
                child: Text(
                  "connect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _subscribeTopic,
                child: Text(
                  "subscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _unSubscribeTopic,
                child: Text(
                  "unSubscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _publishTopic,
                child: Text(
                  "publish topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _startListen,
                child: Text(
                  "start listen",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _disconnect,
                child: Text(
                  "disconnect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            )
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ), // This trailing comma makes auto-formatting nicer for build methods.
    );
  }

//  建立连接
  _connect() async {
    String server = "your server name";
    int port = 8883;
    String clientId = "86-1885999713jlb5f3d01f3";
    String userName = "86-188599895315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password, isSsl: true)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt连接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt连接失败 --密码错误!!!");
      } else {
        print("有事做了~ ====mqtt连接失败!!!");
      }
    });
  }

//  订阅主题
  _subscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";

    String topic = "device/F4CFA26F1E43/#";

    String topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }

//  取消订阅
  _unSubscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";
    String topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }

//  发布消息
  _publishTopic() {
    String topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    String str1 = "2950";

    String topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    String str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }

//  监听消息的具体实现
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    final MqttPublishMessage recMess = data[0].payload;
    final String topic = data[0].topic;
    final String pt = Utf8Decoder().convert(recMess.payload.message);
    String desString = "topic is <$topic>, payload is <-- $pt -->";
    print("string =$desString");
    Map p = Map();
    p["topic"] = topic;
    p["type"] = "string";
    p["payload"] = pt;
    ListEventBus.getDefault().post(p);
  }

//  开启监听消息
  _startListen() {
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }

//  断开连接
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }


}

结尾

mqtt这个库也是在不断的更新,如果从低版本升到高版本,可能会遇到不兼容的问题,希望到家,冷静沉着应对!祝君好运~ 最后,小伙伴们觉得有点帮助,请帮忙点个赞吧。如果有什么问题需要探讨的,也欢迎留言~

举报

相关推荐

0 条评论