Перейти к содержанию

Dispatcher Module

Dispatcher(router_id=None, use_create_task=False, storage=MemoryContext, **storage_kwargs)

Bases: BotMixin

Основной класс для обработки событий бота.

Обеспечивает запуск поллинга и вебхука, маршрутизацию событий, применение middleware, фильтров и вызов соответствующих обработчиков.

Инициализация диспетчера.

Parameters:

Name Type Description Default
router_id str | None

Идентификатор роутера для логов.

None
use_create_task bool

Флаг, отвечающий за параллелизацию обработок событий.

False
storage type[BaseContext]

Класс контекста для хранения данных (MemoryContext, RedisContext и т.д.).

MemoryContext
**storage_kwargs Any

Дополнительные аргументы для инициализации хранилища.

{}
Source code in maxapi/dispatcher.py
def __init__(
    self,
    router_id: str | None = None,
    use_create_task: bool = False,
    storage: Any = MemoryContext,
    **storage_kwargs: Any,
) -> None:
    """
    Инициализация диспетчера.

    Args:
        router_id (str | None): Идентификатор роутера для логов.
        use_create_task (bool): Флаг, отвечающий за параллелизацию обработок событий.
        storage (type[BaseContext]): Класс контекста для хранения данных (MemoryContext, RedisContext и т.д.).
        **storage_kwargs (Any): Дополнительные аргументы для инициализации хранилища.
    """

    self.router_id = router_id
    self.storage = storage
    self.storage_kwargs = storage_kwargs

    self.event_handlers: List[Handler] = []
    self.contexts: Dict[tuple[int | None, int | None], BaseContext] = {}
    self.routers: List[Router | Dispatcher] = []
    self.filters: List[MagicFilter] = []
    self.base_filters: List[BaseFilter] = []
    self.middlewares: List[BaseMiddleware] = []

    self.bot: Optional[Bot] = None
    self.webhook_app: Optional[FastAPI] = None
    self.on_started_func: Optional[Callable] = None
    self.polling = False
    self.use_create_task = use_create_task

    self.message_created = Event(
        update_type=UpdateType.MESSAGE_CREATED, router=self
    )
    self.bot_added = Event(update_type=UpdateType.BOT_ADDED, router=self)
    self.bot_removed = Event(
        update_type=UpdateType.BOT_REMOVED, router=self
    )
    self.bot_started = Event(
        update_type=UpdateType.BOT_STARTED, router=self
    )
    self.bot_stopped = Event(
        update_type=UpdateType.BOT_STOPPED, router=self
    )
    self.dialog_cleared = Event(
        update_type=UpdateType.DIALOG_CLEARED, router=self
    )
    self.dialog_muted = Event(
        update_type=UpdateType.DIALOG_MUTED, router=self
    )
    self.dialog_unmuted = Event(
        update_type=UpdateType.DIALOG_UNMUTED, router=self
    )
    self.dialog_removed = Event(
        update_type=UpdateType.DIALOG_REMOVED, router=self
    )
    self.raw_api_response = Event(
        update_type=UpdateType.RAW_API_RESPONSE, router=self
    )
    self.chat_title_changed = Event(
        update_type=UpdateType.CHAT_TITLE_CHANGED, router=self
    )
    self.message_callback = Event(
        update_type=UpdateType.MESSAGE_CALLBACK, router=self
    )
    self.message_chat_created = Event(
        update_type=UpdateType.MESSAGE_CHAT_CREATED,
        router=self,
        deprecated=True,
    )
    self.message_edited = Event(
        update_type=UpdateType.MESSAGE_EDITED, router=self
    )
    self.message_removed = Event(
        update_type=UpdateType.MESSAGE_REMOVED, router=self
    )
    self.user_added = Event(update_type=UpdateType.USER_ADDED, router=self)
    self.user_removed = Event(
        update_type=UpdateType.USER_REMOVED, router=self
    )
    self.on_started = Event(update_type=UpdateType.ON_STARTED, router=self)

check_me() async

Проверяет и логирует информацию о боте.

Source code in maxapi/dispatcher.py
async def check_me(self) -> None:
    """
    Проверяет и логирует информацию о боте.
    """

    me = await self._ensure_bot().get_me()

    self._ensure_bot()._me = me

    logger_dp.info(
        f"Бот: @{me.username} first_name={me.first_name} id={me.user_id}"
    )

build_middleware_chain(middlewares, handler)

Формирует цепочку вызова middleware вокруг хендлера.

Parameters:

Name Type Description Default
middlewares List[BaseMiddleware]

Список middleware.

required
handler Callable

Финальный обработчик.

required

Returns:

Name Type Description
Callable Callable[[Any, Dict[str, Any]], Awaitable[Any]]

Обёрнутый обработчик.

Source code in maxapi/dispatcher.py
def build_middleware_chain(
    self,
    middlewares: List[BaseMiddleware],
    handler: Callable[[Any, Dict[str, Any]], Awaitable[Any]],
) -> Callable[[Any, Dict[str, Any]], Awaitable[Any]]:
    """
    Формирует цепочку вызова middleware вокруг хендлера.

    Args:
        middlewares (List[BaseMiddleware]): Список middleware.
        handler (Callable): Финальный обработчик.

    Returns:
        Callable: Обёрнутый обработчик.
    """

    for mw in reversed(middlewares):
        handler = functools.partial(mw, handler)

    return handler

include_routers(*routers)

Добавляет указанные роутеры в диспетчер.

Parameters:

Name Type Description Default
*routers Router

Роутеры для добавления.

()
Source code in maxapi/dispatcher.py
def include_routers(self, *routers: "Router") -> None:
    """
    Добавляет указанные роутеры в диспетчер.

    Args:
        *routers (Router): Роутеры для добавления.
    """

    self.routers += [r for r in routers]

outer_middleware(middleware)

Добавляет Middleware на первое место в списке.

Parameters:

Name Type Description Default
middleware BaseMiddleware

Middleware.

required
Source code in maxapi/dispatcher.py
def outer_middleware(self, middleware: BaseMiddleware) -> None:
    """
    Добавляет Middleware на первое место в списке.

    Args:
        middleware (BaseMiddleware): Middleware.
    """

    self.middlewares.insert(0, middleware)

middleware(middleware)

Добавляет Middleware в конец списка.

Parameters:

Name Type Description Default
middleware BaseMiddleware

Middleware.

required
Source code in maxapi/dispatcher.py
def middleware(self, middleware: BaseMiddleware) -> None:
    """
    Добавляет Middleware в конец списка.

    Args:
        middleware (BaseMiddleware): Middleware.
    """

    self.middlewares.append(middleware)

filter(base_filter)

Добавляет фильтр в список.

Parameters:

Name Type Description Default
base_filter BaseFilter

Фильтр.

required
Source code in maxapi/dispatcher.py
def filter(self, base_filter: BaseFilter) -> None:
    """
    Добавляет фильтр в список.

    Args:
        base_filter (BaseFilter): Фильтр.
    """

    self.base_filters.append(base_filter)

__ready(bot) async

Подготавливает диспетчер: сохраняет бота, регистрирует обработчики, вызывает on_started.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
Source code in maxapi/dispatcher.py
async def __ready(self, bot: Bot) -> None:
    """
    Подготавливает диспетчер: сохраняет бота, регистрирует обработчики, вызывает on_started.

    Args:
        bot (Bot): Экземпляр бота.
    """

    self.bot = bot
    self.bot.dispatcher = self

    if self.polling and self.bot.auto_check_subscriptions:
        response = await self.bot.get_subscriptions()

        if response.subscriptions:
            logger_subscriptions_text = ", ".join(
                [s.url for s in response.subscriptions]
            )
            logger_dp.warning(
                "БОТ ИГНОРИРУЕТ POLLING! Обнаружены установленные подписки: %s",
                logger_subscriptions_text,
            )

    await self.check_me()

    self.routers += [self]

    for router in self.routers:
        router.bot = bot

        for handler in router.event_handlers:
            if handler.base_filters is None:
                continue

            for base_filter in handler.base_filters:
                commands = getattr(base_filter, "commands", None)

                if commands and type(commands) is list:
                    handler_doc = handler.func_event.__doc__
                    extracted_info = None

                    if handler_doc:
                        from_pattern = search(
                            COMMANDS_INFO_PATTERN, handler_doc, DOTALL
                        )
                        if from_pattern:
                            extracted_info = from_pattern.group(1).strip()

                    self.bot.commands.append(
                        CommandsInfo(commands, extracted_info)
                    )

    handlers_count = sum(
        len(router.event_handlers) for router in self.routers
    )

    logger_dp.info(f"{handlers_count} событий на обработку")

    if self.on_started_func:
        await self.on_started_func()

__get_context(chat_id, user_id)

Возвращает существующий или создаёт новый контекст по chat_id и user_id.

Parameters:

Name Type Description Default
chat_id Optional[int]

Идентификатор чата.

required
user_id Optional[int]

Идентификатор пользователя.

required

Returns:

Name Type Description
BaseContext BaseContext

Контекст.

Source code in maxapi/dispatcher.py
def __get_context(
    self, chat_id: Optional[int], user_id: Optional[int]
) -> BaseContext:
    """
    Возвращает существующий или создаёт новый контекст по chat_id и user_id.

    Args:
        chat_id (Optional[int]): Идентификатор чата.
        user_id (Optional[int]): Идентификатор пользователя.

    Returns:
        BaseContext: Контекст.
    """

    key = (chat_id, user_id)
    if key in self.contexts:
        return self.contexts[key]

    new_ctx = self.storage(chat_id, user_id, **self.storage_kwargs)
    self.contexts[key] = new_ctx
    return new_ctx

call_handler(handler, event_object, data) async

Вызывает хендлер с нужными аргументами.

Parameters:

Name Type Description Default
handler Handler

Handler.

required
event_object UpdateType | Dict[str, Any]

Объект события.

required
data Dict[str, Any]

Данные для хендлера.

required

Returns:

Type Description
None

None

Source code in maxapi/dispatcher.py
async def call_handler(
    self,
    handler: Handler,
    event_object: UpdateType | Dict[str, Any],
    data: Dict[str, Any],
) -> None:
    """
    Вызывает хендлер с нужными аргументами.

    Args:
        handler: Handler.
        event_object: Объект события.
        data: Данные для хендлера.

    Returns:
        None
    """

    func_args = handler.func_event.__annotations__.keys()
    kwargs_filtered = {k: v for k, v in data.items() if k in func_args}

    if kwargs_filtered:
        await handler.func_event(event_object, **kwargs_filtered)
    else:
        await handler.func_event(event_object)

process_base_filters(event, filters) async

Асинхронно применяет фильтры к событию.

Parameters:

Name Type Description Default
event UpdateUnion

Событие.

required
filters List[BaseFilter]

Список фильтров.

required

Returns:

Type Description
Optional[Dict[str, Any]] | Literal[False]

Optional[Dict[str, Any]] | Literal[False]: Словарь с результатом или False.

Source code in maxapi/dispatcher.py
async def process_base_filters(
    self, event: UpdateUnion, filters: List[BaseFilter]
) -> Optional[Dict[str, Any]] | Literal[False]:
    """
    Асинхронно применяет фильтры к событию.

    Args:
        event (UpdateUnion): Событие.
        filters (List[BaseFilter]): Список фильтров.

    Returns:
        Optional[Dict[str, Any]] | Literal[False]: Словарь с результатом или False.
    """

    data = {}

    for _filter in filters:
        result = await _filter(event)

        if isinstance(result, dict):
            data.update(result)

        elif not result:
            return result

    return data

handle_raw_response(event_type, raw_data) async

Специальный метод для обработки сырых ответов API.

Source code in maxapi/dispatcher.py
async def handle_raw_response(
    self, event_type: UpdateType, raw_data: Dict[str, Any]
) -> None:
    """
    Специальный метод для обработки сырых ответов API.
    """
    for index, router in enumerate(self.routers):
        matching_handlers = self._find_matching_handlers(
            router, event_type
        )
        for handler in matching_handlers:
            try:
                await self.call_handler(handler, raw_data, {})
            except Exception as e:
                logger_dp.exception(
                    f"Ошибка в обработчике RAW_API_RESPONSE: {e}"
                )

handle(event_object) async

Основной обработчик события. Применяет фильтры, middleware и вызывает нужный handler.

Parameters:

Name Type Description Default
event_object UpdateUnion

Событие.

required
Source code in maxapi/dispatcher.py
async def handle(self, event_object: UpdateUnion) -> None:
    """
    Основной обработчик события. Применяет фильтры, middleware и вызывает нужный handler.

    Args:
        event_object (UpdateUnion): Событие.
    """

    router_id = None
    process_info = "нет данных"

    try:
        ids = event_object.get_ids()
        memory_context = self.__get_context(*ids)
        current_state = await memory_context.get_state()
        kwargs = {"context": memory_context}

        process_info = f"{event_object.update_type} | chat_id: {ids[0]}, user_id: {ids[1]}"

        is_handled = False

        async def _process_event(
            _: UpdateUnion, data: Dict[str, Any]
        ) -> None:
            nonlocal router_id, is_handled, memory_context, current_state

            data["context"] = memory_context

            for index, router in enumerate(self.routers):
                if is_handled:
                    break

                router_id = router.router_id or index

                router_filter_result = await self._check_router_filters(
                    event_object, router
                )

                if router_filter_result is False:
                    continue

                if isinstance(router_filter_result, dict):
                    data.update(router_filter_result)

                matching_handlers = self._find_matching_handlers(
                    router, event_object.update_type
                )

                async def _process_handlers(
                    event: UpdateUnion, handler_data: Dict[str, Any]
                ) -> None:
                    nonlocal is_handled

                    for handler in matching_handlers:
                        handler_match_result = (
                            await self._check_handler_match(
                                handler, event, current_state
                            )
                        )

                        if handler_match_result is False:
                            continue

                        if isinstance(handler_match_result, dict):
                            handler_data.update(handler_match_result)

                        await self._execute_handler(
                            handler=handler,
                            event=event,
                            data=handler_data,
                            handler_middlewares=handler.middlewares,
                            memory_context=memory_context,
                            current_state=current_state,
                            router_id=router_id,
                            process_info=process_info,
                        )

                        logger_dp.info(
                            f"Обработано: router_id: {router_id} | {process_info}"
                        )

                        is_handled = True
                        break

                if isinstance(router, Router) and router.middlewares:
                    router_chain = self.build_middleware_chain(
                        router.middlewares, _process_handlers
                    )
                    await router_chain(event_object, data)
                else:
                    await _process_handlers(event_object, data)

        global_chain = self.build_middleware_chain(
            self.middlewares, _process_event
        )

        try:
            await global_chain(event_object, kwargs)
        except Exception as e:
            mem_data = await memory_context.get_data()

            if hasattr(global_chain, "func"):
                middleware_title = global_chain.func.__class__.__name__  # type: ignore[attr-defined]
            else:
                middleware_title = getattr(
                    global_chain,
                    "__name__",
                    global_chain.__class__.__name__,
                )

            raise MiddlewareException(
                middleware_title=middleware_title,
                router_id=router_id,
                process_info=process_info,
                memory_context={
                    "data": mem_data,
                    "state": current_state,
                },
                cause=e,
            ) from e

        if not is_handled:
            logger_dp.info(
                f"Проигнорировано: router_id: {router_id} | {process_info}"
            )

    except Exception as e:
        logger_dp.exception(
            f"Ошибка при обработке события: router_id: {router_id} | {process_info} | {e} "
        )

start_polling(bot, skip_updates=False) async

Запускает цикл получения обновлений (long polling).

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
skip_updates bool

Флаг, отвечающий за обработку старых событий.

False
Source code in maxapi/dispatcher.py
async def start_polling(
    self, bot: Bot, skip_updates: bool = False
) -> None:
    """
    Запускает цикл получения обновлений (long polling).

    Args:
        bot (Bot): Экземпляр бота.
        skip_updates (bool): Флаг, отвечающий за обработку старых событий.
    """

    self.polling = True

    await self.__ready(bot)

    current_timestamp = to_ms(datetime.now())

    while self.polling:
        try:
            events: Dict = await self._ensure_bot().get_updates(
                marker=self._ensure_bot().marker_updates
            )
        except AsyncioTimeoutError:
            continue
        except (MaxConnection, ClientConnectorError) as e:
            logger_dp.warning(
                f"Ошибка подключения при получении обновлений: {e}, жду {CONNECTION_RETRY_DELAY} секунд"
            )
            await asyncio.sleep(CONNECTION_RETRY_DELAY)
            continue
        except InvalidToken:
            logger_dp.error("Неверный токен! Останавливаю polling")
            self.polling = False
            raise
        except MaxApiError as e:
            logger_dp.info(
                f"Ошибка при получении обновлений: {e}, жду {GET_UPDATES_RETRY_DELAY} секунд"
            )
            await asyncio.sleep(GET_UPDATES_RETRY_DELAY)
            continue
        except Exception as e:
            logger_dp.error(
                f"Неожиданная ошибка при получении обновлений: {e.__class__.__name__}: {e}"
            )
            await asyncio.sleep(GET_UPDATES_RETRY_DELAY)
            continue

        try:
            self._ensure_bot().marker_updates = events.get("marker")

            processed_events = await process_update_request(
                events=events, bot=self._ensure_bot()
            )

            for event in processed_events:
                if skip_updates:
                    if event.timestamp < current_timestamp:
                        logger_dp.info(
                            f"Пропуск события от {from_ms(event.timestamp)}: "
                            f"{event.update_type}"
                        )
                        continue

                if self.use_create_task:
                    asyncio.create_task(self.handle(event))

                else:
                    await self.handle(event)

        except ClientConnectorError:
            logger_dp.error(
                f"Ошибка подключения, жду {CONNECTION_RETRY_DELAY} секунд"
            )
            await asyncio.sleep(CONNECTION_RETRY_DELAY)
        except Exception as e:
            logger_dp.error(
                f"Общая ошибка при обработке событий: {e.__class__} - {e}"
            )

stop_polling() async

Останавливает цикл получения обновлений (long polling).

Этот метод устанавливает флаг polling в False, что приводит к завершению цикла в методе start_polling.

Source code in maxapi/dispatcher.py
async def stop_polling(self) -> None:
    """
    Останавливает цикл получения обновлений (long polling).

    Этот метод устанавливает флаг polling в False, что приводит к
    завершению цикла в методе start_polling.
    """
    if self.polling:
        self.polling = False
        logger_dp.info("Polling остановлен")

handle_webhook(bot, host=DEFAULT_HOST, port=DEFAULT_PORT, **kwargs) async

Запускает FastAPI-приложение для приёма обновлений через вебхук.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
host str

Хост сервера.

DEFAULT_HOST
port int

Порт сервера.

DEFAULT_PORT
Source code in maxapi/dispatcher.py
async def handle_webhook(
    self,
    bot: Bot,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    **kwargs: Any,
) -> None:
    """
    Запускает FastAPI-приложение для приёма обновлений через вебхук.

    Args:
        bot (Bot): Экземпляр бота.
        host (str): Хост сервера.
        port (int): Порт сервера.
    """

    if not FASTAPI_INSTALLED:
        raise ImportError(
            "\n\t Не установлен fastapi!"
            "\n\t Выполните команду для установки fastapi: "
            "\n\t pip install fastapi>=0.68.0"
            "\n\t Или сразу все зависимости для работы вебхука:"
            "\n\t pip install maxapi[webhook]"
        )

    elif not UVICORN_INSTALLED:
        raise ImportError(
            "\n\t Не установлен uvicorn!"
            "\n\t Выполните команду для установки uvicorn: "
            "\n\t pip install uvicorn>=0.15.0"
            "\n\t Или сразу все зависимости для работы вебхука:"
            "\n\t pip install maxapi[webhook]"
        )

    @self.webhook_post("/")
    async def _(request: Request) -> JSONResponse:
        event_json = await request.json()
        event_object = await process_update_webhook(
            event_json=event_json, bot=bot
        )

        if self.use_create_task:
            asyncio.create_task(self.handle(event_object))
        else:
            await self.handle(event_object)

        return JSONResponse(  # pyright: ignore[reportPossiblyUnboundVariable]
            content={"ok": True}, status_code=200
        )

    await self.init_serve(bot=bot, host=host, port=port, **kwargs)

init_serve(bot, host=DEFAULT_HOST, port=DEFAULT_PORT, **kwargs) async

Запускает сервер для обработки вебхуков.

Parameters:

Name Type Description Default
bot Bot

Экземпляр бота.

required
host str

Хост.

DEFAULT_HOST
port int

Порт.

DEFAULT_PORT
Source code in maxapi/dispatcher.py
async def init_serve(
    self,
    bot: Bot,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    **kwargs: Any,
) -> None:
    """
    Запускает сервер для обработки вебхуков.

    Args:
        bot (Bot): Экземпляр бота.
        host (str): Хост.
        port (int): Порт.
    """

    if not UVICORN_INSTALLED:
        raise ImportError(
            "\n\t Не установлен uvicorn!"
            "\n\t Выполните команду для установки uvicorn: "
            "\n\t pip install uvicorn>=0.15.0"
            "\n\t Или сразу все зависимости для работы вебхука:"
            "\n\t pip install maxapi[webhook]"
        )

    if self.webhook_app is None:
        raise RuntimeError("webhook_app не инициализирован")

    config = Config(  # pyright: ignore[reportPossiblyUnboundVariable]
        app=self.webhook_app, host=host, port=port, **kwargs
    )
    server = Server(  # pyright: ignore[reportPossiblyUnboundVariable]
        config
    )

    await self.__ready(bot)

    await server.serve()

Router(router_id=None)

Bases: Dispatcher

Роутер для группировки обработчиков событий.

Инициализация роутера.

Parameters:

Name Type Description Default
router_id str | None

Идентификатор роутера для логов.

None
Source code in maxapi/dispatcher.py
def __init__(self, router_id: str | None = None):
    """
    Инициализация роутера.

    Args:
        router_id (str | None): Идентификатор роутера для логов.
    """

    super().__init__(router_id)

Event(update_type, router, deprecated=False)

Декоратор для регистрации обработчиков событий.

Инициализирует событие-декоратор.

Parameters:

Name Type Description Default
update_type UpdateType

Тип события.

required
router Dispatcher | Router

Экземпляр роутера или диспетчера.

required
deprecated bool

Флаг, указывающий на то, что событие устарело.

False
Source code in maxapi/dispatcher.py
def __init__(
    self,
    update_type: UpdateType,
    router: Dispatcher | Router,
    deprecated: bool = False,
):
    """
    Инициализирует событие-декоратор.

    Args:
        update_type (UpdateType): Тип события.
        router (Dispatcher | Router): Экземпляр роутера или диспетчера.
        deprecated (bool): Флаг, указывающий на то, что событие устарело.
    """

    self.update_type = update_type
    self.router = router
    self.deprecated = deprecated

register(func_event, *args, **kwargs)

Регистрирует функцию как обработчик события.

Parameters:

Name Type Description Default
func_event Callable

Функция-обработчик

required
*args Any

Фильтры

()
**kwargs Any

Дополнительные параметры (например, states)

{}

Returns:

Name Type Description
Callable Callable

Исходная функция.

Source code in maxapi/dispatcher.py
def register(
    self, func_event: Callable, *args: Any, **kwargs: Any
) -> Callable:
    """
    Регистрирует функцию как обработчик события.

    Args:
        func_event (Callable): Функция-обработчик
        *args: Фильтры
        **kwargs: Дополнительные параметры (например, states)

    Returns:
        Callable: Исходная функция.
    """

    if self.deprecated:
        import warnings

        warnings.warn(
            f"Событие {self.update_type} устарело и будет удалено в будущих версиях.",
            DeprecationWarning,
            stacklevel=3,
        )

    if self.update_type == UpdateType.ON_STARTED:
        self.router.on_started_func = func_event

    else:
        self.router.event_handlers.append(
            Handler(
                func_event=func_event,
                update_type=self.update_type,
                *args,
                **kwargs,
            )
        )
    return func_event

__call__(*args, **kwargs)

Регистрирует функцию как обработчик события через декоратор.

Returns:

Name Type Description
Callable Callable

Декоратор.

Source code in maxapi/dispatcher.py
def __call__(self, *args: Any, **kwargs: Any) -> Callable:
    """
    Регистрирует функцию как обработчик события через декоратор.

    Returns:
        Callable: Декоратор.
    """

    def decorator(func_event: Callable) -> Callable:
        return self.register(func_event, *args, **kwargs)

    return decorator