Перейти к содержанию

Dispatcher Module

Dispatcher(router_id=None, storage=MemoryContext, *, use_create_task=False, **storage_kwargs)

Bases: BotMixin

Основной класс для обработки событий бота.

Обеспечивает запуск поллинга и вебхука, маршрутизацию событий, применение middleware, фильтров и вызов соответствующих обработчиков.

Инициализация диспетчера.

Parameters:

Name Type Description Default
router_id str | None

Идентификатор роутера для логов.

None
use_create_task bool

Флаг, отвечающий за параллелизацию обработок событий.

False
storage Any

Класс контекста для хранения данных (MemoryContext, RedisContext и т.д.).

MemoryContext
**storage_kwargs Any

Дополнительные аргументы для инициализации хранилища.

{}
Source code in maxapi/dispatcher.py
def __init__(
    self,
    router_id: str | None = None,
    storage: Any = MemoryContext,
    *,
    use_create_task: bool = False,
    **storage_kwargs: Any,
) -> None:
    """
    Инициализация диспетчера.

    Args:
        router_id: Идентификатор роутера для логов.
        use_create_task: Флаг, отвечающий за параллелизацию
            обработок событий.
        storage: Класс контекста для хранения
            данных (MemoryContext, RedisContext и т.д.).
        **storage_kwargs: Дополнительные аргументы для
            инициализации хранилища.
    """

    self.router_id = router_id
    self.storage = storage
    self.storage_kwargs = storage_kwargs

    self.event_handlers: list[Handler] = []
    self.handlers_by_type: dict[UpdateType, list[Handler]] | None = None
    self.contexts: OrderedDict[
        tuple[int | None, int | None], BaseContext
    ] = OrderedDict()
    self.routers: list[Router | Dispatcher] = []
    self.filters: list[MagicFilter] = []
    self.base_filters: list[BaseFilter] = []
    self.outer_middlewares: list[BaseMiddleware] = []
    self.inner_middlewares: list[BaseMiddleware] = []

    self.bot: Bot | None = None
    self.on_started_func: Callable | None = None
    self.polling = False
    self.use_create_task = use_create_task
    self._cached_router_entries: (
        list[
            tuple[
                Router | Dispatcher,
                list[BaseMiddleware],
                list[MagicFilter],
                list[BaseFilter],
            ]
        ]
        | None
    ) = None
    self._global_mw_chain: HandlerCallable | None = None
    self._background_tasks: set[asyncio.Task] = set()
    self._ready: bool = False

    self.message_created = Event(
        update_type=UpdateType.MESSAGE_CREATED, router=self
    )
    self.bot_added = Event(update_type=UpdateType.BOT_ADDED, router=self)
    self.bot_removed = Event(
        update_type=UpdateType.BOT_REMOVED, router=self
    )
    self.bot_started = Event(
        update_type=UpdateType.BOT_STARTED, router=self
    )
    self.bot_stopped = Event(
        update_type=UpdateType.BOT_STOPPED, router=self
    )
    self.dialog_cleared = Event(
        update_type=UpdateType.DIALOG_CLEARED, router=self
    )
    self.dialog_muted = Event(
        update_type=UpdateType.DIALOG_MUTED, router=self
    )
    self.dialog_unmuted = Event(
        update_type=UpdateType.DIALOG_UNMUTED, router=self
    )
    self.dialog_removed = Event(
        update_type=UpdateType.DIALOG_REMOVED, router=self
    )
    self.raw_api_response = Event(
        update_type=UpdateType.RAW_API_RESPONSE, router=self
    )
    self.chat_title_changed = Event(
        update_type=UpdateType.CHAT_TITLE_CHANGED, router=self
    )
    self.message_callback = Event(
        update_type=UpdateType.MESSAGE_CALLBACK, router=self
    )
    self.message_chat_created = Event(
        update_type=UpdateType.MESSAGE_CHAT_CREATED,
        router=self,
        deprecated=True,
    )
    self.message_edited = Event(
        update_type=UpdateType.MESSAGE_EDITED, router=self
    )
    self.message_removed = Event(
        update_type=UpdateType.MESSAGE_REMOVED, router=self
    )
    self.user_added = Event(update_type=UpdateType.USER_ADDED, router=self)
    self.user_removed = Event(
        update_type=UpdateType.USER_REMOVED, router=self
    )
    self.on_started = Event(update_type=UpdateType.ON_STARTED, router=self)

middlewares property writable

Список outer-middleware.

.. deprecated:: Используйте :attr:outer_middlewares.

check_me() async

Проверяет и логирует информацию о боте.

Source code in maxapi/dispatcher.py
async def check_me(self) -> None:
    """
    Проверяет и логирует информацию о боте.
    """

    bot = self._ensure_bot()
    me = await bot.get_me()

    bot.me = me

    logger_dp.info(
        "Бот: @%s first_name=%s id=%s",
        me.username,
        me.first_name,
        me.user_id,
    )

build_middleware_chain(middlewares, handler) staticmethod

Формирует цепочку вызова middleware вокруг хендлера.

Parameters:

Name Type Description Default
middlewares list[BaseMiddleware]

Список middleware.

required
handler HandlerCallable

Финальный обработчик.

required

Returns:

Name Type Description
Callable HandlerCallable

Обёрнутый обработчик.

Source code in maxapi/dispatcher.py
@staticmethod
def build_middleware_chain(
    middlewares: list[BaseMiddleware],
    handler: HandlerCallable,
) -> HandlerCallable:
    """
    Формирует цепочку вызова middleware вокруг хендлера.

    Args:
        middlewares: Список middleware.
        handler: Финальный обработчик.

    Returns:
        Callable: Обёрнутый обработчик.
    """

    for mw in reversed(middlewares):
        handler = functools.partial(mw, handler)

    return handler

include_routers(*routers)

Добавляет указанные роутеры в диспетчер.

Parameters:

Name Type Description Default
*routers Router

Роутеры для добавления.

()
Source code in maxapi/dispatcher.py
def include_routers(self, *routers: Router) -> None:
    """
    Добавляет указанные роутеры в диспетчер.

    Args:
        *routers: Роутеры для добавления.
    """

    self.routers.extend(routers)

register_outer_middleware(middleware)

Регистрирует outer middleware (до проверки фильтров handler).

Вызывается для каждого подходящего события ещё до того, как диспетчер узнает, какой именно handler сработает.

Порядок регистрации сохраняется: первый зарегистрированный outer middleware выполняется первым (внешний слой цепочки), что симметрично с :meth:register_inner_middleware.

Parameters:

Name Type Description Default
middleware BaseMiddleware

Middleware.

required
Source code in maxapi/dispatcher.py
def register_outer_middleware(self, middleware: BaseMiddleware) -> None:
    """
    Регистрирует outer middleware (до проверки фильтров handler).

    Вызывается для каждого подходящего события ещё до того, как
    диспетчер узнает, какой именно handler сработает.

    Порядок регистрации сохраняется: первый зарегистрированный
    outer middleware выполняется первым (внешний слой цепочки),
    что симметрично с :meth:`register_inner_middleware`.

    Args:
        middleware: Middleware.
    """
    self.outer_middlewares.append(middleware)

register_inner_middleware(middleware)

Регистрирует inner middleware (после проверки фильтров handler).

Вызывается только тогда, когда конкретный handler прошёл все свои фильтры и state и будет реально исполнен. На уровне Dispatcher — только для событий, попавших хоть в один handler; на уровне Router — только для handler этого роутера.

Parameters:

Name Type Description Default
middleware BaseMiddleware

Middleware.

required
Source code in maxapi/dispatcher.py
def register_inner_middleware(self, middleware: BaseMiddleware) -> None:
    """
    Регистрирует inner middleware (после проверки фильтров handler).

    Вызывается только тогда, когда конкретный handler прошёл все
    свои фильтры и state и будет реально исполнен. На уровне
    Dispatcher — только для событий, попавших хоть в один handler;
    на уровне Router — только для handler этого роутера.

    Args:
        middleware (BaseMiddleware): Middleware.
    """
    self.inner_middlewares.append(middleware)

outer_middleware(middleware)

Добавляет Middleware на первое место в списке outer_middlewares.

Историческое поведение: insert(0, ...). В новом :meth:register_outer_middleware порядок изменён на append (register order = execution order), поэтому при миграции проверьте порядок вызовов, если он важен.

.. deprecated:: Используйте :meth:register_outer_middleware.

Parameters:

Name Type Description Default
middleware BaseMiddleware

Middleware.

required
Source code in maxapi/dispatcher.py
def outer_middleware(self, middleware: BaseMiddleware) -> None:
    """
    Добавляет Middleware на первое место в списке outer_middlewares.

    Историческое поведение: ``insert(0, ...)``. В новом
    :meth:`register_outer_middleware` порядок изменён на ``append``
    (register order = execution order), поэтому при миграции
    проверьте порядок вызовов, если он важен.

    .. deprecated::
        Используйте :meth:`register_outer_middleware`.

    Args:
        middleware (BaseMiddleware): Middleware.
    """
    warnings.warn(
        f"{type(self).__name__}.outer_middleware() устарел. "
        "Используйте register_outer_middleware().",
        DeprecationWarning,
        stacklevel=2,
    )
    self.outer_middlewares.insert(0, middleware)

middleware(middleware)

Добавляет Middleware в конец списка.

.. deprecated:: Используйте :meth:register_outer_middleware (текущее поведение — outer, до фильтров handler) или :meth:register_inner_middleware (только когда handler реально вызван).

Parameters:

Name Type Description Default
middleware BaseMiddleware

Middleware.

required
Source code in maxapi/dispatcher.py
def middleware(self, middleware: BaseMiddleware) -> None:
    """
    Добавляет Middleware в конец списка.

    .. deprecated::
        Используйте :meth:`register_outer_middleware` (текущее
        поведение — outer, до фильтров handler) или
        :meth:`register_inner_middleware` (только когда handler
        реально вызван).

    Args:
        middleware: Middleware.
    """
    warnings.warn(
        f"{type(self).__name__}.middleware() устарел. "
        "Используйте register_outer_middleware() (поведение "
        "сохраняется) или register_inner_middleware() для запуска "
        "mw только после выбора handler.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.outer_middlewares.append(middleware)

filter(base_filter)

Добавляет фильтр в список.

Parameters:

Name Type Description Default
base_filter BaseFilter

Фильтр.

required
Source code in maxapi/dispatcher.py
def filter(self, base_filter: BaseFilter) -> None:
    """
    Добавляет фильтр в список.

    Args:
        base_filter: Фильтр.
    """

    self.base_filters.append(base_filter)

__ready(bot) async

Подготавливает диспетчер: сохраняет бота, подготавливает обработчики, вызывает on_started.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
Source code in maxapi/dispatcher.py
async def __ready(self, bot: Bot) -> None:
    """
    Подготавливает диспетчер: сохраняет бота, подготавливает
    обработчики, вызывает on_started.

    Args:
        bot: Экземпляр бота.
    """

    if self._ready:
        return

    self.bot = bot
    self.bot.dispatcher = self

    if self.polling and bot.auto_check_subscriptions:
        await self._check_subscriptions(bot)

    await self.check_me()

    if self not in self.routers:
        self.routers.append(self)
    self._prepare_handlers(bot)

    self._global_mw_chain = self.build_middleware_chain(
        self.outer_middlewares, self._process_event
    )

    if self.on_started_func:
        await self.on_started_func()

    self._ready = True

__get_context(chat_id, user_id)

Возвращает существующий или создаёт новый контекст по chat_id и user_id.

Parameters:

Name Type Description Default
chat_id int | None

Идентификатор чата.

required
user_id int | None

Идентификатор пользователя.

required

Returns:

Name Type Description
BaseContext BaseContext

Контекст.

Source code in maxapi/dispatcher.py
def __get_context(
    self, chat_id: int | None, user_id: int | None
) -> BaseContext:
    """
    Возвращает существующий или создаёт новый контекст
    по chat_id и user_id.

    Args:
        chat_id: Идентификатор чата.
        user_id: Идентификатор пользователя.

    Returns:
        BaseContext: Контекст.
    """

    key = (chat_id, user_id)
    ctx = self.contexts.get(key)
    if ctx is not None:
        if ctx.is_ttl_expired():
            logger_dp.debug("Истёк TTL контекста %s", key)
            del self.contexts[key]
        else:
            ctx.touch_ttl()
            # Перемещаем в конец, чтобы LRU-вытеснение удаляло
            # самые давно неиспользованные контексты
            self.contexts.move_to_end(key)
            return ctx

    if len(self.contexts) >= CONTEXTS_MAX_SIZE:
        evicted_key = next(iter(self.contexts))
        logger_dp.debug(
            "Вытеснен контекст %s (лимит %d)",
            evicted_key,
            CONTEXTS_MAX_SIZE,
        )
        self.contexts.popitem(last=False)

    new_ctx = self.storage(chat_id, user_id, **self.storage_kwargs)
    new_ctx.touch_ttl()
    self.contexts[key] = new_ctx
    return new_ctx

call_handler(handler, event_object, data) async staticmethod

Вызывает хендлер с нужными аргументами.

Перед вызовом фильтрует data, оставляя только те ключи, которые handler реально принимает (по handler.func_args или параметрам, полученным через :func:inspect.signature). В отличие от get_annotations, signature не включает "return" и не требует eval строковых аннотаций — безопасен при from __future__ import annotations. Несовместимые ключи не дойдут до handler и не приведут к TypeError.

Parameters:

Name Type Description Default
handler Handler

Handler.

required
event_object UpdateUnion | dict[str, Any]

Объект события.

required
data dict[str, Any]

Данные, накопленные фильтрами и middleware.

required

Returns:

Type Description
None

None

Source code in maxapi/dispatcher.py
@staticmethod
async def call_handler(
    handler: Handler,
    event_object: UpdateUnion | dict[str, Any],
    data: dict[str, Any],
) -> None:
    """
    Вызывает хендлер с нужными аргументами.

    Перед вызовом фильтрует ``data``, оставляя только те ключи,
    которые handler реально принимает (по ``handler.func_args`` или
    параметрам, полученным через :func:`inspect.signature`).
    В отличие от ``get_annotations``, ``signature`` не включает
    ``"return"`` и не требует eval строковых аннотаций — безопасен
    при ``from __future__ import annotations``. Несовместимые ключи
    не дойдут до handler и не приведут к ``TypeError``.

    Args:
        handler: Handler.
        event_object: Объект события.
        data: Данные, накопленные фильтрами и middleware.

    Returns:
        None
    """
    if data:
        func_args = handler.func_args or frozenset(
            inspect.signature(handler.func_event).parameters,
        )
        kwargs = {k: v for k, v in data.items() if k in func_args}
        if kwargs:
            await handler.func_event(event_object, **kwargs)
            return

    await handler.func_event(event_object)

process_base_filters(event, filters, data=None) async staticmethod

Асинхронно применяет фильтры к событию.

Parameters:

Name Type Description Default
event UpdateUnion

Событие.

required
filters list[BaseFilter]

Список фильтров.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: Словарь с результатом или None, если фильтр не прошёл.

Source code in maxapi/dispatcher.py
@staticmethod
async def process_base_filters(
    event: UpdateUnion,
    filters: list[BaseFilter],
    data: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """
    Асинхронно применяет фильтры к событию.

    Args:
        event: Событие.
        filters: Список фильтров.

    Returns:
        dict[str, Any] | None: Словарь с результатом или None,
            если фильтр не прошёл.
    """

    available_data: dict[str, Any] = dict(data or {})
    filter_data: dict[str, Any] = {}

    for _filter in filters:
        kwargs = Dispatcher._resolve_filter_kwargs(_filter, available_data)
        result = await _filter(event, **kwargs)

        if isinstance(result, dict):
            filter_data.update(result)
            available_data.update(result)

        elif not result:
            return None

    return filter_data

handle_raw_response(event_type, raw_data) async

Специальный метод для обработки сырых ответов API.

Source code in maxapi/dispatcher.py
async def handle_raw_response(
    self, event_type: UpdateType, raw_data: dict[str, Any]
) -> None:
    """
    Специальный метод для обработки сырых ответов API.
    """
    entries = (
        self._cached_router_entries
        if self._cached_router_entries is not None
        else self._iter_unique_routers(self.routers)
    )
    for router, *_ in entries:
        matching_handlers = self._find_matching_handlers(
            router=router,
            event_type=event_type,
        )
        for handler in matching_handlers:
            try:
                await self.call_handler(
                    handler=handler,
                    event_object=raw_data,
                    data={},
                )
            except Exception as e:  # noqa: PERF203
                logger_dp.exception(
                    "Ошибка в обработчике RAW_API_RESPONSE: %r", e
                )

handle(event_object) async

Основной обработчик события. Применяет фильтры, middleware и вызывает нужный handler.

Parameters:

Name Type Description Default
event_object UpdateUnion

Событие.

required
Source code in maxapi/dispatcher.py
async def handle(self, event_object: UpdateUnion) -> None:
    """
    Основной обработчик события. Применяет фильтры, middleware
    и вызывает нужный handler.

    Args:
        event_object: Событие.
    """
    router_id = None
    process_info = "нет данных"

    try:
        ids = event_object.get_ids()
        memory_context = self.__get_context(*ids)
        current_state = await memory_context.get_state()

        process_info = (
            f"{event_object.update_type} | "
            f"chat_id: {ids[0]}, user_id: {ids[1]}"
        )

        kwargs: dict[str, Any] = {
            "context": memory_context,
            "raw_state": current_state,
            "_memory_context": memory_context,
            "_current_state": current_state,
            "_process_info": process_info,
        }

        global_chain = (
            self._global_mw_chain
            or self.build_middleware_chain(
                self.outer_middlewares, self._process_event
            )
        )

        try:
            await global_chain(event_object, kwargs)
        except Exception as e:
            mem_data = await memory_context.get_data()

            raise MiddlewareException(
                middleware_title=self._get_middleware_title(global_chain),
                router_id=kwargs.get("_router_id", router_id),
                process_info=process_info,
                memory_context={
                    "data": mem_data,
                    "state": current_state,
                },
                cause=e,
            ) from e

        router_id = kwargs.get("_router_id")
        is_handled = kwargs.get("_is_handled", False)

        if not is_handled:
            logger_dp.info(
                "Проигнорировано: router_id: %s | %s",
                router_id,
                process_info,
            )

    except Exception as e:
        logger_dp.exception(
            "Ошибка при обработке события: router_id: %s | %s | %r",
            router_id,
            process_info,
            e,
        )

start_polling(bot, *, skip_updates=False) async

Запускает цикл получения обновлений (long polling).

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
skip_updates bool

Флаг, отвечающий за обработку старых событий.

False
Source code in maxapi/dispatcher.py
async def start_polling(
    self, bot: Bot, *, skip_updates: bool = False
) -> None:
    """
    Запускает цикл получения обновлений (long polling).

    Args:
        bot: Экземпляр бота.
        skip_updates: Флаг, отвечающий за обработку старых событий.
    """
    self.polling = True

    await self.__ready(bot)

    current_timestamp = to_ms(datetime.now())

    while self.polling:
        events = await self._fetch_updates_once(bot)
        if events is None:
            continue
        await self._dispatch_fetched_events(
            events, current_timestamp, skip_updates=skip_updates
        )

stop_polling() async

Останавливает цикл получения обновлений (long polling).

Дожидается завершения всех фоновых задач (use_create_task=True), запущенных до момента остановки.

Source code in maxapi/dispatcher.py
async def stop_polling(self) -> None:
    """
    Останавливает цикл получения обновлений (long polling).

    Дожидается завершения всех фоновых задач (use_create_task=True),
    запущенных до момента остановки.
    """
    if self.polling:
        self.polling = False
        self._ready = False
        logger_dp.info("Polling остановлен")

    if self._background_tasks:
        logger_dp.info(
            "Ожидаю завершения %d фоновых задач...",
            len(self._background_tasks),
        )
        await asyncio.gather(
            *self._background_tasks, return_exceptions=True
        )
        logger_dp.info("Все фоновые задачи завершены")

startup(bot) async

Инициализирует диспетчер: сохраняет бота, подготавливает обработчики и вызывает on_started.

Используется интеграционными модулями (например, maxapi.webhook.fastapi) для инициализации в lifespan веб-фреймворка.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
Source code in maxapi/dispatcher.py
async def startup(self, bot: Bot) -> None:
    """
    Инициализирует диспетчер: сохраняет бота, подготавливает
    обработчики и вызывает on_started.

    Используется интеграционными модулями (например,
    maxapi.webhook.fastapi) для инициализации в lifespan
    веб-фреймворка.

    Args:
        bot: Экземпляр бота.
    """
    await self.__ready(bot)

handle_webhook(bot, *, host=DEFAULT_HOST, port=DEFAULT_PORT, path=DEFAULT_PATH, secret=None, webhook_type=AiohttpMaxWebhook, **kwargs) async

Запускает вебхук-сервер (aiohttp) для приёма обновлений.

Удобный метод «всё в одном»: создаёт aiohttp-приложение через :class:~maxapi.webhook.aiohttp.BaseMaxWebhook, регистрирует маршрут и запускает сервер.

Для более гибкого управления жизненным циклом сервера используйте одну из реализаций BaseMaxWebhook напрямую, например :class:~maxapi.webhook.aiohttp.BaseMaxWebhook.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
host str

Хост сервера (по умолчанию "0.0.0.0").

DEFAULT_HOST
port int

Порт сервера (по умолчанию 8080).

DEFAULT_PORT
path str

URL-путь для маршрута вебхука.

DEFAULT_PATH
secret str | None

Секрет для проверки заголовка X-Max-Bot-Api-Secret. Должен совпадать со значением, переданным в :meth:~maxapi.Bot.subscribe_webhook.

None
webhook_type type[BaseMaxWebhook]

Класс вебхука.

AiohttpMaxWebhook
**kwargs Any

Дополнительные аргументы для aiohttp.web.AppRunner.

{}
Source code in maxapi/dispatcher.py
async def handle_webhook(
    self,
    bot: Bot,
    *,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    path: str = DEFAULT_PATH,
    secret: str | None = None,
    webhook_type: type[BaseMaxWebhook] = AiohttpMaxWebhook,
    **kwargs: Any,
) -> None:
    """
    Запускает вебхук-сервер (aiohttp) для приёма обновлений.

    Удобный метод «всё в одном»: создаёт aiohttp-приложение через
    :class:`~maxapi.webhook.aiohttp.BaseMaxWebhook`,
    регистрирует маршрут и запускает сервер.

    Для более гибкого управления жизненным циклом сервера используйте
    одну из реализаций BaseMaxWebhook напрямую, например
    :class:`~maxapi.webhook.aiohttp.BaseMaxWebhook`.

    Args:
        bot: Экземпляр бота.
        host: Хост сервера (по умолчанию ``"0.0.0.0"``).
        port: Порт сервера (по умолчанию ``8080``).
        path: URL-путь для маршрута вебхука.
        secret: Секрет для проверки заголовка
            ``X-Max-Bot-Api-Secret``. Должен совпадать со значением,
            переданным в :meth:`~maxapi.Bot.subscribe_webhook`.
        webhook_type: Класс вебхука.
        **kwargs: Дополнительные аргументы для ``aiohttp.web.AppRunner``.
    """
    webhook = webhook_type(dp=self, bot=bot, secret=secret)
    await webhook.run(host=host, port=port, path=path, **kwargs)

init_serve(bot, host=DEFAULT_HOST, port=DEFAULT_PORT, **kwargs) async

.. deprecated:: Используйте :meth:handle_webhook вместо init_serve. Метод будет удалён в одной из следующих версий.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
host str

Хост.

DEFAULT_HOST
port int

Порт.

DEFAULT_PORT
Source code in maxapi/dispatcher.py
async def init_serve(  # pragma: no cover
    self,
    bot: Bot,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    **kwargs: Any,
) -> None:
    """
    .. deprecated::
        Используйте :meth:`handle_webhook` вместо ``init_serve``.
        Метод будет удалён в одной из следующих версий.

    Args:
        bot: Экземпляр бота.
        host: Хост.
        port: Порт.
    """
    warn(
        "init_serve устарел и будет удалён в следующих версиях. "
        "Используйте handle_webhook вместо него.",
        DeprecationWarning,
        stacklevel=2,
    )
    await self.handle_webhook(bot, host=host, port=port, **kwargs)

Router(router_id=None)

Bases: Dispatcher

Роутер для группировки обработчиков событий.

Инициализация роутера.

Parameters:

Name Type Description Default
router_id str | None

Идентификатор роутера для логов.

None
Source code in maxapi/dispatcher.py
def __init__(self, router_id: str | None = None):
    """
    Инициализация роутера.

    Args:
        router_id: Идентификатор роутера для логов.
    """

    super().__init__(router_id)

Event(update_type, router, *, deprecated=False)

Декоратор для регистрации обработчиков событий.

Инициализирует событие-декоратор.

Parameters:

Name Type Description Default
update_type UpdateType

Тип события.

required
router Dispatcher | Router

Экземпляр роутера или диспетчера.

required
deprecated bool

Флаг, указывающий на то, что событие устарело.

False
Source code in maxapi/dispatcher.py
def __init__(
    self,
    update_type: UpdateType,
    router: Dispatcher | Router,
    *,
    deprecated: bool = False,
):
    """
    Инициализирует событие-декоратор.

    Args:
        update_type: Тип события.
        router: Экземпляр роутера или диспетчера.
        deprecated: Флаг, указывающий на то, что событие устарело.
    """

    self.update_type = update_type
    self.router = router
    self.deprecated = deprecated

register(func_event, *args, **kwargs)

Регистрирует функцию как обработчик события.

Parameters:

Name Type Description Default
func_event Callable

Функция-обработчик

required
*args Any

Фильтры

()
**kwargs Any

Дополнительные параметры (например, states)

{}

Returns:

Name Type Description
Callable Callable

Исходная функция.

Source code in maxapi/dispatcher.py
def register(
    self, func_event: Callable, *args: Any, **kwargs: Any
) -> Callable:
    """
    Регистрирует функцию как обработчик события.

    Args:
        func_event: Функция-обработчик
        *args: Фильтры
        **kwargs: Дополнительные параметры (например, states)

    Returns:
        Callable: Исходная функция.
    """

    if self.deprecated:
        warnings.warn(
            f"Событие {self.update_type} устарело "
            f"и будет удалено в будущих версиях.",
            DeprecationWarning,
            stacklevel=3,
        )

    if self.update_type == UpdateType.ON_STARTED:
        self.router.on_started_func = func_event

    else:
        self.router.event_handlers.append(
            Handler(
                *args,
                func_event=func_event,
                update_type=self.update_type,
                **kwargs,
            )
        )
    return func_event

__call__(*args, **kwargs)

Регистрирует функцию как обработчик события через декоратор.

Returns:

Name Type Description
Callable Callable

Декоратор.

Source code in maxapi/dispatcher.py
def __call__(self, *args: Any, **kwargs: Any) -> Callable:
    """
    Регистрирует функцию как обработчик события через декоратор.

    Returns:
        Callable: Декоратор.
    """

    def decorator(func_event: Callable) -> Callable:
        return self.register(func_event, *args, **kwargs)

    return decorator