[AEDB] Agents Event Driven Behavior: Part II Event Bus

[AEDB] Agents Event Driven Behavior: Part II Event Bus

#python #ai #agents #NATS #architecture #event bus

Архитектура шины событий

В первой части я кратко изложил концепцию AEDB. Во второй части мы спускаемся на уровень протокола. Здесь мы разбираем центральную нервную систему роя — Event Bus (шину событий), построенную на NATS JetStream, и строгие правила общения агентов.

Шина событий: Выбор технологии и архитектура

Терминология

Поскольку тут вводится несколько новых терминов, необходимо, для начала, синхронизировать словари, чтобы не путаться в абстракциях:

  • Тикет (Ticket) — Это пользовательская задача или интенция, хранящаяся в графе (ArangoDB, но об этом в следующих частях). Тикет создается при каждом взаимодействии с пользователем и вокруг него строятся все ветки стратегий и действий агентов.
  • Событие (Event) — Единица коммуникации. Техническое сообщение, летящее по шине NATS.
  • Топик (Topic) — По сути очередь в NATS. В контексте описываемой архитектуры это классификационный идентификатор события, состоящий из 6 секций формирующихся по набору правил описанных ниже. Агенты подписаны на те топики в шине, которые входят в сферу их компетенций.
  • Сигнал события — Последняя (шестая) секция в семантическом имени (топике) события. Например: .pending — сигнал к исполнению, .done — сигнал готовности выполнения и т.д., сигнализируют о состоянии события.

Почему NATS JetStream?

Проектируя шину для библиотек agentone и graphone, в рамках которых я и реализую описываемую концепцию, я выбирал фундамент исходя из ряда жестких требований:

  • Легковесность vs Монстры: Кажется логичным выбор стандартных брокеров типа Kafka, но она слишком тяжела для роя агентов (JVM, Zookeeper/Kraft). Redis Pub/Sub слишком прост — он не дает гарантий доставки (Fire and Forget). NATS — это идеальный бинарник в пару мегабайт, который не требует кучу ресурсов.
  • Persistent Streams: Поведение в рое имеет множественную вариативность и не хардкодится условными цепочками. Поэтому критически важна персистентность (JetStream) — возможность хранить историю событий и при необходимости перепроигрывать их.
  • Queue Groups (Балансировка без балансировщика): Распределенность роя позволяет размещать инстансы агентов где угодно. Обычные балансировщики требуют знать IP-адреса воркеров, что может быть не всегда для нас доступно (инстанс агента может не иметь своего внешнего адреса). В NATS мы используем Queue Groups: один топик могут слушать десять воркеров, но задачу физически заберет только один — первый освободившийся. Это дает достаточную самобалансировку.
  • Durable Consumers: Агент может быть занят, перезагружаться или иметь проблемы с сетью. Мы не должны терять задачи в такие моменты. Durable Consumer позволяет серверу помнить, на чем остановился конкретный подписчик.

NATS отвечает всем этим требованиям и хорошо представлен в python, потому, выбор сделан.

Топология подключения

Single Stream Pattern: Я использую паттерн Единого потока. Вместо зоопарка стримов, мы создаем один глобальный стрим SWARM. Это агрегатор всех событий роя, упрощающий бэкапы и администрирование. Это в известной степени ограничивает шардирование, но не забываем, что мы строим рой распределенных агентов с упором на локальные модели и узким местом тут будет далеко не шина.

Потому, этим можно пренебречь в пользу управляемости и простоты. Да и я готов ставить доллары против пончиков, что вряд ли автономные рои когда-либо упрутся в пропускную способность шины, при правильной ее организации.

Wildcard Subscriptions: Отдельные операционные агенты ядра (Kernel) могут подписаться на root.> (слушать абсолютно всё для построения графа операций), а конкретный кодер-демон подписывается на узкий топик: root.dev.sandbox.coder.*.pending. Детально фреймворк построения топика разбирается ниже.

Изоляция транспорта: В библиотеке agentone я архитектурно разделяю уровень физического подключения и уровень семантики сообщений.

  • Connection (nc): Это труба. Физический TCP-сокет, TLS-хендшейки, авторизация и реконнекты. Мы поднимаем соединение один раз.
  • Context (js): Это надстройка JetStream. Она отвечает за управление потоком, консюмерами и политиками хранения.

Зачем это нужно? Это позволяет реализовать гибридный паттерн общения через один сокет:

  • Critical Path: Бизнес-задачи для обработки агентами идут через Context. Здесь включены ack (подтверждения), запись на диск и гарантия доставки.
  • Telemetry Path: Метрики, хартбиты и логи на лету могут идти через чистое Connection (Core NATS). Это снижает оверхед: нам не нужно персистить на диск каждый пинг агента. Если пинг потерялся — не страшно, придет следующий.

Такое разделение позволяет агенту быть максимально производительным: использовать JetStream только там, где это реально нужно для бизнеса, а служебный трафик гонять по легковесному протоколу.

Типы и контракты событий

Для того, чтоб стандартизировать систему под любой способ масштабирования, необходимо проработать фреймворк построения и роутинга событий. Я выработал набор простых правил, которые позволяют очень гибко конструировать топики событий под любые их типы и ситуации, а также организовать асинхронное реагирование вокруг статуса события.

Таксономия топиков (6 сегментов)

Семантический роутинг — сердце системы. В классических брокерах роутинг часто скрыт в конфигурациях, но в NATS имя топика (Subject) возведено в абсолют. Агент должен понимать, касается ли его задача, не читая её тело (Payload), а просто анализируя заголовок. Это экономит ресурсы десериализации и позволяет строить очень гибкие подписки.

Я использую строгую 6-сегментную иерархию: root.domain.zone.entity.aspect.signal

Разберем анатомию этого пути:

1. Root & Domain: Пространство имен

root.domain — это глобальный контекст. Рой не живет в вакууме; на одной шине NATS могут сосуществовать и взаимодействовать разные экосистемы. Комбинирование этих двух сегментов позволяет гибко конструировать семантические пространства.

  • Root: Имя системы или кластера (например, blog, swarm, home).
  • Domain: Предметная область внутри системы.
    • swarm.dev: Задачи разработки ПО.
    • swarm.sec: Задачи кибербезопасности.
    • home.iot: Управление умными устройствами дома.
    • farm.iot: Управление умными устройствами на ферме.
    • blog.moderate: Событие на модерацию в блоге.

Зачем это нужно: это позволяет нам четко разделить сигналы по контексту выполнения. Например, в нашей шине могут быть сигналы home.iot.* от датчиков умного дома которые не очень интересны агентам напрямую, но важны для фиксации в бд. Это позволит использовать один поток шины для всех задач роя и развести все их типы в рамках одного стрима. Бонусом это дает очень хорошую семантическую заряженность топику, чтоб упрощает отладку, анализ и наблюдение. Ну и в middleware, например, это дает дополнительные возможности классификации событий.

2. Zone: Зонирование исполнения (Topology & Safety)

zone — это уникальная фишка архитектуры AEDB (во всяком случае я не встречал ни где подобного способа классификации среды обитания агентов). Это маркер среды обитания и контекста безопасности. В распределенном рое не все агенты равны: одни живут в облаке, другие — на домашнем сервере, третьи — в изолированных контейнерах.

  • cluster: Кластер. Зона чистых вычислений, главный сервер\кластер. Здесь живут LLM, логические модули, суммаризаторы текста. Здесь безопасно.
  • uplink: Шлюз. Зона доступа во внешний мир. Агенты здесь имеют выход в интернет (Searcher, OSINT, API Integrator). Это маркер для Firewall: Этим челикам можно наружу.
  • sandbox: Песочница. Зона повышенного риска. Если агент coder генерирует Python-скрипт и выполняет его, он обязан делать это здесь. Песочница — это, например, одноразовый Docker-контейнер без сети или иной способ изоляции предъявляемый требованиями.
  • core: Ядро. Системные процессы управления шиной, базой данных и самим роем.
  • local: Физический хост. Прямой доступ к железу (например, home.iot.local.light.switch.enable).

Зачем это нужно: Когда Планировщик ставит задачу, он явно указывает zone. Это исключает ситуацию, когда потенциально деструктивный код случайно запускается на сервере с продакшн-базой данных. А для пущей уверенности можно настроить обязательную эскалацию на подтверждение пользователем всех задач в определенных зонах. Это в принципе предоставляет гибкий инструмент для реакций и правил для определенных зон исполнения.

Также, нам это очень пригодится, когда наш рой будет сам писать новых агентов и корректировать их системные инструкции. Да-да, не удивляйстесь xD.

При необходимости, могут быть добавлены дополнительные зоны, здесь представлены базовые варианты используемые мной.

3. Entity: Роль исполнителя

entity — это специализация либо уникальное имя агента.

  • Примеры: coder, searcher, writer, camera, router, minerva, coder_01.
  • Именно этот сегмент (по специализации) обычно используется для именования Queue Group. Все экземпляры агента coder подписываются на одну группу, чтобы балансировать нагрузку между собой.
  • Может быть уникальным именем агента, например minerva, coder_rust, coder_01 и т.д. Это позволит, при необходмости, выделить каждому (обычно гетерогенному) агенту в рое свой конкретный топик в рамках общего стрима.

4. Aspect: Навык или Действие

aspect — это конкретная функция или метод внутри роли или гетерогенного агента (minerva.replan). Например, у кодера может быть много специализаций.

  • coder.generation: Сгенерируй новый код.
  • coder.refactoring: Улучши существующий код.
  • coder.review: Найди ошибки.

Зачем это нужно: Это позволяет создавать узкоспециализированных агентов (с гибким управлением промптами). Мы можем запустить легковесного агента, который умеет только review (и потребляет мало памяти), и тяжелого, который умеет generation. Также это позволяет давать модели агента более узкоспециализированные примеры типовых решений в промпте и более подходящую структуру промпта.

5. Signal: Жизненный цикл (Control Flow)

signal — это глагол аспекта (предыдущей секции), указывающий на статус события. В Event-Driven архитектуре это заменяет прямой вызов функций.

  • .pending (Входящий): “Нужно сделать”. Агенты-воркеры слушают именно этот сигнал на своих подписках, чтобы взять задачу.
  • .processing (В процессе): “Я взял, работаю”. Технический сигнал для метрик и отслеживания тайм-аутов операционными агентами.
  • .done (Исходящий): “Готово”. Результат работы. На него подписано Ядро (Kernel), чтобы обновить граф задач. А также операционные агенты контролирующие выполнение задач (например: Планировщик).
  • .failed (Ошибка): “Что-то пошло не так”. На этот сигнал подписан агент-Судья для разбора инцидентов и принятия решения о переопределении задачи или эскалации на пользователя. Например: планировщик ошибочно назначил задачу не тому агенту (поисковику написать код) или дал недостаточно данных то судья может либо направить планировщику событие чтоб он перепланировал или создаст ассистенту событие для эскалации на пользователя с уточняющим вопросом.

Пример полного пути

Представим, что планировщик хочет, чтобы воркер-кодер написал скрипт, и это должно происходить в безопасной среде.

Топик задачи: swarm.dev.sandbox.coder.generation.pending (Рой -> Разработка -> В песочнице -> Кодер -> Генерация -> К исполнению)

Топик ответа: swarm.dev.sandbox.coder.generation.done (Рой -> Разработка -> В песочнице -> Кодер -> Генерация -> Готово)

Спецификация события: CloudEvents + NATS Headers

Тут я сперва попытался изобрести велосипед, но решил от них отказаться в пользу индустриального стандарта CNCF CloudEvents. Анатомия события выглядит так:

  • Envelope (Конверт): Стандартные поля specversion, id, time. Поле type мы мапим 1-в-1 с топиком NATS, а source указывает на URI агента-отправителя (например, /agent/core/minerva).
  • NATS Headers: Критически важные метаданные выносятся в заголовки сообщений NATS для быстрого доступа без десериализации JSON payload. Это ключевая оптимизация для операционных агентов (например, Chronos):
    • Nats-Ticket-Id: Ссылка на сущность Ticket в ArangoDB (об этом в другой части). Любой агент, получивший событие, знает, к какой задаче в операционном графе (об этом в следующей части) привязать результат, не читая тело сообщения. Позволяет связать с какой интенцией пользователя связаны какие графы планов агентов, их шаги и события, критически важно для работы планировщика и observability всей системы.
    • Nats-Trace-Id: Сквозной идентификатор цепочки мыслей. Позволяет собрать лог того, как одна фраза пользователя породила 50 подзадач для 10 агентов.
    • Nats-Timeout: Unixtime (int) значение по наступлению которого нужно что-то предпринять, если никто не взял событие в работу. Chronos читает этот header для мониторинга таймаутов без затрат на парсинг JSON.
    • Nats-Priority: Опционально. Приоритет задачи (critical, high, normal, low) для управления очередностью обработки. По умолчанию normal. Например, события с приоритетами задач для агентов:
      • low: Низкий приоритет, можно взять в работу когда нет других задач.
      • normal: Обычный рабочий приоритет, штатная рабочая задача.
      • high: Высокий приоритет, следующий освободившийся агент должен взять его, даже если у него в очереди еще тысяча обычных задач.
      • critical: Критический приоритет, требующий немедленной реакции. Если все агенты заняты, профильный агент должен высвободиться от текущей задачи наиболее низкого приоритета и переключиться на эту.

Пример полного сообщения в шине (NATS Headers + CloudEvents):

NATS Headers:

Nats-Ticket-Id: ticket-key-12345
Nats-Trace-Id: trace-uuid-888-999
Nats-Timeout: 1738420496
Nats-Priority: normal

Payload (CloudEvents):

{
  "specversion": "1.0",
  "id": "evt-5c7d-4b8e-9f0a",
  "type": "swarm.dev.sandbox.coder.generation.pending",
  "source": "/agent/core/minerva",
  "time": "2026-01-31T12:34:56Z",
  "datacontenttype": "application/json",
  "data": {
    "instruction": "Напиши скрипт для парсинга заголовков с habr.com",
    "constraints": {
        "language": "python",
        "library": "beautifulsoup4",
        "timeout": 30
    },
    "context": [
        "User asked for a news aggregator",
        "Previous attempt failed due to 403 Forbidden"
    ],
    "artifacts": ["r2.host.com/manuals/how_to_parse_enythings.md"]
  }
}

Payload: Модели данных

Внутри поля data лежит полезная нагрузка. Поскольку много данных генерируется агентами, то чтобы не получить кашу, обязательно используем Pydantic для строгой валидации payload.

  • TaskPayload: То, что получает агент. Содержит instruction (промпт), context (история диалога), constraints (ограничения) и artifacts (ссылки на файлы).
  • ResultPayload: Унифицированный ответ. Содержит output (результат), logs (все ризоны (их может быть много) и вызовы функций для отладки хода мыслей агента на лету корректировкой инструкций, можно опускать при успехе, но я бы не стал) и metrics (токены, время и так далее). В целом количество полей в пейлоадах можно увеличивать, но лучше с этим не перебарщивать, чтоб не нагружать канал бесполезной нагрузкой.

Служебные каналы и Observability

Рой — это живой организм, и помимо выполнения бизнес-задач (Тикетов), он должен постоянно сообщать о своем самочувствии. Поскольку мы используем архитектурное разделение Connection (Core NATS) и Context (JetStream), мы можем организовать почти бесплатную доставку телеметрии, не нагружая диск персистентностью.

На текущем этапе я выделяю два типа служебных событий, которые летают мимо основного потока задач (описанного выше), но подчиняющиеся общим правилам CNCF спецификации:

1. Пульс (Heartbeats)

Каждый агент раз в N секунд отправляет короткий сигнал “Я жив”. Это позволяет операционным агентам (например, монитору ресурсов) строить карту живых узлов в реальном времени.

  • Топик: swarm.core.cluster.{agent_id}.status.heartbeat
  • Транспорт: Core NATS (Fire-and-forget). Нам не нужно хранить историю пульса за прошлую неделю, нам важно только состояние “сейчас”.
  • Payload: Содержит текущую нагрузку (CPU/RAM/VRAM/IO-Disk) и статус (idle, busy, — это базовые статусы спецификации AEDB, могут быть расширены энумератором конкретного роя).

2. Распределенное логирование

Агенты не пишут логи в локальные файлы (в контейнерах это бесполезно). Вместо этого они стримят свои stdout/stderr прямо в шину.

  • Топик: swarm.logs.{zone}.{agent_id}.{action}.{level} например: swarm.logs.sandbox.coder.review.error
  • Транспорт: На это событие подписан обычный сервис публикации всей телеметрии в нужную систему (например, в Graylog или LangFuse). Так как рой распределенный, то у агента не должно создаваться дополнительной зависимости от сервиса телеметрии. Ну и бонусом безболезненное (вообще незаметное для роя) изменение/дополнение места сбора телеметрии в случае необходимости.

Такой подход дает полную Observability (наблюдаемость). Мы можем в реальном времени подписаться на swarm.logs.sandbox.coder.*.* и видеть консоль всех кодеров сразу, как в матрице xD. Также это дает мощный инструмент реакций на различные состояния системы, от обычных алертов до автономных попыток роя самостоятельно исправить ситуацию.

Жизненный цикл задачи (Task Lifecycle)

Асинхронный Request-Reply

Главная проблема Event-Driven систем: как реализовать “Запрос-Ответ”, когда никто никого не ждет. Мы делаем это очень просто через смену сигналов в топике.

  1. Phase 1: Dispatch. Kernel (агент планировщик) декомпозирует план, создает запись в ArangoDB и публикует событие с сигналом .pending.
  • Topic: swarm.dev.sandbox.coder.code.pending
  1. Phase 2: Claim. Благодаря Queue Groups, сообщение забирает один свободный демон-кодер подписанный, например, на wildcard-топик swarm.dev.*.coder.*.pending (любые задачи по кодингу в любой среде ожидающие чтоб их кто-то взял). Он шлет брокеру Ack (“Я взял”).
  2. Phase 3: Execution. Демон выполняет работу (например, пишет код) в своей зоне ответственности.
  3. Phase 4: Completion. Демон публикует результат в тот же самый топик, но меняет хвост (сигнал) на .done.
  • Topic: swarm.dev.sandbox.coder.generate.done
  1. Phase 5: Sync. Kernel (планировщик), подписанный на wildcard *.done, ловит ответ, читает header Nats-Ticket-Id для быстрого роутинга и обновляет состояние в графе операций.

Обработка ошибок и Dead Letter Queues

Очевидно что идеального кода не бывает, и агенты падают. Тем более, когда у них очень широкая автономия действий. Потому, нам нужна система, которая будет следить за состоянием выполнения задач и корректно обрабатывать ошибки.

Для этого вводим следующие правила:

  • Timeout: Если агент взял задачу, но умер (не прислал результат), NATS через Ack Wait вернет задачу в очередь другому агенту.
  • Logical Error: Если агент жив, но задача невыполнима (ошибка API, синтаксиса после исчерпанного бюджета попыток исправить и т.д.), он публикует сигнал .failed.
  • Судья: На .failed подписан специальный агент-судья (Nemesis в моем проекте). Он решает: отправить на ретрай или эскалировать проблему обратно планировщику (Minerva в моем проекте) для смены плана, а может на пользователя через агента ассистента.
  • Chronos: Если долгое время (ориентируясь на header Nats-Timeout в событиях) никто не взял событие в работу, то просыпается специальный операционный агент, который всегда подписан на события всех агентов и мониторит состояния выполнения работы. Он читает headers без десериализации payload, что позволяет эффективно сканировать тысячи событий. По наступлению таймаута он принимает решение (об этом в следующих частях) что делать с этим топиком.

Заключение

Постарался максимально развернуто и детально описать архитектуру шины событий в контексте AEDB. Решил, что код писать в постах не буду, а описывать развернуто концепцию каждого компонента. Все концепции реализуются в библиотеках agentone и graphone и в их репозиториях можно посмотреть код.

Если будет спрос (пишите письма), запишу мож ряд роликов с переложением статей в код на примере упомянутых выше пакетов.

В следующей части мы рассмотрим операционный граф и его структуру. А потом перейдем к самому вкусному — планировщику (Minerva) и его графу планов которые он строит вокруг тикета.