import os import ssl import json from urllib import request, parse from datetime import datetime, timedelta class Credential: def __init__(self, data): self.username = data["data"]["username"] self.password = data["data"]["password"] # Leave 2 minutes in case we encounter a failure _duration = timedelta(seconds=data["lease_duration"] - 120) self.expires = datetime.now() + _duration @classmethod def empty(cls): return cls({ "lease_duration": -1, "data": { "username": "", "password": "" } }) @property def is_valid(self): return self.expires > datetime.now() class SimpleVaultClient: def __init__(self, base_url, role_id, role_secret, ssl_verify=True): self.base_url = base_url self.ssl_verify = ssl_verify self.role_id = role_id self.role_secret = role_secret env_token = os.getenv("VAULT_TOKEN") if env_token: self._token = env_token self._token_expires = None # Token is assumed to never expire else: self._token = None self._token_expires = datetime.now() + timedelta(seconds=-1) def _login_approle(self, role_id, secret): res = self._make_request("auth/approle/login", auth=False, data={ "role_id": role_id, "secret_id": secret, }) self._token = res["auth"]["client_token"] self._token_expires = datetime.now() + \ timedelta(seconds=res["auth"]["lease_duration"]) @property def _token_is_expired(self): return self._token_expires and \ self._token_expires < (datetime.now() + timedelta(seconds=120)) def _auth_as_needed(self): if self._token_is_expired: self._login_approle(self.role_id, self.role_secret) def _make_request(self, url, data=None, auth=True): context = ssl._create_unverified_context() \ if not self.ssl_verify else None data = json.dumps(data).encode("utf-8") if data else None headers = { "X-Vault-Token": self._token } if auth else {} url = parse.urljoin(self.base_url, parse.urljoin("/v1/", url)) req = request.Request(url, headers=headers, data=data) res = request.urlopen(req, context=context) return json.load(res) def get_kv_secret(self, path, key): self._auth_as_needed() return self._make_request(f"kv/data/{path}")["data"]["data"][key] def get_db_credential(self, role): self._auth_as_needed() return Credential(self._make_request(f"database/creds/{role}"))