Source code for aioazure.decorator

from .auth import Authenticator
from .interfaces import ProxyDecorator

from typing import Callable

import aiohttp
import asyncio

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from .interfaces import Proxy


class AsyncOperationDecorator(ProxyDecorator):
    """Decorator that waits for an asynchronous operation to reach a final state.

    When the wrapped call's response carries an ``Azure-AsyncOperation`` or
    ``Location`` header, the URL in that header is polled until the reported
    status is terminal, and the response body is replaced with the final
    polling result.
    """

    # Response headers that may carry the polling URL, in lookup order.
    _POLLING_URL_HEADERS = ("Azure-AsyncOperation", "Location")
    # Headers removed from the response once polling has completed.
    _ASYNC_HEADERS = _POLLING_URL_HEADERS + ("Retry-After",)
    # Status values that end the polling loop.
    _TERMINAL_STATES = ("Succeeded", "Failed", "Canceled")
    # Delay in seconds used when no usable ``Retry-After`` value is available.
    _DEFAULT_RETRY_AFTER = 10

    def __init__(self, proxy: "Proxy"):
        super().__init__(proxy)

    @classmethod
    def _retry_delay(cls, retry_after) -> float:
        """Convert a ``Retry-After`` value into a delay in seconds.

        Header values arrive as strings, while ``asyncio.sleep`` requires a
        number. Values that cannot be parsed as a number (for example an
        HTTP-date, or ``None``) fall back to the default delay.
        """
        try:
            return float(retry_after)
        except (TypeError, ValueError):
            return float(cls._DEFAULT_RETRY_AFTER)

    async def async_polling(self, url, retry_after):
        """Poll ``url`` until the operation status is terminal.

        :param url: URL to poll with GET requests.
        :param retry_after: Delay between polls in seconds; a number or a
            numeric string.
        :return: The decoded JSON body of the last polling response.
        :raises KeyError: If a polled JSON body has no ``"status"`` key.
        """
        # ToDo: Auth can be added by passing Authenticator as param.
        delay = self._retry_delay(retry_after)
        headers = {"Accept": "application/json", "Content-Type": "application/json"}
        async with aiohttp.ClientSession(headers=headers, raise_for_status=True) as session:
            # Seed with a non-terminal status so the loop polls at least once.
            async_operation_status = dict(status="Submitted")
            while async_operation_status["status"] not in self._TERMINAL_STATES:
                await asyncio.sleep(delay)
                async with session.get(url=url) as response:
                    async_operation_status = await response.json()
        return async_operation_status

    async def __call__(self, awaitable_method: Callable, *args, **kwargs):
        """Await ``awaitable_method`` and resolve a pending async operation.

        :param awaitable_method: Coroutine function to call with ``args`` and
            ``kwargs``.
        :return: The response, with ``body`` replaced by the polling result
            and the async-operation headers removed when polling took place;
            otherwise the response unchanged.
        """
        response = await awaitable_method(*args, **kwargs)
        for url_key in self._POLLING_URL_HEADERS:
            if url_key in response.headers:
                final_body = await self.async_polling(
                    response.headers[url_key],
                    response.headers.get("Retry-After", self._DEFAULT_RETRY_AFTER),
                )
                # Remove async headers
                remaining_headers = {
                    key: response.headers[key]
                    for key in response.headers
                    if key not in self._ASYNC_HEADERS
                }
                response = response._replace(body=final_body, headers=remaining_headers)
                # The polling headers are stripped above, so one poll suffices.
                break
        return response
class AuthDecorator(ProxyDecorator):
    """Decorator that adds an ``Authorization`` header to every wrapped call."""

    def __init__(self, proxy: "Proxy", auth: Authenticator):
        super().__init__(proxy)
        # Token source; ``get_token`` is awaited on each call.
        self.auth = auth

    async def __call__(self, awaitable_method: Callable, *args, **kwargs):
        """Set the ``Authorization`` header, then await ``awaitable_method``.

        The ``headers`` keyword argument is created when absent; an existing
        mapping is updated in place.
        """
        request_headers = kwargs.setdefault("headers", {})
        request_headers["Authorization"] = await self.auth.get_token()
        result = await awaitable_method(*args, **kwargs)
        return result
class PagingDecorator(ProxyDecorator):
    """Decorator that follows ``nextLink`` references and collects all entries.

    When the wrapped call's response body contains a ``nextLink`` key, the
    body is replaced with a flat list of the ``value`` entries of every page.
    """

    def __init__(self, proxy: "Proxy"):
        super().__init__(proxy)

    @staticmethod
    async def get_next_page(next_link: str):
        """Fetch ``next_link`` with a GET request and return the decoded JSON body."""
        # ToDo: auth can be added by passing Authenticator as param.
        headers = {"Accept": "application/json", "Content-Type": "application/json"}
        async with aiohttp.ClientSession(headers=headers, raise_for_status=True) as session:
            async with session.get(url=next_link) as response:
                return await response.json()

    async def get_pages(self, response):
        """Yield every ``value`` entry of ``response`` and of all following pages.

        :param response: Decoded JSON body of the first page.

        The last page (the one without a ``nextLink``) is flattened into its
        entries like every other page, so consumers receive a uniform stream
        of entries. A page without a ``value`` key contributes no entries.
        """
        while True:
            for entry in response.get("value", []):
                yield entry
            next_link = response.get("nextLink")
            if not next_link:
                # No further page is advertised; stop iterating.
                return
            response = await self.get_next_page(next_link=next_link)

    async def __call__(self, awaitable_method: Callable, *args, **kwargs):
        """Await ``awaitable_method`` and expand a paged body into a list.

        :return: The response with ``body`` replaced by the list of all
            entries when the body has a ``nextLink`` key; otherwise the
            response unchanged.
        """
        response = await awaitable_method(*args, **kwargs)
        body = response.body
        # Bodies without ``keys()`` (for example ``None``) cannot be paged
        # and are passed through untouched.
        if hasattr(body, "keys") and "nextLink" in body.keys():
            entries = [entry async for entry in self.get_pages(body)]
            response = response._replace(body=entries)
        return response
class ResponseDecorator(ProxyDecorator):
    """Decorator that stacks paging on top of async-operation handling.

    The given proxy is wrapped in an ``AsyncOperationDecorator``, which is in
    turn wrapped in a ``PagingDecorator``; this class adds no processing of
    its own.
    """

    def __init__(self, proxy: "Proxy"):
        polling_layer = AsyncOperationDecorator(proxy)
        paging_layer = PagingDecorator(polling_layer)
        super().__init__(paging_layer)

    async def __call__(self, awaitable_method: Callable, *args, **kwargs):
        """Await ``awaitable_method`` and return its result unchanged."""
        result = await awaitable_method(*args, **kwargs)
        return result