Go 观察者模式
下面我会一步步说明什么是观察者模式、它的适用场景、几种在 Go 中常见的实现方式(同步、异步、基于 channel、基于事件总线/消息队列、泛型实现),并给出可直接使用的示例代码、并发/资源管理注意事项与常见陷阱及最佳实践。最后给出实战建议和下一步可做的工程化改进方案。
一、概念回顾(目标与角色)
观察者模式(Observer / Pub-Sub)用于在对象状态变化时 自动通知一组感兴趣的对象,从而实现低耦合的发布-订阅机制。
常见角色:
- Subject / Publisher:管理事件并通知订阅者(Observers)。
- Observer / Subscriber:接收事件并处理回调。
- Event:要传递的数据(可以是任意类型)。
目的:当某个对象发生变化时,不需要显式调用所有依赖方的代码,只需发布事件,订阅方被动接收并处理。
二、适用场景(何时使用观察者模式)
- UI 事件(按钮点击、输入变化)——前端/桌面应用。
- 日志/审计:某些操作产生事件,需要异步写日志或审计。
- 缓存失效通知:数据改动后广播给多个缓存节点或模块。
- 领域事件(Domain Events):领域模型发生重要状态变化时广播给其他子系统(发送邮件、统计、异步任务)。
- 插件/扩展系统:允许外部插件订阅核心事件。
- 微服务内的事件总线(进程内)或通过消息队列(跨进程/跨机房)。
- 支付/回调分发:一个支付回调通知多个处理器(订单、对账、营销)处理。
如果你的系统需要“发布事件 → 多个消费者独立处理”,且不想把发布者和消费者强耦合,观察者非常合适。
三、在 Go 中常见实现方式(优缺点对比)
-
简单接口回调(同步)
- 实现简单;处理同步、调用者等待;适合处理开销小且短的回调。
- 缺点:某个 Observer 慢或阻塞会阻塞整个发布者。
-
使用 goroutine + channel(异步)
- 发布者把事件放到 channel,消费者异步读取处理;不阻塞发布者(可控缓冲)。
- 要注意:缓冲、backpressure、goroutine 泄漏、取消机制。
-
事件总线 / Broker(进程内)
- 把管理、路由职责抽象成 EventBus,支持 topic、广播、过滤、优先级、 retry。
- 复杂但功能强:支持订阅、退订、同步/异步、队列化、幂等等。
-
使用消息队列(跨进程/跨机器)
- Kafka/RabbitMQ/NSQ 等,保证跨进程解耦、可靠性、持久化、重试。
- 成本高但适合系统间事件流。
-
Go 泛型(Go 1.18+)实现类型安全的事件总线
- 使用
T any做不同 event 类型的泛型 EventBus,避免接口断言。
- 使用
四、简单实现示例(同步回调 & 并发安全)
1) 同步 Observer(接口回调)
// observer_sync.go
package event
import "sync"
type Event struct {
Name string
Data any
}
type Observer interface {
OnNotify(e Event)
}
type Subject struct {
mu sync.RWMutex
observers map[Observer]struct{}
}
func NewSubject() *Subject {
return &Subject{observers: make(map[Observer]struct{})}
}
func (s *Subject) Subscribe(o Observer) {
s.mu.Lock(); defer s.mu.Unlock()
s.observers[o] = struct{}{}
}
func (s *Subject) Unsubscribe(o Observer) {
s.mu.Lock(); defer s.mu.Unlock()
delete(s.observers, o)
}
func (s *Subject) Publish(e Event) {
s.mu.RLock()
defer s.mu.RUnlock()
for o := range s.observers {
o.OnNotify(e) // 同步调用:慢的 observer 会阻塞发布者
}
}
适用:回调很短且可靠,不需要发布者立即返回。
2) 异步 Observer:用 goroutine 和缓冲 channel(推荐用于 IO/网络处理)
// async_bus.go
package event
import (
"context"
"sync"
)
type Handler func(ctx context.Context, e Event)
type AsyncBus struct {
mu sync.RWMutex
subs map[string][]Handler // topic -> handlers
bufSize int
}
func NewAsyncBus(bufSize int) *AsyncBus {
return &AsyncBus{subs: make(map[string][]Handler), bufSize: bufSize}
}
func (b *AsyncBus) Subscribe(topic string, h Handler) {
b.mu.Lock(); defer b.mu.Unlock()
b.subs[topic] = append(b.subs[topic], h)
}
func (b *AsyncBus) Publish(ctx context.Context, topic string, e Event) {
b.mu.RLock()
handlers := append([]Handler(nil), b.subs[topic]...)
b.mu.RUnlock()
for _, h := range handlers {
// 每个 handler 用独立 goroutine,带上 ctx
go func(h Handler) {
// 注意:ctx 可能已经 cancel,handler 应该注意 ctx.Done()
h(ctx, e)
}(h)
}
}
优点:发布者不阻塞(不用等待),适合异步工作。
风险:如果有大量事件、handler 慢,会产生大量 goroutine;需用 worker pool 或 channel 缓冲和限流控制。
3) 带队列和 worker 的安全实现(防止 goroutine 泄露)
// worker_bus.go (核心思想示例)
type worker struct {
ch chan Event
quit chan struct{}
}
func newWorker(buf int, handler Handler) *worker {
w := &worker{ch: make(chan Event, buf), quit: make(chan struct{})}
go func() {
for {
select {
case e := <-w.ch:
handler(context.Background(), e)
case <-w.quit:
return
}
}
}()
return w
}
func (w *worker) stop() { close(w.quit) }
func (w *worker) push(e Event) {
select {
case w.ch <- e:
default:
// 队列满了,选择丢弃/阻塞/统计/重试
}
}
优点:受控并发、可上报队列满等背压指标。
注意:需要优雅停止(关闭 quit),避免 goroutine 泄露。
4) 泛型类型安全的 EventBus(Go 1.18+)
// generic_bus.go
type Event[T any] struct {
Topic string
Data T
}
type Bus[T any] struct {
mu sync.RWMutex
subs map[string][]func(T)
}
func NewBus[T any]() *Bus[T] { return &Bus[T]{subs: make(map[string][]func(T))} }
func (b *Bus[T]) Subscribe(topic string, h func(T)) {
b.mu.Lock(); defer b.mu.Unlock()
b.subs[topic] = append(b.subs[topic], h)
}
func (b *Bus[T]) Publish(topic string, data T) {
b.mu.RLock()
hs := append([]func(T){}, b.subs[topic]...)
b.mu.RUnlock()
for _, h := range hs {
go h(data) // 异步
}
}
优点:编译时类型安全,避免 interface{} 转换错误。适合事件类型固定的场景。
五、实践中的关键注意点(必读)
- 退订(Unsubscribe)必须可行,避免内存泄露。
- 并发安全:读多写少时用
sync.RWMutex;高并发写时注意竞争。 - Backpressure:异步实现需考虑队列大小、丢弃策略、重试和告警。
- 上下文与取消:给 handler 传
context.Context,支持超时/取消,以便优雅停止。 - 错误处理 & 重试:处理失败时要有重试策略或把失败事件发到死信队列。
- 幂等性:消费端应尽量设计成幂等的,避免重复处理导致副作用。
- 顺序保证:若需要顺序,不能简单地使用并发 goroutine,需用单个 worker 或 partition(如 Kafka partition)。
- 监控和限流:统计队列长度、处理延时、失败率,设置限流或熔断器。
- 优先级:如果事件有优先级,设计多个队列或优先队列。
- 持久化/可靠性:跨进程或关键业务使用消息队列(Kafka/RabbitMQ)而非进程内 EventBus。
六、示例场景:在支付回调中使用观察者
业务:某一笔支付回调到来,要同时做:更新订单、记录日志、发送发票、推送通知、统计。
实现思路:
- 支付回调处理器只负责解析并发布
payment.success事件。 - 多个订阅者分别处理各自职责(orderService、invoiceService、notifyService、metricsService)。
- 若需要保证订单更新与其它处理的一致性:把订单更新放在数据库事务里(同步),而把非关键任务异步下发(事件)。
伪代码:
// 回调 handler
func PaymentNotifyHandler(c *gin.Context) {
trx := parseNotification(c)
// 1) 关键插入/更新:在事务里做(幂等处理)
err := orderRepo.HandlePayment(ctx, trx)
if err != nil { respondFail() }
// 2) 发布事件(非阻塞)
eventBus.Publish(ctx, "payment.success", trx)
respondSuccess()
}
订阅者:
eventBus.Subscribe("payment.success", func(ctx context.Context, trx Payment) {
go invoiceService.CreateInvoice(ctx, trx) // 或直接异步
})
eventBus.Subscribe("payment.success", notifyService.SendPaymentNotification)
eventBus.Subscribe("payment.success", metricsService.Increment)
好处:回调处理器职责单一、系统解耦、可扩展新订阅逻辑无需改回调代码。
七、常见陷阱与如何避免
- 内存泄漏:订阅者不退订 & 长期持有导致内存增长。
→ 方案:支持退订、使用弱引用或定期清理无效订阅。 - goroutine 泄漏:handler 阻塞或未处理退出信号。
→ 方案:handler 使用带超时的 context,使用 worker 池。 - 无序或重复处理:并发导致顺序或幂等问题。
→ 方案:使用 partition/sequence、或在业务端做幂等控制(idempotency key)。 - 发布者阻塞:同步调用时某个 slow observer 导致卡顿。
→ 方案:异步处理、超时、队列或断路器。 - 错误静默:订阅者抛错未记录或未重试。
→ 方案:统一错误日志、死信队列和监控告警。
八、最佳实践清单(Checklist)
- 是否需要跨进程/跨机房?若需要用消息队列。
- 事件处理是否需要保证顺序?若需要,设计顺序策略。
- 是否需要重试与幂等?实现幂等 key 与死信队列。
- 是否需要订阅动态注册/退订?提供 API 并保证并发安全。
- 加入监控(队列长度、处理延迟、失败率)与告警。
- 使用
context.Context传播取消/超时。 - 处理好退订和服务关闭,保证优雅停止(Graceful shutdown)。
九、可直接复用的最小生产-ready 示例(异步 EventBus)
下面是一个简化但较完整的进程内异步 EventBus(支持订阅、退订、缓冲、优雅停止)供你直接使用或改造:
package bus
import (
"context"
"sync"
"time"
)
type Event struct { Topic string; Data any }
type Handler func(ctx context.Context, e Event)
type Subscription struct {
topic string
id uint64
bus *EventBus
}
type EventBus struct {
mu sync.RWMutex
nextID uint64
subs map[string]map[uint64]Handler
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewEventBus() *EventBus {
ctx, cancel := context.WithCancel(context.Background())
return &EventBus{
subs: make(map[string]map[uint64]Handler),
ctx: ctx,
cancel: cancel,
}
}
func (b *EventBus) Subscribe(topic string, h Handler) *Subscription {
b.mu.Lock(); defer b.mu.Unlock()
b.nextID++
id := b.nextID
m := b.subs[topic]
if m == nil { m = make(map[uint64]Handler); b.subs[topic] = m }
m[id] = h
return &Subscription{topic: topic, id: id, bus: b}
}
func (s *Subscription) Unsubscribe() {
b := s.bus
b.mu.Lock()
defer b.mu.Unlock()
if m := b.subs[s.topic]; m != nil {
delete(m, s.id)
if len(m) == 0 { delete(b.subs, s.topic) }
}
}
func (b *EventBus) Publish(e Event) {
b.mu.RLock()
handlers := make([]Handler, 0)
if m := b.subs[e.Topic]; m != nil {
for _, h := range m { handlers = append(handlers, h) }
}
b.mu.RUnlock()
for _, h := range handlers {
b.wg.Add(1)
go func(h Handler) {
defer b.wg.Done()
// handler should respect ctx from bus to allow cancellation
h(b.ctx, e)
}(h)
}
}
// Graceful shutdown: stop accepting and wait handlers
func (b *EventBus) Shutdown(timeout time.Duration) {
b.cancel()
ch := make(chan struct{})
go func() {
b.wg.Wait()
close(ch)
}()
select {
case <-ch:
return
case <-time.After(timeout):
return
}
}
使用方法:
bus := NewEventBus()sub := bus.Subscribe("payment", handler)bus.Publish(Event{Topic:"payment", Data:trx})sub.Unsubscribe()bus.Shutdown(5*time.Second)在服务停止时调用。
十、下一步建议(按你场景)
如果你告诉我你的具体场景(例如:支付回调要广播哪些消费者、需求是否要求幂等/顺序/重试、负载大小),我可以:
- 给出最合适的架构建议(进程内 EventBus vs 消息队列)。
- 给出配套实现(含监控、死信队列、重试策略、示例代码)。
- 如果你想,我可以把上面的
EventBus改造成你系统可直接用的模块(含日志、metrics、限流、优雅停机)。
如果你现在想看某种具体实现(同步/异步/泛型/带 worker 的版本)的完整代码或集成示例(比如和 Gin、数据库、支付回调结合),告诉我你的优先级(简洁/高性能/高度可靠),我就直接把合适的代码写出来并解释每一行。