aboutsummaryrefslogtreecommitdiff
path: root/netbox/django-vault-client.py
blob: e699db3c8f24e37f7e98191d81ca75d942be6b86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import ssl
import json
from urllib import request, parse
from datetime import datetime, timedelta


class Credential:
    """Username/password pair with a locally computed expiry time.

    Built from a response mapping that carries the secret under
    ``data["data"]`` and its lifetime in seconds under
    ``data["lease_duration"]``.
    """

    def __init__(self, data):
        """Extract the login details and compute when they stop being usable.

        Args:
            data: Mapping with a ``"data"`` sub-mapping holding ``"username"``
                and ``"password"``, plus a numeric ``"lease_duration"`` in
                seconds.

        Raises:
            KeyError: If any of the expected keys is missing.
        """
        secret = data["data"]
        self.username = secret["username"]
        self.password = secret["password"]

        # Treat the credential as expired 2 minutes before the lease really
        # ends, leaving headroom in case a renewal attempt fails.
        usable_seconds = data["lease_duration"] - 120
        self.expires = datetime.now() + timedelta(seconds=usable_seconds)

    @classmethod
    def empty(cls):
        """Return a placeholder credential that is already expired."""
        blank_secret = {"username": "", "password": ""}
        return cls({"lease_duration": -1, "data": blank_secret})

    @property
    def is_valid(self):
        """``True`` while the current local time is before ``expires``."""
        return datetime.now() < self.expires



class SimpleVaultClient:
    """Small Vault HTTP API client that relies only on the standard library.

    The client authenticates with the token found in the ``VAULT_TOKEN``
    environment variable when it is set; otherwise it logs in through the
    AppRole endpoint with ``role_id`` / ``role_secret`` the first time a
    secret is requested, and again whenever the token is close to expiring.
    """

    def __init__(self, base_url, role_id, role_secret, ssl_verify=True):
        """Store connection settings and choose the initial token source.

        Args:
            base_url: Scheme and host of the server. Request URLs are built by
                joining an absolute ``/v1/...`` path onto it, so any path
                component of ``base_url`` is replaced.
            role_id: AppRole role id used for login.
            role_secret: AppRole secret id used for login.
            ssl_verify: When false, TLS certificate verification is disabled
                for every request.
        """
        self.base_url = base_url
        self.ssl_verify = ssl_verify
        self.role_id = role_id
        self.role_secret = role_secret

        env_token = os.getenv("VAULT_TOKEN")
        if env_token:
            self._token = env_token
            self._token_expires = None  # Token is assumed to never expire
        else:
            self._token = None
            # Already in the past, so the first secret request forces a login.
            self._token_expires = datetime.now() + timedelta(seconds=-1)

    def _login_approle(self, role_id, secret):
        """Log in via ``auth/approle/login`` and cache the returned token.

        Args:
            role_id: AppRole role id.
            secret: AppRole secret id.

        Raises:
            urllib.error.URLError: If the HTTP request fails.
            KeyError: If the response lacks the expected ``auth`` fields.
        """
        res = self._make_request("auth/approle/login", auth=False, data={
            "role_id": role_id,
            "secret_id": secret,
        })

        auth_info = res["auth"]
        self._token = auth_info["client_token"]
        self._token_expires = datetime.now() + timedelta(
            seconds=auth_info["lease_duration"]
        )

    @property
    def _token_is_expired(self):
        """Whether the cached token expires within the next 2 minutes.

        Always ``False`` for a token without an expiry time (the one taken
        from ``VAULT_TOKEN``).
        """
        if self._token_expires is None:
            return False
        # Renew 2 minutes early so a token never lapses mid-request.
        return self._token_expires < datetime.now() + timedelta(seconds=120)

    def _auth_as_needed(self):
        """Perform an AppRole login if the cached token is (nearly) expired."""
        if self._token_is_expired:
            self._login_approle(self.role_id, self.role_secret)

    def _make_request(self, url, data=None, auth=True):
        """Send one API request and return the decoded JSON response.

        Args:
            url: API path relative to ``/v1/`` (e.g. ``"kv/data/foo"``).
            data: Optional JSON-serialisable payload. A truthy value is sent
                as the request body (which makes urllib issue a POST); a
                falsy value sends no body.
            auth: When true, send the cached token in ``X-Vault-Token``.

        Returns:
            The parsed JSON document returned by the server.

        Raises:
            urllib.error.URLError: If the request fails (``HTTPError`` for
                non-success status codes).
            json.JSONDecodeError: If the response body is not valid JSON.
        """
        # NOTE: an unverified context disables certificate and hostname
        # checks entirely; only meant for explicitly opted-in setups.
        context = None if self.ssl_verify else ssl._create_unverified_context()

        body = json.dumps(data).encode("utf-8") if data else None
        headers = {"X-Vault-Token": self._token} if auth else {}

        full_url = parse.urljoin(self.base_url, parse.urljoin("/v1/", url))
        req = request.Request(full_url, headers=headers, data=body)
        # Close the response deterministically instead of leaking the
        # underlying connection until garbage collection.
        with request.urlopen(req, context=context) as res:
            return json.load(res)

    def get_kv_secret(self, path, key):
        """Return the value stored under ``key`` in the KV secret at ``path``.

        Reads ``kv/data/<path>`` and looks ``key`` up inside the response's
        ``data.data`` mapping.

        Raises:
            KeyError: If ``key`` (or the expected envelope) is missing.
            urllib.error.URLError: If the HTTP request fails.
        """
        self._auth_as_needed()
        return self._make_request(f"kv/data/{path}")["data"]["data"][key]

    def get_db_credential(self, role):
        """Request database credentials for ``role``.

        Reads ``database/creds/<role>`` and wraps the response in a
        ``Credential``.

        Raises:
            urllib.error.URLError: If the HTTP request fails.
        """
        self._auth_as_needed()
        return Credential(self._make_request(f"database/creds/{role}"))