监控易监测对象及指标之:全面监控MongoDB 4数据库

码农K

关注

阅读 38

2024-06-20

前言

村里的老人常说:真男人就该懂得遵守“三不原则”——不主动、不拒绝、不负责

一个复杂的软件系统,其中必然会存在各种各样的“对象”,如果在设计之初没有注意控制好耦合度,导致各个对象甚至是函数之间高度耦合,那对于后期开发和维护将是一个灾难!

在日常开发中,大家不难发现,“面向事件编程”是解耦合的利器,其对应的设计模式便是大家常常会听到的“观察者模式”,而核心思想,就是尽可能令大部分对象都遵守“三不原则”:

  1. 合理设计事件处理器,等待事件的发生,而不要主动轮询某个临界资源;
  2. 设置一个安全高效的事件分发中心,无论多大并发都能保证不拒绝服务
  3. 事件生产者不必关心事件将由谁来处理、如何处理,亦无需对结果负责

接下来我将为大家展示如何设计一个优雅的本地消息分发处理系统。

接口设计

首先我们定义一个通用的 ```Notification``` (也可以叫“事件”,或者“消息”),它包含3个基本信息:

  1. 事件名;
  2. 发送者;
  3. 当前信息(上下文)
///  Notification object with name, sender and extra info
class Notification {
  Notification(this.name, this.sender, this.userInfo);

  final String name;
  final dynamic sender;
  final Map? userInfo;

}

然后我们定义“观察者”接口,以便让任务中心正确分发消息:

///  Notification observer
abstract class Observer {

  Future<void> onReceiveNotification(Notification notification);

}

由于消费者处理该事件时有可能会需要花费较长时间,所以这里我们设计为异步接口。

最后我们再实现一个消息分发中心:

///  Singleton
class NotificationCenter {
  factory NotificationCenter() => _instance;
  static final NotificationCenter _instance = NotificationCenter._internal();
  NotificationCenter._internal();

  BaseCenter center = BaseCenter();

  ///  Add observer with notification name
  ///
  /// @param observer - who will receive notification
  /// @param name     - notification name
  void addObserver(Observer observer, String name) {
    center.addObserver(observer, name);
  }

  ///  Remove observer for notification name
  ///
  /// @param observer - who will receive notification
  /// @param name     - notification name
  void removeObserver(Observer observer, [String? name]) {
    center.removeObserver(observer, name);
  }

  ///  Post a notification with extra info
  ///
  /// @param name   - notification name
  /// @param sender - who post this notification
  /// @param info   - extra info
  Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
    await center.postNotification(name, sender, info);
  }

  ///  Post a notification
  ///
  /// @param notification - notification object
  Future<void> post(Notification notification) async {
    await center.post(notification);
  }

}

这个事件分发中心主要实现3个功能:

  1. 将一个观察者以及其关心的事件名称添加到内部等候队列;
  2. 将一个观察者移出等候队列;
  3. 提交一个事件(中心内部异步执行分发)。

并且,因为一个应用系统中通常应该只有一个事件分发中心,所以这里的 NotificationCenter 被设计成为单例模式。

这样一个通用的本地消息分发系统就设计完成了。

应用示例

接下来将为你展示这个系统如何使用。

首先我们先定义一个观察者,并且将其添加到事件分发中心:

第一步,实现 Observer 接口:

import 'package:lnc/notification.dart' as lnc;


class _ContactListState extends State<ContactListPage> implements lnc.Observer {

  // ...

  @override
  Future<void> onReceiveNotification(lnc.Notification notification) async {
    // 获取事件名称与相关信息
    String name = notification.name;
    Map? userInfo = notification.userInfo;
    // 根据事件名称处理信息
    if (name == 'ContactsUpdated') {
      ID? contact = userInfo?['contact'];
      Log.info('contact updated: $contact');
      // ...
    } else if (name == 'DocumentUpdated') {
      ID? did = userInfo?['ID'];
      Log.info('document updated: $did');
      // ...
    }
  }

}

第二步,在适当的时机将该观察者添加进事件分发中心 or 从中心删除:

class _ContactListState extends State<ContactListPage> implements lnc.Observer {
  _ContactListState() {
    // ...
    var nc = lnc.NotificationCenter();
    nc.addObserver(this, 'ContactsUpdated');
    nc.addObserver(this, 'DocumentUpdated');
  }

  @override
  void dispose() {
    var nc = lnc.NotificationCenter();
    nc.removeObserver(this, 'DocumentUpdated');
    nc.removeObserver(this, 'ContactsUpdated');
    super.dispose();
  }

  // ...

}

第三步,在事件发生点提交事件给分发中心:

    // post notification
    var nc = NotificationCenter();
    nc.postNotification('DocumentUpdated', this, {
      'ID': identifier,
      'document': doc,
    });

至此,一个观察者模式的本地事件系统的应用就介绍完了。

下面我们再来深入一下内部,看看这个事件分发中心是如何实现的?

进阶

注意前面提到的事件分发中心(单例) NotificationCenter,里面有一个代理对象 center:

///  Singleton
class NotificationCenter {
  factory NotificationCenter() => _instance;
  static final NotificationCenter _instance = NotificationCenter._internal();
  NotificationCenter._internal();

  BaseCenter center = BaseCenter();  // 代理对象,内部实现(可替换)

  // ...

}

这里采用代理模式,是为了方便用户根据项目的特殊需要,定义具体的分发逻辑实现以替换之。

下面介绍一下这个默认的分发中心 BaseCenter:

class BaseCenter {

  // name => WeakSet<Observer>
  final Map<String, Set<Observer>> _observers = {};

  ///  Add observer with notification name
  ///
  /// @param observer - listener
  /// @param name     - notification name
  void addObserver(Observer observer, String name) {
    Set<Observer>? listeners = _observers[name];
    if (listeners == null) {
      listeners = WeakSet();  // 弱引用集合
      listeners.add(observer);
      _observers[name] = listeners;
    } else {
      listeners.add(observer);
    }
  }

  ///  Remove observer from notification center
  ///
  /// @param observer - listener
  /// @param name     - notification name
  void removeObserver(Observer observer, [String? name]) {
    if (name == null) {
      // 1. remove from all observer set
      _observers.forEach((key, listeners) {
        listeners.remove(observer);
      });
      // 2. remove empty set
      _observers.removeWhere((key, listeners) => listeners.isEmpty);
    } else {
      // 3. get listeners by name
      Set<Observer>? listeners = _observers[name];
      if (listeners != null && listeners.remove(observer)) {
        // observer removed
        if (listeners.isEmpty) {
          _observers.remove(name);
        }
      }
    }
  }

  ///  Post notification with name
  ///
  /// @param name     - notification name
  /// @param sender   - notification sender
  /// @param info     - extra info
  Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
    return await post(Notification(name, sender, info));
  }

  Future<void> post(Notification notification) async {
    Set<Observer>? listeners = _observers[notification.name]?.toSet();
    if (listeners == null) {
      return;
    }
    List<Future> tasks = [];
    for (Observer item in listeners) {
      tasks.add(item.onReceiveNotification(notification));
    }
    // wait all tasks finished
    await Future.wait(tasks);
  }

}
  1. 首先,它有3个接口函数和 NotificationCenter 一一对应:
  2. 其次,它的内部有一个 key 为字符串的映射对象 _observers,其中每一个事件名称(字符串)映射向一个弱引用的集合 WeakSet,集合中的元素则是关注该事件名称的所有观察者;
  3. 当生产者提交事件时,该中心会根据该事件名称从 _observers 中获取对应的观察者集合,并调用其事件接口函数。

这里有两点值得注意:

  1. 由于低耦合的设计,各个观察者(事件消费者)分别独立处理事件结果,相互之间并无关联,并且也没有前后时序关系的要求,所以这里的 post 函数会采用异步并发的方式来同时调用这些观察者接口;
  2. 一般而言,观察者的添加与移除是一一对应的,但为了防止异常情况发生,这里的观察者集合仍然采用弱引用的集合,以便某些观察者非正常退出时,即使没有显式调用 removeObserver() 函数,也不会造成泄漏。

(关于弱引用的实现我们留到以后再来讲解)

代码引用

由于我已提交了一个完整的模块代码到 pub.dev,所以在实际应用中,你只需要在项目工程文件 ```pubspec.yaml``` 中添加

然后在需要使用的 dart 文件头引入即可:

import 'package:lnc/notification.dart' as lnc;

全部源码

/* license: https://mit-license.org
 *
 *  LNC : Local Notification Center
 *
 *                               Written in 2023 by Moky <albert.moky@gmail.com>
 *
 * =============================================================================
 * The MIT License (MIT)
 *
 * Copyright (c) 2023 Albert Moky
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 * =============================================================================
 */
import 'package:object_key/object_key.dart';
import 'package:lnc/log.dart';


///  Notification observer
abstract class Observer {

  Future<void> onReceiveNotification(Notification notification);
}

///  Notification object with name, sender and extra info
class Notification {
  Notification(this.name, this.sender, this.userInfo);

  final String name;
  final dynamic sender;
  final Map? userInfo;

  @override
  String toString() {
    Type clazz = runtimeType;
    return '<$clazz name="$name">\n\t<sender>$sender</sender>\n'
        '\t<info>$userInfo</info>\n</$clazz>';
  }

}

///  Notification center
class NotificationCenter {
  factory NotificationCenter() => _instance;
  static final NotificationCenter _instance = NotificationCenter._internal();
  NotificationCenter._internal();

  BaseCenter center = BaseCenter();

  ///  Add observer with notification name
  ///
  /// @param observer - who will receive notification
  /// @param name     - notification name
  void addObserver(Observer observer, String name) {
    center.addObserver(observer, name);
  }

  ///  Remove observer for notification name
  ///
  /// @param observer - who will receive notification
  /// @param name     - notification name
  void removeObserver(Observer observer, [String? name]) {
    center.removeObserver(observer, name);
  }

  ///  Post a notification with extra info
  ///
  /// @param name   - notification name
  /// @param sender - who post this notification
  /// @param info   - extra info
  Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
    await center.postNotification(name, sender, info);
  }

  ///  Post a notification
  ///
  /// @param notification - notification object
  Future<void> post(Notification notification) async {
    await center.post(notification);
  }

}

class BaseCenter with Logging {

  // name => WeakSet<Observer>
  final Map<String, Set<Observer>> _observers = {};

  ///  Add observer with notification name
  ///
  /// @param observer - listener
  /// @param name     - notification name
  void addObserver(Observer observer, String name) {
    Set<Observer>? listeners = _observers[name];
    if (listeners == null) {
      listeners = WeakSet();
      listeners.add(observer);
      _observers[name] = listeners;
    } else {
      listeners.add(observer);
    }
  }

  ///  Remove observer from notification center
  ///
  /// @param observer - listener
  /// @param name     - notification name
  void removeObserver(Observer observer, [String? name]) {
    if (name == null) {
      // 1. remove from all observer set
      _observers.forEach((key, listeners) {
        listeners.remove(observer);
      });
      // 2. remove empty set
      _observers.removeWhere((key, listeners) => listeners.isEmpty);
    } else {
      // 3. get listeners by name
      Set<Observer>? listeners = _observers[name];
      if (listeners != null && listeners.remove(observer)) {
        // observer removed
        if (listeners.isEmpty) {
          _observers.remove(name);
        }
      }
    }
  }

  ///  Post notification with name
  ///
  /// @param name     - notification name
  /// @param sender   - notification sender
  /// @param info     - extra info
  Future<void> postNotification(String name, dynamic sender, [Map? info]) async {
    return await post(Notification(name, sender, info));
  }

  Future<void> post(Notification notification) async {
    Set<Observer>? listeners = _observers[notification.name]?.toSet();
    if (listeners == null) {
      logDebug('no listeners for notification: ${notification.name}');
      return;
    }
    List<Future> tasks = [];
    for (Observer item in listeners) {
      try {
        tasks.add(item.onReceiveNotification(notification).onError((error, st) =>
            Log.error('observer error: $error, $st, $notification')
        ));
      } catch (ex, stackTrace) {
        logError('observer error: $ex, $stackTrace, $notification');
      }
    }
    // wait all tasks finished
    await Future.wait(tasks);
  }

}

GitHub 地址:

https://github.com/dimchat/sdk-dart/blob/main/lnc/lib/src/notification.dart

结语

这里展示了一个基由观察者模式设计的本地事件通知分发系统,其中包含了“观察者模式”、“单例模式”、“代理模式”等设计思想,希望对你有帮助。

如有其他问题,可以下载登录 Tarsier​​​​​​​ 与我交流(默认通讯录里找 Albert Moky)

精彩评论(0)

0 0 举报