Перейти к содержанию

BaseConnection

BaseConnection()

Bases: BotMixin

Базовый класс для всех методов API.

Содержит общую логику выполнения запроса (сериализация, отправка HTTP-запроса, обработка ответа).

Инициализация BaseConnection.

Атрибуты

bot: Экземпляр бота. session: aiohttp-сессия. after_input_media_delay: Задержка после ввода медиа.

Source code in maxapi/connection/base.py
def __init__(self) -> None:
    """
    Инициализация BaseConnection.

    Атрибуты:
        bot: Экземпляр бота.
        session: aiohttp-сессия.
        after_input_media_delay: Задержка после ввода медиа.
    """

    self.bot: Bot | None = None
    self.session: ClientSession | None = None
    self.after_input_media_delay: float = self.AFTER_MEDIA_INPUT_DELAY
    self.api_url = self.API_URL

set_api_url(url)

Установка API URL для запросов

Parameters:

Name Type Description Default
url str

Новый API URL

required
Source code in maxapi/connection/base.py
def set_api_url(self, url: str) -> None:
    """
    Установка API URL для запросов

    Args:
        url: Новый API URL
    """

    self.api_url = url

request(method, path, model=None, *, is_return_raw=False, **kwargs) async

Выполняет HTTP-запрос к API с автоматическим retry при серверных ошибках.

При получении HTTP-статуса из списка retry_on_statuses (по умолчанию 502, 503, 504) запрос повторяется до max_retries раз с экспоненциальной задержкой.

Parameters:

Name Type Description Default
method HTTPMethod

HTTP-метод (GET, POST и т.д.).

required
path ApiPath | str

Путь до конечной точки.

required
model BaseModel | Any

Pydantic-модель для десериализации ответа, если is_return_raw=False.

None
is_return_raw bool

Если True — вернуть сырой ответ, иначе — результат десериализации.

False
**kwargs Any

Дополнительные параметры (query, headers, json).

{}

Returns:

Type Description
Any | BaseModel

model | dict | Error: Объект модели, dict или ошибка.

Raises:

Type Description
RuntimeError

Если бот не инициализирован.

MaxConnection

Ошибка соединения.

InvalidToken

Ошибка авторизации (401).

MaxApiError

Ошибка API (после исчерпания retry).

Source code in maxapi/connection/base.py
async def request(
    self,
    method: HTTPMethod,
    path: ApiPath | str,
    model: BaseModel | Any = None,
    *,
    is_return_raw: bool = False,
    **kwargs: Any,
) -> Any | BaseModel:
    """
    Выполняет HTTP-запрос к API с автоматическим retry
    при серверных ошибках.

    При получении HTTP-статуса из списка ``retry_on_statuses``
    (по умолчанию 502, 503, 504) запрос повторяется до
    ``max_retries`` раз с экспоненциальной задержкой.

    Args:
        method: HTTP-метод (GET, POST и т.д.).
        path: Путь до конечной точки.
        model: Pydantic-модель для
            десериализации ответа, если is_return_raw=False.
        is_return_raw: Если True — вернуть сырой
            ответ, иначе — результат десериализации.
        **kwargs: Дополнительные параметры (query, headers, json).

    Returns:
        model | dict | Error: Объект модели, dict или ошибка.

    Raises:
        RuntimeError: Если бот не инициализирован.
        MaxConnection: Ошибка соединения.
        InvalidToken: Ошибка авторизации (401).
        MaxApiError: Ошибка API (после исчерпания retry).
    """

    bot = self._ensure_bot()
    conn = bot.default_connection
    retry_statuses = conn.retry_on_statuses

    url = path.value if isinstance(path, ApiPath) else path

    @backoff.on_exception(
        backoff.expo,
        (ClientConnectionError, _RetryableServerError),
        max_tries=conn.max_retries + 1,
        factor=conn.retry_backoff_factor,
        on_backoff=_on_backoff,
    )
    async def _do_request() -> Any:
        session = await bot.ensure_session()
        resp = await session.request(
            method=method.value,
            url=url,
            **kwargs,
        )

        if resp.status == 401:
            await session.close()
            raise InvalidToken("Неверный токен!")

        if resp.status in retry_statuses:
            await resp.read()
            raise _RetryableServerError(resp.status)

        return resp

    try:
        response = await _do_request()
    except ClientConnectionError as e:
        raise MaxConnection(f"Ошибка при отправке запроса: {e}") from e
    except _RetryableServerError as e:
        raise MaxApiError(code=e.status, raw={"error": str(e)}) from e

    if not response.ok:
        raw = await response.json()
        if bot.dispatcher:
            asyncio.create_task(
                bot.dispatcher.handle_raw_response(
                    UpdateType.RAW_API_RESPONSE, raw
                )
            )
        raise MaxApiError(code=response.status, raw=raw)

    raw = await response.json()

    if bot.dispatcher:
        asyncio.create_task(
            bot.dispatcher.handle_raw_response(
                UpdateType.RAW_API_RESPONSE, raw
            )
        )

    if is_return_raw:
        return raw

    model = model(**raw)  # type: ignore

    return bind_bot(model, bot)

upload_file(url, path, type) async

Загружает файл на сервер.

Parameters:

Name Type Description Default
url str

URL загрузки.

required
path str

Путь к файлу.

required
type UploadType

Тип файла.

required

Returns:

Name Type Description
str str

Сырой .text() ответ от сервера.

Source code in maxapi/connection/base.py
async def upload_file(self, url: str, path: str, type: UploadType) -> str:
    """
    Загружает файл на сервер.

    Args:
        url: URL загрузки.
        path: Путь к файлу.
        type: Тип файла.

    Returns:
        str: Сырой .text() ответ от сервера.
    """

    async with aiofiles.open(path, "rb") as f:
        file_data = await f.read()

    path_object = Path(path)
    basename = path_object.name

    form = FormData(quote_fields=False)
    form.add_field(
        name="data",
        value=file_data,
        filename=basename,
        content_type=mimetypes.guess_type(path)[0] or f"{type.value}/*",
    )

    bot = self._ensure_bot()

    session = bot.session
    if session is not None and not session.closed:
        async with session.post(url=url, data=form) as response:
            return await response.text()
    else:
        async with (
            ClientSession(
                timeout=bot.default_connection.timeout
            ) as temp_session,
            temp_session.post(url=url, data=form) as response,
        ):
            return await response.text()

upload_file_buffer(filename, url, buffer, type) async

Загружает файл из буфера.

Parameters:

Name Type Description Default
filename str

Имя файла.

required
url str

URL загрузки.

required
buffer bytes

Буфер данных.

required
type UploadType

Тип файла.

required

Returns:

Name Type Description
str str

Сырой .text() ответ от сервера.

Source code in maxapi/connection/base.py
async def upload_file_buffer(
    self, filename: str, url: str, buffer: bytes, type: UploadType
) -> str:
    """
    Загружает файл из буфера.

    Args:
        filename: Имя файла.
        url: URL загрузки.
        buffer: Буфер данных.
        type: Тип файла.

    Returns:
        str: Сырой .text() ответ от сервера.
    """

    try:
        matches = puremagic.magic_string(buffer[:4096])
        if matches:
            mime_type = matches[0].mime_type
            ext = mimetypes.guess_extension(mime_type) or ""
        else:
            mime_type = f"{type.value}/*"
            ext = ""
    except Exception:
        mime_type = f"{type.value}/*"
        ext = ""

    basename = f"{filename}{ext}"

    form = FormData(quote_fields=False)
    form.add_field(
        name="data",
        value=buffer,
        filename=basename,
        content_type=mime_type,
    )

    bot = self._ensure_bot()

    session = bot.session
    if session is not None and not session.closed:
        async with session.post(url=url, data=form) as response:
            return await response.text()
    else:
        async with (
            ClientSession(
                timeout=bot.default_connection.timeout
            ) as temp_session,
            temp_session.post(url=url, data=form) as response,
        ):
            return await response.text()

download_file(url, destination, *, chunk_size=DOWNLOAD_CHUNK_SIZE) async

Скачивает файл по URL и сохраняет на диск.

Метод работает не через общий request(), поскольку ответом является бинарный поток, а не JSON.

Parameters:

Name Type Description Default
url str

URL файла для скачивания (из payload.url вложения).

required
destination Path | str

Путь к директории для сохранения файла.

required
chunk_size int

Размер чанка при потоковом чтении (по умолчанию 64 КБ).

DOWNLOAD_CHUNK_SIZE

Returns:

Name Type Description
Path Path

Полный путь к скачанному файлу.

Raises:

Type Description
DownloadFileError

При ошибке скачивания.

Source code in maxapi/connection/base.py
async def download_file(
    self,
    url: str,
    destination: Path | str,
    *,
    chunk_size: int = DOWNLOAD_CHUNK_SIZE,
) -> Path:
    """
    Скачивает файл по URL и сохраняет на диск.

    Метод работает не через общий ``request()``, поскольку
    ответом является бинарный поток, а не JSON.

    Args:
        url: URL файла для скачивания (из payload.url вложения).
        destination: Путь к директории для сохранения файла.
        chunk_size: Размер чанка при потоковом чтении
            (по умолчанию 64 КБ).

    Returns:
        Path: Полный путь к скачанному файлу.

    Raises:
        DownloadFileError: При ошибке скачивания.
    """
    bot = self._ensure_bot()
    conn = bot.default_connection

    @backoff.on_exception(
        backoff.expo,
        (ClientConnectionError, _RetryableServerError),
        max_tries=conn.max_retries + 1,
        factor=conn.retry_backoff_factor,
        on_backoff=_on_backoff,
    )
    async def _do_download() -> Any:
        session = await bot.ensure_session()
        resp = await session.request("GET", url)
        if resp.status in conn.retry_on_statuses:
            await resp.read()
            raise _RetryableServerError(resp.status)
        return resp

    try:
        response = await _do_download()
    except ClientConnectionError as e:
        raise DownloadFileError(f"Ошибка при скачивании файла: {e}") from e
    except _RetryableServerError as e:
        raise DownloadFileError(
            f"Ошибка при скачивании файла: HTTP {e.status}"
        ) from e

    if not response.ok:
        raise DownloadFileError(
            f"Ошибка при скачивании файла: HTTP {response.status}"
        )

    cd = response.content_disposition
    if cd and cd.filename:
        filename = Path(cd.filename).name
    else:
        ext = mimetypes.guess_extension(response.content_type or "") or ""
        filename = f"file{ext}"

    dest = Path(destination)
    await aiofiles.os.makedirs(destination, exist_ok=True)
    path = dest / filename

    try:
        async with aiofiles.open(path, "wb") as f:
            async for chunk in response.content.iter_chunked(chunk_size):
                await f.write(chunk)
    finally:
        await response.release()

    return path