1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
import os
import ssl
import json
from urllib import request, parse
from datetime import datetime, timedelta
class Credential:
def __init__(self, data):
self.username = data["data"]["username"]
self.password = data["data"]["password"]
# Leave 2 minutes in case we encounter a failure
_duration = timedelta(seconds=data["lease_duration"] - 120)
self.expires = datetime.now() + _duration
@classmethod
def empty(cls):
return cls({
"lease_duration": -1,
"data": { "username": "", "password": "" }
})
@property
def is_valid(self):
return self.expires > datetime.now()
class SimpleVaultClient:
def __init__(self, base_url, role_id, role_secret, ssl_verify=True):
self.base_url = base_url
self.ssl_verify = ssl_verify
self.role_id = role_id
self.role_secret = role_secret
env_token = os.getenv("VAULT_TOKEN")
if env_token:
self._token = env_token
self._token_expires = None # Token is assumed to never expire
else:
self._token = None
self._token_expires = datetime.now() + timedelta(seconds=-1)
def _login_approle(self, role_id, secret):
res = self._make_request("auth/approle/login", auth=False, data={
"role_id": role_id,
"secret_id": secret,
})
self._token = res["auth"]["client_token"]
self._token_expires = datetime.now() + \
timedelta(seconds=res["auth"]["lease_duration"])
@property
def _token_is_expired(self):
return self._token_expires and \
self._token_expires < (datetime.now() + timedelta(seconds=120))
def _auth_as_needed(self):
if self._token_is_expired:
self._login_approle(self.role_id, self.role_secret)
def _make_request(self, url, data=None, auth=True):
context = ssl._create_unverified_context() \
if not self.ssl_verify else None
data = json.dumps(data).encode("utf-8") if data else None
headers = { "X-Vault-Token": self._token } if auth else {}
url = parse.urljoin(self.base_url, parse.urljoin("/v1/", url))
req = request.Request(url, headers=headers, data=data)
res = request.urlopen(req, context=context)
if res.status != 200:
raise Exception("Failed to fetch credential from vault")
return json.load(res)
def get_kv_secret(self, path, key):
self._auth_as_needed()
return self._make_request(f"kv/data/{path}")["data"]["data"][key]
def get_db_credential(self, role):
self._auth_as_needed()
return Credential(self._make_request(f"database/creds/{role}"))
|