Go 观察者模式

Go 观察者模式

下面我会一步步说明什么是观察者模式、它的适用场景、几种在 Go 中常见的实现方式(同步、异步、基于 channel、基于事件总线/消息队列、泛型实现),并给出可直接使用的示例代码、并发/资源管理注意事项与常见陷阱及最佳实践。最后给出实战建议和下一步可做的工程化改进方案。


一、概念回顾(目标与角色)

观察者模式(Observer / Pub-Sub)用于在对象状态变化时 自动通知一组感兴趣的对象,从而实现低耦合的发布-订阅机制。

常见角色:

  • Subject / Publisher:管理事件并通知订阅者(Observers)。
  • Observer / Subscriber:接收事件并处理回调。
  • Event:要传递的数据(可以是任意类型)。

目的:当某个对象发生变化时,不需要显式调用所有依赖方的代码,只需发布事件,订阅方被动接收并处理。


二、适用场景(何时使用观察者模式)

  • UI 事件(按钮点击、输入变化)——前端/桌面应用。
  • 日志/审计:某些操作产生事件,需要异步写日志或审计。
  • 缓存失效通知:数据改动后广播给多个缓存节点或模块。
  • 领域事件(Domain Events):领域模型发生重要状态变化时广播给其他子系统(发送邮件、统计、异步任务)。
  • 插件/扩展系统:允许外部插件订阅核心事件。
  • 微服务内的事件总线(进程内)或通过消息队列(跨进程/跨机房)。
  • 支付/回调分发:一个支付回调通知多个处理器(订单、对账、营销)处理。

如果你的系统需要“发布事件 → 多个消费者独立处理”,且不想把发布者和消费者强耦合,观察者非常合适。


三、在 Go 中常见实现方式(优缺点对比)

  1. 简单接口回调(同步)

    • 实现简单;处理同步、调用者等待;适合处理开销小且短的回调。
    • 缺点:某个 Observer 慢或阻塞会阻塞整个发布者。
  2. 使用 goroutine + channel(异步)

    • 发布者把事件放到 channel,消费者异步读取处理;不阻塞发布者(可控缓冲)。
    • 要注意:缓冲、backpressure、goroutine 泄漏、取消机制。
  3. 事件总线 / Broker(进程内)

    • 把管理、路由职责抽象成 EventBus,支持 topic、广播、过滤、优先级、 retry。
    • 复杂但功能强:支持订阅、退订、同步/异步、队列化、幂等等。
  4. 使用消息队列(跨进程/跨机器)

    • Kafka/RabbitMQ/NSQ 等,保证跨进程解耦、可靠性、持久化、重试。
    • 成本高但适合系统间事件流。
  5. Go 泛型(Go 1.18+)实现类型安全的事件总线

    • 使用 T any 做不同 event 类型的泛型 EventBus,避免接口断言。

四、简单实现示例(同步回调 & 并发安全)

1) 同步 Observer(接口回调)

// observer_sync.go
package event

import "sync"

type Event struct {
    Name string
    Data any
}

type Observer interface {
    OnNotify(e Event)
}

type Subject struct {
    mu        sync.RWMutex
    observers map[Observer]struct{}
}

func NewSubject() *Subject {
    return &Subject{observers: make(map[Observer]struct{})}
}

func (s *Subject) Subscribe(o Observer) {
    s.mu.Lock(); defer s.mu.Unlock()
    s.observers[o] = struct{}{}
}

func (s *Subject) Unsubscribe(o Observer) {
    s.mu.Lock(); defer s.mu.Unlock()
    delete(s.observers, o)
}

func (s *Subject) Publish(e Event) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    for o := range s.observers {
        o.OnNotify(e) // 同步调用:慢的 observer 会阻塞发布者
    }
}

适用:回调很短且可靠,不需要发布者立即返回。


2) 异步 Observer:用 goroutine 和缓冲 channel(推荐用于 IO/网络处理)

// async_bus.go
package event

import (
    "context"
    "sync"
)

type Handler func(ctx context.Context, e Event)

type AsyncBus struct {
    mu      sync.RWMutex
    subs    map[string][]Handler // topic -> handlers
    bufSize int
}

func NewAsyncBus(bufSize int) *AsyncBus {
    return &AsyncBus{subs: make(map[string][]Handler), bufSize: bufSize}
}

func (b *AsyncBus) Subscribe(topic string, h Handler) {
    b.mu.Lock(); defer b.mu.Unlock()
    b.subs[topic] = append(b.subs[topic], h)
}

func (b *AsyncBus) Publish(ctx context.Context, topic string, e Event) {
    b.mu.RLock()
    handlers := append([]Handler(nil), b.subs[topic]...)
    b.mu.RUnlock()

    for _, h := range handlers {
        // 每个 handler 用独立 goroutine,带上 ctx
        go func(h Handler) {
            // 注意:ctx 可能已经 cancel,handler 应该注意 ctx.Done()
            h(ctx, e)
        }(h)
    }
}

优点:发布者不阻塞(不用等待),适合异步工作。
风险:如果有大量事件、handler 慢,会产生大量 goroutine;需用 worker pool 或 channel 缓冲和限流控制。


3) 带队列和 worker 的安全实现(防止 goroutine 泄露)

// worker_bus.go (核心思想示例)
type worker struct {
    ch chan Event
    quit chan struct{}
}

func newWorker(buf int, handler Handler) *worker {
    w := &worker{ch: make(chan Event, buf), quit: make(chan struct{})}
    go func() {
        for {
            select {
            case e := <-w.ch:
                handler(context.Background(), e)
            case <-w.quit:
                return
            }
        }
    }()
    return w
}

func (w *worker) stop() { close(w.quit) }
func (w *worker) push(e Event) {
    select {
    case w.ch <- e:
    default:
        // 队列满了,选择丢弃/阻塞/统计/重试
    }
}

优点:受控并发、可上报队列满等背压指标。
注意:需要优雅停止(关闭 quit),避免 goroutine 泄露。


4) 泛型类型安全的 EventBus(Go 1.18+)

// generic_bus.go
type Event[T any] struct {
    Topic string
    Data  T
}

type Bus[T any] struct {
    mu    sync.RWMutex
    subs  map[string][]func(T)
}

func NewBus[T any]() *Bus[T] { return &Bus[T]{subs: make(map[string][]func(T))} }

func (b *Bus[T]) Subscribe(topic string, h func(T)) {
    b.mu.Lock(); defer b.mu.Unlock()
    b.subs[topic] = append(b.subs[topic], h)
}

func (b *Bus[T]) Publish(topic string, data T) {
    b.mu.RLock()
    hs := append([]func(T){}, b.subs[topic]...)
    b.mu.RUnlock()

    for _, h := range hs {
        go h(data) // 异步
    }
}

优点:编译时类型安全,避免 interface{} 转换错误。适合事件类型固定的场景。


五、实践中的关键注意点(必读)

  1. 退订(Unsubscribe)必须可行,避免内存泄露。
  2. 并发安全:读多写少时用 sync.RWMutex;高并发写时注意竞争。
  3. Backpressure:异步实现需考虑队列大小、丢弃策略、重试和告警。
  4. 上下文与取消:给 handler 传 context.Context,支持超时/取消,以便优雅停止。
  5. 错误处理 & 重试:处理失败时要有重试策略或把失败事件发到死信队列。
  6. 幂等性:消费端应尽量设计成幂等的,避免重复处理导致副作用。
  7. 顺序保证:若需要顺序,不能简单地使用并发 goroutine,需用单个 worker 或 partition(如 Kafka partition)。
  8. 监控和限流:统计队列长度、处理延时、失败率,设置限流或熔断器。
  9. 优先级:如果事件有优先级,设计多个队列或优先队列。
  10. 持久化/可靠性:跨进程或关键业务使用消息队列(Kafka/RabbitMQ)而非进程内 EventBus。

六、示例场景:在支付回调中使用观察者

业务:某一笔支付回调到来,要同时做:更新订单、记录日志、发送发票、推送通知、统计。

实现思路:

  • 支付回调处理器只负责解析并发布 payment.success 事件。
  • 多个订阅者分别处理各自职责(orderService、invoiceService、notifyService、metricsService)。
  • 若需要保证订单更新与其它处理的一致性:把订单更新放在数据库事务里(同步),而把非关键任务异步下发(事件)。

伪代码:

// 回调 handler
func PaymentNotifyHandler(c *gin.Context) {
    trx := parseNotification(c)
    // 1) 关键插入/更新:在事务里做(幂等处理)
    err := orderRepo.HandlePayment(ctx, trx)
    if err != nil { respondFail() }

    // 2) 发布事件(非阻塞)
    eventBus.Publish(ctx, "payment.success", trx)

    respondSuccess()
}

订阅者:

eventBus.Subscribe("payment.success", func(ctx context.Context, trx Payment) {
    go invoiceService.CreateInvoice(ctx, trx) // 或直接异步
})
eventBus.Subscribe("payment.success", notifyService.SendPaymentNotification)
eventBus.Subscribe("payment.success", metricsService.Increment)

好处:回调处理器职责单一、系统解耦、可扩展新订阅逻辑无需改回调代码。


七、常见陷阱与如何避免

  • 内存泄漏:订阅者不退订 & 长期持有导致内存增长。
    → 方案:支持退订、使用弱引用或定期清理无效订阅。
  • goroutine 泄漏:handler 阻塞或未处理退出信号。
    → 方案:handler 使用带超时的 context,使用 worker 池。
  • 无序或重复处理:并发导致顺序或幂等问题。
    → 方案:使用 partition/sequence、或在业务端做幂等控制(idempotency key)。
  • 发布者阻塞:同步调用时某个 slow observer 导致卡顿。
    → 方案:异步处理、超时、队列或断路器。
  • 错误静默:订阅者抛错未记录或未重试。
    → 方案:统一错误日志、死信队列和监控告警。

八、最佳实践清单(Checklist)

  • 是否需要跨进程/跨机房?若需要用消息队列。
  • 事件处理是否需要保证顺序?若需要,设计顺序策略。
  • 是否需要重试与幂等?实现幂等 key 与死信队列。
  • 是否需要订阅动态注册/退订?提供 API 并保证并发安全。
  • 加入监控(队列长度、处理延迟、失败率)与告警。
  • 使用 context.Context 传播取消/超时。
  • 处理好退订和服务关闭,保证优雅停止(Graceful shutdown)。

九、可直接复用的最小生产-ready 示例(异步 EventBus)

下面是一个简化但较完整的进程内异步 EventBus(支持订阅、退订、缓冲、优雅停止)供你直接使用或改造:

package bus

import (
    "context"
    "sync"
    "time"
)

type Event struct { Topic string; Data any }

type Handler func(ctx context.Context, e Event)

type Subscription struct {
    topic string
    id    uint64
    bus   *EventBus
}

type EventBus struct {
    mu      sync.RWMutex
    nextID  uint64
    subs    map[string]map[uint64]Handler
    wg      sync.WaitGroup
    ctx     context.Context
    cancel  context.CancelFunc
}

func NewEventBus() *EventBus {
    ctx, cancel := context.WithCancel(context.Background())
    return &EventBus{
        subs:   make(map[string]map[uint64]Handler),
        ctx:    ctx,
        cancel: cancel,
    }
}

func (b *EventBus) Subscribe(topic string, h Handler) *Subscription {
    b.mu.Lock(); defer b.mu.Unlock()
    b.nextID++
    id := b.nextID
    m := b.subs[topic]
    if m == nil { m = make(map[uint64]Handler); b.subs[topic] = m }
    m[id] = h
    return &Subscription{topic: topic, id: id, bus: b}
}

func (s *Subscription) Unsubscribe() {
    b := s.bus
    b.mu.Lock()
    defer b.mu.Unlock()
    if m := b.subs[s.topic]; m != nil {
        delete(m, s.id)
        if len(m) == 0 { delete(b.subs, s.topic) }
    }
}

func (b *EventBus) Publish(e Event) {
    b.mu.RLock()
    handlers := make([]Handler, 0)
    if m := b.subs[e.Topic]; m != nil {
        for _, h := range m { handlers = append(handlers, h) }
    }
    b.mu.RUnlock()

    for _, h := range handlers {
        b.wg.Add(1)
        go func(h Handler) {
            defer b.wg.Done()
            // handler should respect ctx from bus to allow cancellation
            h(b.ctx, e)
        }(h)
    }
}

// Graceful shutdown: stop accepting and wait handlers
func (b *EventBus) Shutdown(timeout time.Duration) {
    b.cancel()
    ch := make(chan struct{})
    go func() {
        b.wg.Wait()
        close(ch)
    }()
    select {
    case <-ch:
        return
    case <-time.After(timeout):
        return
    }
}

使用方法

  • bus := NewEventBus()
  • sub := bus.Subscribe("payment", handler)
  • bus.Publish(Event{Topic:"payment", Data:trx})
  • sub.Unsubscribe()
  • bus.Shutdown(5*time.Second) 在服务停止时调用。

十、下一步建议(按你场景)

如果你告诉我你的具体场景(例如:支付回调要广播哪些消费者、需求是否要求幂等/顺序/重试、负载大小),我可以:

  1. 给出最合适的架构建议(进程内 EventBus vs 消息队列)。
  2. 给出配套实现(含监控、死信队列、重试策略、示例代码)。
  3. 如果你想,我可以把上面的 EventBus 改造成你系统可直接用的模块(含日志、metrics、限流、优雅停机)。

如果你现在想看某种具体实现(同步/异步/泛型/带 worker 的版本)的完整代码或集成示例(比如和 Gin、数据库、支付回调结合),告诉我你的优先级(简洁/高性能/高度可靠),我就直接把合适的代码写出来并解释每一行。

本文为原创内容,作者:闲鹤,原文链接:https://blog.uwenya.cc/1612.html,转载请注明出处。

发表评论