I have two services and Keycloak. Service A needs to obtain an access token from Keycloak in order to make requests to service B.
To do that, I’ve written an httpx `Auth` subclass that authenticates through Keycloak:
import typing
from urllib.parse import urljoin
from uuid import UUID
from httpx import Auth, Request, Response, AsyncClient
from jose import JWTError, jwt
from main import settings
# Process-wide cache shared by every IntgrPltfrmAuth instance. Both values
# start out empty and are filled in lazily on first use.
ACCESS_TOKEN: str = ""  # most recently fetched bearer token
PUBLIC_KEY: str = ""  # realm public key wrapped in BEGIN/END PUBLIC KEY markers
class IntgrPltfrmAuth(Auth):
    """httpx auth handler that adds a Keycloak client-credentials bearer token.

    The token and the realm public key are cached in the module-level
    ``ACCESS_TOKEN`` / ``PUBLIC_KEY`` globals, so all instances in the process
    share them. Before each request the cached token is checked locally
    (signature and expiry); a new one is requested when that check fails or
    when the target service answers 401.
    """

    def __init__(self):
        # NOTE(review): verify=False disables TLS certificate verification for
        # the connection that carries the client secret. Kept as in the
        # original; confirm this is intended outside of development.
        self.client = AsyncClient(base_url=settings.INTGR_PLTFRM_KEYCLOAK.SERVER_URL, verify=False)
        self.client_id = settings.INTGR_PLTFRM_KEYCLOAK.CLIENT_ID
        self.client_secret = settings.INTGR_PLTFRM_KEYCLOAK.CLIENT_SECRET
        self.realm = settings.INTGR_PLTFRM_KEYCLOAK.REALM

    async def aclose(self) -> None:
        """Close the internal HTTP client used to talk to Keycloak."""
        await self.client.aclose()

    async def async_auth_flow(self, request: Request) -> typing.AsyncGenerator[Request, Response]:
        """Attach the bearer token and retry once with a fresh token on 401."""
        access_token = await self._get_access_token()
        request.headers["Authorization"] = f"Bearer {access_token}"
        response = yield request

        if response.status_code == 401:
            # The locally valid token was still rejected: force a refresh and
            # resend the same request exactly once.
            access_token = await self._update_token()
            request.headers["Authorization"] = f"Bearer {access_token}"
            yield request

    async def _get_access_token(self) -> str:
        """Return the cached token, refreshing it first if it fails validation."""
        public_key = await self.get_public_key()
        if not self.token_is_valid(token=ACCESS_TOKEN, public_key=public_key):
            await self._update_token()
        return ACCESS_TOKEN

    async def _update_token(self) -> str:
        """Request a new token via the client-credentials grant and cache it.

        Returns:
            The new access token (also stored in the ``ACCESS_TOKEN`` global).

        Raises:
            httpx.HTTPStatusError: if Keycloak answers with a 4xx/5xx status.
        """
        global ACCESS_TOKEN
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "client_credentials",
        }
        # Use the configured realm rather than a hard-coded one so the token
        # and the public key (see _update_public_key) come from the same realm.
        # NOTE(review): this path has an "/auth" prefix while the public-key
        # path does not; confirm which one matches the Keycloak deployment.
        resp = await self.client.post(
            url=f"/auth/realms/{self.realm}/protocol/openid-connect/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=data,
        )
        resp.raise_for_status()
        payload = resp.json()
        ACCESS_TOKEN = payload["access_token"]
        return ACCESS_TOKEN

    async def _update_public_key(self) -> None:
        """Fetch the realm's public key and cache it in ``PUBLIC_KEY``.

        Raises:
            httpx.HTTPStatusError: if Keycloak answers with a 4xx/5xx status.
        """
        global PUBLIC_KEY
        resp = await self.client.get(f"realms/{self.realm}")
        resp.raise_for_status()
        public_key = resp.json()["public_key"]
        # The armor lines must be separated from the key body by real newlines.
        PUBLIC_KEY = f"-----BEGIN PUBLIC KEY-----\n{public_key}\n-----END PUBLIC KEY-----"

    async def get_public_key(self) -> str:
        """Return the cached public key, fetching it on first use."""
        if not PUBLIC_KEY:
            await self._update_public_key()
        return PUBLIC_KEY

    def token_is_valid(self, token: str, public_key: str) -> bool:
        """Return True if ``token`` decodes against ``public_key`` without a JWTError."""
        options = {"verify_signature": True, "verify_exp": True}
        # NOTE(review): no `algorithms` allow-list is passed to jwt.decode;
        # consider pinning it once the realm's signing algorithm is confirmed.
        try:
            jwt.decode(token=token, key=public_key, options=options)
        except JWTError:
            return False
        return True
As you may have noticed, I store the access token and the public key in module-level globals so that they can be reused. What is the common practice in such cases? Should they be stored in Redis or something similar? I don’t really like keeping them in globals.