Есть вещи, которые незаметны, пока работают. Программная шина сообщений относится именно к таким. Компоненты общаются между собой, события перетекают из одного модуля в другой, интерфейс реагирует синхронно с бизнес-логикой. Никто не задаётся вопросом, как именно это происходит. Зато когда шина реализована небрежно или заменена прямыми вызовами, картина стремительно меняется. Распухающий список зависимостей, тесты, которые невозможно изолировать, код, который никто не решается трогать.

Custom bus implementation - это конкретное инженерное решение: создать внутри системы управляемый канал передачи сообщений, по правилам которого живут все модули. Когда этот канал спроектирован осознанно, система обретает гибкость. Когда он появляется стихийно, он превращается в источник проблем, которые потом разыскивают неделями.

Почему пользовательская шина нужна там, где уже есть готовые решения

Готовых инструментов хватает. RabbitMQ, Apache Kafka, Redis Streams - зрелые продукты с богатыми экосистемами и тысячами production-часов за плечами. Зачем создавать что-то своё?

Ответ лежит в контексте. Встраиваемые системы, десктопные приложения, игровые движки, модульные GUI-фреймворки - среды, где внешний брокер избыточен. Им достаточно шины, живущей внутри одного процесса, в памяти, без сетевых соединений и без инфраструктурных зависимостей. Вторая причина - контроль. Пользовательская шина позволяет точно определить формат сообщений, порядок доставки, поведение при ошибках и приоритизацию очередей. И третья причина проще: задача не требует распределённой системы, но требует развязки зависимостей. Тогда компактная in-process шина решает её чище, чем любой внешний инструмент.

Архитектурная основа паттерна

Шина событий выступает посредником между источниками событий и их потребителями. Источник, или продюсер, публикует сообщение на шине. Подписавшиеся компоненты-консьюмеры получают уведомления автоматически. В этой схеме три роли: продюсер, консьюмер и сама шина как нейтральный маршрутизатор.

Шину часто путают с паттерном Observer, но разница принципиальная. В Observer подписчики знают об издателе напрямую. Шина же разрывает эту связь полностью: продюсер не знает, кто слушает. Консьюмер не знает, кто публикует. Оба знают только о шине. Это и есть настоящая слабая связанность, ради которой весь механизм и затевается.

Минимальная Python-реализация такой шины выглядит так:

from collections import defaultdict

class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    def publish(self, event_type, data=None):
        for handler in self._subscribers.get(event_type, []):
            handler(data)

bus = EventBus()
bus.subscribe("user_created", lambda d: print(f"New user: {d}"))
bus.publish("user_created", {"id": 42, "name": "Alice"})

Всего около двадцати строк, но принцип передан точно. Продюсер вызывает publish, консьюмер регистрирует handler через subscribe, шина хранит таблицу соответствий. Никаких прямых ссылок между сторонами.

Типизация сообщений и строгость контракта

Одна из первых ошибок при проектировании пользовательской шины - передача сырых словарей или строк в качестве payload. Это удобно в начале и болезненно позже. Когда система разрастается, никто не помнит, какие поля ожидает тот или иной обработчик. Рефакторинг превращается в хождение по минному полю.

Надёжная альтернатива - типизированные объекты событий. В Python для этого отлично подходят dataclasses или Pydantic-модели:

from dataclasses import dataclass

@dataclass
class UserCreatedEvent:
    user_id: int
    username: str
    email: str

class TypedEventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_class, handler):
        self._subscribers[event_class].append(handler)

    def publish(self, event):
        for handler in self._subscribers.get(type(event), []):
            handler(event)

bus = TypedEventBus()
bus.subscribe(UserCreatedEvent, lambda e: print(e.username))
bus.publish(UserCreatedEvent(user_id=1, username="bob", email="Адрес электронной почты защищен от спам-ботов. Для просмотра адреса в браузере должен быть включен Javascript."))

Теперь IDE подсказывает поля, статический анализатор ловит несоответствия типов, а любой новый разработчик видит контракт события прямо в его определении. Это не педантизм, а реальная экономия времени на поддержке.

Потокобезопасность и подводные камни многопоточной шины

Однопоточная шина работает предсказуемо. Многопоточная требует внимания. Если несколько потоков одновременно вызывают subscribe и publish, стандартный список подписчиков превращается в источник гонок данных. Результат непредсказуем: от пропущенных событий до segfault в C++.

Первый шаг к потокобезопасной шине - защита реестра подписчиков мьютексом:

import threading

class ThreadSafeEventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)
        self._lock = threading.RLock()

    def subscribe(self, event_type, handler):
        with self._lock:
            self._subscribers[event_type].append(handler)

    def unsubscribe(self, event_type, handler):
        with self._lock:
            self._subscribers[event_type].remove(handler)

    def publish(self, event_type, data=None):
        with self._lock:
            handlers = list(self._subscribers.get(event_type, []))
        for handler in handlers:
            handler(data)

Обратите внимание: список обработчиков копируется перед итерацией. Это защищает от ситуации, когда один из обработчиков в процессе вызова сам отписывается от события. Без этой детали шина падает с ошибкой изменения коллекции во время итерации.

В C++ аналогичная защита строится через std::shared_mutex и std::shared_lock для операций чтения, и std::unique_lock для записи. Это позволяет нескольким потокам одновременно доставлять события, блокируя шину только при изменении реестра подписчиков.

Утечки памяти и управление жизненным циклом подписчиков

Самая частая ошибка в реализациях пользовательских шин - никем не замеченные утечки памяти. Объект подписался на событие, был удалён из системы, но шина всё ещё хранит на него ссылку. Сборщик мусора не может освободить память, потому что ссылка жива. В долго работающих приложениях это приводит к постепенному росту потребления памяти, который сложно диагностировать.

Решение - слабые ссылки через weakref:

import weakref

class WeakEventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        ref = weakref.ref(handler.__self__) if hasattr(handler, '__self__') else None
        self._subscribers[event_type].append((ref, handler.__func__ if ref else handler))

    def publish(self, event_type, data=None):
        alive = []
        for ref, func in self._subscribers.get(event_type, []):
            if ref is None:
                func(data)
                alive.append((ref, func))
            elif ref() is not None:
                func(ref(), data)
                alive.append((ref, func))
        self._subscribers[event_type] = alive

Логика проста: при публикации шина проверяет, живы ли подписчики. Мёртвые удаляются автоматически. Никаких ручных вызовов unsubscribe в деструкторах, никаких забытых подписок.

Асинхронная шина и работа с event loop

Синхронная доставка событий удобна для простых случаев. Но что если обработчик выполняет тяжёлую операцию - запрос к базе данных, HTTP-вызов, обработку файла? Блокировка продюсера на время работы всех обработчиков недопустима.

Асинхронная шина решает эту проблему. В Python с asyncio это выглядит так:

import asyncio
from collections import defaultdict

class AsyncEventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    async def publish(self, event_type, data=None):
        tasks = [
            asyncio.create_task(handler(data))
            for handler in self._subscribers.get(event_type, [])
        ]
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

# Использование
bus = AsyncEventBus()

async def on_user_created(data):
    await asyncio.sleep(0.1)  # симуляция I/O
    print(f"Processed: {data}")

async def main():
    bus.subscribe("user_created", on_user_created)
    await bus.publish("user_created", {"id": 1})

asyncio.run(main())

Параметр return_exceptions=True в gather критически важен. Без него одно упавшее исключение в обработчике отменяет доставку всем остальным подписчикам. С ним каждый обработчик получает шанс выполниться независимо от соседей.

Приоритеты, фильтрация и расширенная маршрутизация

Базовая шина доставляет события всем подписчикам в порядке регистрации. Это работает до тех пор, пока порядок не имеет значения. В реальных системах порядок часто критичен: логирование должно выполняться после бизнес-логики, кэш должен инвалидироваться до того, как интерфейс запросит новые данные.

Приоритетная очередь решает эту задачу. Подписчики регистрируются с числовым приоритетом, шина сортирует их перед доставкой:

import heapq

class PriorityEventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler, priority=0):
        heapq.heappush(
            self._subscribers[event_type],
            (priority, id(handler), handler)
        )

    def publish(self, event_type, data=None):
        for _, _, handler in sorted(self._subscribers.get(event_type, [])):
            handler(data)

Чем меньше число, тем выше приоритет. Обработчик с priority=0 выполнится раньше обработчика с priority=10. Идентификатор id(handler) добавляется как второй ключ сортировки, чтобы избежать ошибки сравнения объектов при одинаковом приоритете.

Фильтрация событий по содержимому payload открывает ещё один уровень гибкости. Подписчик может указать не только тип события, но и предикат: вызывать обработчик только если user_id > 100 или только если status == "active". Это избавляет от проверок внутри самого обработчика и делает намерение подписчика явным прямо в точке регистрации.

Тестирование шины и изоляция компонентов

Один из главных аргументов в пользу пользовательской шины - тестируемость. Модуль, принимающий шину как зависимость, легко тестировать с подменой: вместо реальной шины подаётся тестовый дублёр, который фиксирует все опубликованные события.

class SpyEventBus:
    def __init__(self):
        self.published = []

    def subscribe(self, event_type, handler):
        pass

    def publish(self, event_type, data=None):
        self.published.append((event_type, data))

# В тесте
def test_user_service_publishes_event():
    spy = SpyEventBus()
    service = UserService(bus=spy)
    service.create_user("alice")
    assert ("user_created", {"username": "alice"}) in spy.published

Никакого реального брокера, никаких сетевых вызовов, никаких побочных эффектов. Тест проверяет ровно одно: что сервис публикует нужное событие при создании пользователя. Вся система вокруг остаётся в стороне.

Это и есть та самая архитектурная ценность, ради которой стоит вкладываться в проектирование шины. Не ради красивых диаграмм и не ради следования трендам, а ради возможности уверенно менять систему, не боясь сломать то, что уже работает. Хорошо спроектированная шина не заметна в ежедневной работе. Но именно она держит код живым спустя годы после первого коммита.