From 763b810ca1a9d755205e49b1246025b83abb5132 Mon Sep 17 00:00:00 2001 From: Mike Crute Date: Wed, 27 Jan 2021 04:41:00 +0000 Subject: Add netbox --- netbox/django-vault-client.py | 84 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 netbox/django-vault-client.py (limited to 'netbox/django-vault-client.py') diff --git a/netbox/django-vault-client.py b/netbox/django-vault-client.py new file mode 100644 index 0000000..e699db3 --- /dev/null +++ b/netbox/django-vault-client.py @@ -0,0 +1,84 @@ +import os +import ssl +import json +from urllib import request, parse +from datetime import datetime, timedelta + + +class Credential: + + def __init__(self, data): + self.username = data["data"]["username"] + self.password = data["data"]["password"] + + # Leave 2 minutes in case we encounter a failure + _duration = timedelta(seconds=data["lease_duration"] - 120) + self.expires = datetime.now() + _duration + + @classmethod + def empty(cls): + return cls({ + "lease_duration": -1, + "data": { "username": "", "password": "" } + }) + + @property + def is_valid(self): + return self.expires > datetime.now() + + + +class SimpleVaultClient: + + def __init__(self, base_url, role_id, role_secret, ssl_verify=True): + self.base_url = base_url + self.ssl_verify = ssl_verify + self.role_id = role_id + self.role_secret = role_secret + + env_token = os.getenv("VAULT_TOKEN") + if env_token: + self._token = env_token + self._token_expires = None # Token is assumed to never expire + else: + self._token = None + self._token_expires = datetime.now() + timedelta(seconds=-1) + + def _login_approle(self, role_id, secret): + res = self._make_request("auth/approle/login", auth=False, data={ + "role_id": role_id, + "secret_id": secret, + }) + + self._token = res["auth"]["client_token"] + self._token_expires = datetime.now() + \ + timedelta(seconds=res["auth"]["lease_duration"]) + + @property + def _token_is_expired(self): + return self._token_expires and \ + self._token_expires < (datetime.now() + timedelta(seconds=120)) + + def _auth_as_needed(self): + if self._token_is_expired: + self._login_approle(self.role_id, self.role_secret) + + def _make_request(self, url, data=None, auth=True): + context = ssl._create_unverified_context() \ + if not self.ssl_verify else None + + data = json.dumps(data).encode("utf-8") if data else None + headers = { "X-Vault-Token": self._token } if auth else {} + + url = parse.urljoin(self.base_url, parse.urljoin("/v1/", url)) + req = request.Request(url, headers=headers, data=data) + res = request.urlopen(req, context=context) + return json.load(res) + + def get_kv_secret(self, path, key): + self._auth_as_needed() + return self._make_request(f"kv/data/{path}")["data"]["data"][key] + + def get_db_credential(self, role): + self._auth_as_needed() + return Credential(self._make_request(f"database/creds/{role}")) -- cgit v1.2.3