Source code for usos_api.auth

import urllib

import aiohttp
from oauthlib.oauth1 import Client

from .exceptions import USOSAPIException
from .logger import get_logger

_LOGGER = get_logger("AuthManager")


[docs] class AuthManager: """ A manager for the USOS API authentication. """ REQUEST_TOKEN_SUFFIX = "services/oauth/request_token" AUTHORIZE_SUFFIX = "services/oauth/authorize" ACCESS_TOKEN_SUFFIX = "services/oauth/access_token" REVOKE_TOKEN_SUFFIX = "services/oauth/revoke_token" # List of available scopes can be found at https://apps.usos.edu.pl/developers/api/authorization/#scopes SCOPES = "|".join(["offline_access", "studies"]) def __init__( self, api_base_address: str, consumer_key: str, consumer_secret: str, trust_env: bool = False, ): """ Initialize the authentication manager. :param api_base_address: The base address of the USOS API. :param consumer_key: Consumer key obtained from the USOS API. :param consumer_secret: Consumer secret obtained from the USOS API. :param trust_env: Whether to trust the environment variables for the connection, see https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientSession for more information. """ self.base_address = api_base_address.rstrip("/") + "/" self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.access_token = None self.access_token_secret = None self._session = None self.trust_env = trust_env self._oauth_client = Client(consumer_key, consumer_secret) async def __aenter__(self) -> "AuthManager": """ Enter the manager. :return: The manager. """ await self.open() return self async def __aexit__(self, exc_type, exc_value, traceback): """ Exit the manager. :param exc_type: The exception type. :param exc_value: The exception value. :param traceback: The traceback. """ await self.close()
[docs] async def open(self): """ Open the manager. """ self._session = aiohttp.ClientSession(trust_env=self.trust_env)
[docs] async def close(self): """ Close the manager. """ if self._session: await self._session.close()
async def _generate_request_token(self, callback_url: str) -> None: """ Generate a new request token. :param callback_url: """ url = f"{self.base_address}{self.REQUEST_TOKEN_SUFFIX}" params = { "oauth_callback": callback_url, "scopes": self.SCOPES, } headers = {"Content-Type": "application/x-www-form-urlencoded"} url, headers, body = Client( self.consumer_key, client_secret=self.consumer_secret ).sign( url, http_method="POST", body=params, headers=headers ) # Use a new client to avoid using the access token if it's set async with self._session.post(url, data=body, headers=headers) as response: await self._handle_response_errors(response) data = dict(urllib.parse.parse_qsl(await response.text())) self._request_token = data["oauth_token"] self._request_token_secret = data["oauth_token_secret"] self._oauth_client.resource_owner_key = self._request_token self._oauth_client.resource_owner_secret = self._request_token_secret _LOGGER.debug("New request token generated")
[docs] async def get_authorization_url( self, callback_url: str, confirm_user: bool = False ) -> str: """ Get the authorization URL. :param callback_url: The callback URL. :param confirm_user: Whether to confirm the user. :return: The authorization URL. """ await self._generate_request_token(callback_url) if confirm_user: return f"{self.base_address}{self.AUTHORIZE_SUFFIX}?oauth_token={self._request_token}&interactivity=confirm_user" return f"{self.base_address}{self.AUTHORIZE_SUFFIX}?oauth_token={self._request_token}"
[docs] async def authorize(self, verifier: str, request_token, request_token_secret): """ Authorize the client with verifier and optionally token and token secret. :param verifier: The verifier to authorize the client with. :param token: The OAuth token obtained from the previous step. :param token_secret: The OAuth token secret obtained from the previous step. :return: The access token and secret. """ self._oauth_client.verifier = verifier if request_token: self._oauth_client.resource_owner_key = request_token if request_token_secret: self._oauth_client.resource_owner_secret = request_token_secret url = f"{self.base_address}{self.ACCESS_TOKEN_SUFFIX}" url, headers, body = self._oauth_client.sign(url, http_method="POST") try: async with self._session.post(url, data=body, headers=headers) as response: await self._handle_response_errors(response) data = dict(urllib.parse.parse_qsl(await response.text())) self.load_access_token(data["oauth_token"], data["oauth_token_secret"]) _LOGGER.debug("Authorization successful, received access token") return self.access_token, self.access_token_secret except AttributeError as e: if e.args[0] == "'NoneType' object has no attribute 'post'": raise USOSAPIException( "Authorization failed. Did you forget to open the manager?" ) raise
[docs] def load_access_token(self, access_token: str, access_token_secret: str): """ Load the access token and secret into the manager. :param access_token: The access token. :param access_token_secret: The access token secret. """ self.access_token = access_token self.access_token_secret = access_token_secret self._oauth_client = Client( self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.access_token, resource_owner_secret=self.access_token_secret, )
def get_access_token(self): return self.access_token, self.access_token_secret def get_request_token(self): return self._request_token, self._request_token_secret
[docs] def sign_request( self, url: str, http_method: str = "GET", **kwargs ) -> tuple[str, dict, dict]: """ Sign a request with the OAuth client. :param url: The URL to sign. :param http_method: The HTTP method to use. :param kwargs: Additional parameters to pass. :return: The signed URL, headers, and body. """ if not self.access_token: raise USOSAPIException("Access token not set. Did you forget to authorize?") url, headers, body = self._oauth_client.sign( url, http_method=http_method, **kwargs ) return url, headers, body
async def _handle_response_errors(self, response: aiohttp.ClientResponse): """ Handle errors in the response. :param response: The response to handle. :raises USOSAPIException: If an error occurred. """ if response.status != 200: text = await response.text() if response.status == 401: _LOGGER.error( f"HTTP 401: Unauthorized. Your access key probably expired. Response: {text}" ) raise USOSAPIException( "HTTP 401: Unauthorized. Your access key probably expired." ) elif response.status == 400: raise USOSAPIException(f"HTTP 400: Bad request: {text}") else: raise USOSAPIException(f"HTTP {response.status}: {text}") async def _revoke_token(self): """ Revoke the current access token. """ url = f"{self.base_address}{self.REVOKE_TOKEN_SUFFIX}" url, headers, body = self._oauth_client.sign(url, http_method="POST") async with self._session.post(url, data=body, headers=headers) as response: await self._handle_response_errors(response) _LOGGER.info("Token revoked successfully.")
[docs] async def logout(self): """ Log out the user. """ if not self.access_token: return await self._revoke_token() self.access_token = None