aboutsummaryrefslogtreecommitdiff
path: root/netbox/django-vault-client.py
diff options
context:
space:
mode:
Diffstat (limited to 'netbox/django-vault-client.py')
-rw-r--r--netbox/django-vault-client.py84
1 files changed, 84 insertions, 0 deletions
diff --git a/netbox/django-vault-client.py b/netbox/django-vault-client.py
new file mode 100644
index 0000000..e699db3
--- /dev/null
+++ b/netbox/django-vault-client.py
@@ -0,0 +1,84 @@
1import os
2import ssl
3import json
4from urllib import request, parse
5from datetime import datetime, timedelta
6
7
8class Credential:
9
10 def __init__(self, data):
11 self.username = data["data"]["username"]
12 self.password = data["data"]["password"]
13
14 # Leave 2 minutes in case we encounter a failure
15 _duration = timedelta(seconds=data["lease_duration"] - 120)
16 self.expires = datetime.now() + _duration
17
18 @classmethod
19 def empty(cls):
20 return cls({
21 "lease_duration": -1,
22 "data": { "username": "", "password": "" }
23 })
24
25 @property
26 def is_valid(self):
27 return self.expires > datetime.now()
28
29
30
31class SimpleVaultClient:
32
33 def __init__(self, base_url, role_id, role_secret, ssl_verify=True):
34 self.base_url = base_url
35 self.ssl_verify = ssl_verify
36 self.role_id = role_id
37 self.role_secret = role_secret
38
39 env_token = os.getenv("VAULT_TOKEN")
40 if env_token:
41 self._token = env_token
42 self._token_expires = None # Token is assumed to never expire
43 else:
44 self._token = None
45 self._token_expires = datetime.now() + timedelta(seconds=-1)
46
47 def _login_approle(self, role_id, secret):
48 res = self._make_request("auth/approle/login", auth=False, data={
49 "role_id": role_id,
50 "secret_id": secret,
51 })
52
53 self._token = res["auth"]["client_token"]
54 self._token_expires = datetime.now() + \
55 timedelta(seconds=res["auth"]["lease_duration"])
56
57 @property
58 def _token_is_expired(self):
59 return self._token_expires and \
60 self._token_expires < (datetime.now() + timedelta(seconds=120))
61
62 def _auth_as_needed(self):
63 if self._token_is_expired:
64 self._login_approle(self.role_id, self.role_secret)
65
66 def _make_request(self, url, data=None, auth=True):
67 context = ssl._create_unverified_context() \
68 if not self.ssl_verify else None
69
70 data = json.dumps(data).encode("utf-8") if data else None
71 headers = { "X-Vault-Token": self._token } if auth else {}
72
73 url = parse.urljoin(self.base_url, parse.urljoin("/v1/", url))
74 req = request.Request(url, headers=headers, data=data)
75 res = request.urlopen(req, context=context)
76 return json.load(res)
77
78 def get_kv_secret(self, path, key):
79 self._auth_as_needed()
80 return self._make_request(f"kv/data/{path}")["data"]["data"][key]
81
82 def get_db_credential(self, role):
83 self._auth_as_needed()
84 return Credential(self._make_request(f"database/creds/{role}"))