#!/usr/bin/env python
import os
import math
import logging
import subprocess
from glob import glob
from tempfile import mkstemp
from datetime import datetime
from multiprocessing import Pool
from collections import namedtuple

from boto.s3.connection import S3Connection
from ConfigParser import SafeConfigParser

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(name)s[%(levelname)s]: %(message)s")
logger = logging.getLogger("backups")

# Boto is very noisy
logging.getLogger('boto').setLevel(logging.ERROR)

BucketId = namedtuple('BucketId', ('key', 'secret', 'name'))
FIVE_MB = 5242880  # S3's minimum multipart part size; also our chunk size
DATE_FORMAT = "%Y%m%d%H%M%S"
UPLOAD_PROCESSES = 8


def get_bucket(bucketid):
    conn = S3Connection(bucketid.key, bucketid.secret)
    return conn.get_bucket(bucketid.name)


def get_upload(bucket, multipart_id):
    # Re-find an in-progress multipart upload by id; returns None if gone.
    for mp in bucket.get_all_multipart_uploads():
        if mp.id == multipart_id:
            return mp


def _upload_part(bucketid, multipart_id, part_num, source_path, bytes):
    # Runs in a worker process: open a fresh connection, seek to this part's
    # offset, and push `bytes` bytes. Retries up to five times before giving up.
    amount_of_retries = 5
    while amount_of_retries > 0:
        try:
            mp = get_upload(get_bucket(bucketid), multipart_id)
            with open(source_path, "rb") as fp:
                fp.seek((part_num - 1) * FIVE_MB)
                mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
            return True
        except Exception as exc:
            amount_of_retries -= 1
            logger.warn("Upload part %s of %r failed (%s), %s retries remaining",
                        part_num, source_path, exc, amount_of_retries)
    return False


def upload(bucketid, keyname, source_path):
    bucket = get_bucket(bucketid)
    mp = bucket.initiate_multipart_upload(keyname)
    pool = Pool(processes=UPLOAD_PROCESSES)

    results = []
    i = 0
    remaining = os.stat(source_path).st_size
    total = int(math.ceil(remaining / float(FIVE_MB)))
    logger.debug("Uploading in %d chunks", total)
    while remaining > 0:
        i += 1
        bytes = min([FIVE_MB, remaining])
        remaining -= bytes
        results.append(pool.apply_async(
            _upload_part, [bucketid, mp.id, i, source_path, bytes]))
    pool.close()
    pool.join()

    if all([result.get(1) for result in results]):
        logger.info("Upload succeeded")
        mp.complete_upload()
        bucket.get_key(keyname).set_acl('private')
        return 0
    else:
        logger.error("Upload failed, removing parts")
        mp.cancel_upload()
        return 1


class ConfigParser(SafeConfigParser):
    """One backup definition per file; the first section's name is the
    path to back up."""

    def __init__(self, filename):
        SafeConfigParser.__init__(self)
        with open(filename, "r") as fp:
            self.readfp(fp)
        self.primary_section = self.sections()[0]

    def __getattr__(self, key):
        if key == "mark_caches":
            return self.getboolean(self.primary_section, key)
        elif key == "keep":
            return self.getint(self.primary_section, key)
        else:
            return self.get(self.primary_section, key)

    @property
    def path(self):
        return self.primary_section

    @property
    def bucket_id(self):
        return BucketId(self.access_key, self.secret_key, self.bucket)


def get_file_date(filename):
    return datetime.strptime(filename.split("-")[-1], DATE_FORMAT)


def backup_comparator(lhs, rhs):
    # Newest first, so everything past `max_items` below is the oldest.
    return cmp(get_file_date(rhs.name), get_file_date(lhs.name))


def trim_backups(bucket_id, prefix, max_items=3):
    items = list(get_bucket(bucket_id).list(prefix))
    items.sort(backup_comparator)
    for item in items[max_items:]:
        logger.info("Pruning backup %s", item.name)
        item.delete()


def mark_caches(path):
    # Drop a CACHEDIR.TAG into every directory named "cache" so that
    # tar's --exclude-caches skips it.
    cmd = ("find '{path}' -type d -name 'cache' "
           "-exec bash -c 'echo "
           "\"Signature: 8a477f597d28d172789f06886806bc55\" > "
           "\"{{}}/CACHEDIR.TAG\"' \\;")
    subprocess.call(cmd.format(path=path), shell=True)


def tar_encrypt(now, key, path, tempdir="/srv/tmp"):
    # umask 0200 strips the write bit, so the temp files (created 0600 by
    # mkstemp) end up read-only on disk.
    old_umask = os.umask(0200)
    key_file = mkstemp(dir=tempdir)
    archive_file = mkstemp(dir=tempdir)
    logger.debug("Key file %s", key_file[1])
    logger.debug("Archive file %s", archive_file[1])
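    # Pipe tar straight into gpg: tar streams the tree at `path` to stdout
    # (skipping directories tagged with CACHEDIR.TAG, see mark_caches), and
    # gpg symmetrically encrypts that stream with AES256 using the throwaway
    # passphrase file created above.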
    tar_cmd = ("tar", "-c", "-C", path, "--exclude-caches", ".")
    gpg_cmd = ("gpg", "-c", "-q", "--no-use-agent", "--batch", "--yes",
               "--s2k-count", "1024", "--cipher-algo", "AES256",
               "--digest-algo", "SHA512", "--passphrase-file", key_file[1],
               "-o", archive_file[1])

    try:
        # The passphrase is the configured key plus the timestamp; the same
        # timestamp is embedded in the uploaded key name.
        os.write(key_file[0], "{}{}".format(key, now))
        tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE)
        gpg = subprocess.Popen(gpg_cmd, stdin=tar.stdout)
        tar.stdout.close()  # let tar see SIGPIPE if gpg exits early
        gpg.communicate()
        tar.wait()
    finally:
        os.close(key_file[0])
        os.close(archive_file[0])
        os.unlink(key_file[1])
        os.umask(old_umask)

    return archive_file[1]


def do_backup(cfg_file):
    cfg = ConfigParser(cfg_file)
    now = datetime.now().strftime(DATE_FORMAT)

    logger.info("Starting backup for %s", cfg.primary_section)

    if cfg.mark_caches:
        logger.info("Marking caches")
        mark_caches(cfg.path)

    logger.info("Creating archive")
    archive_file = tar_encrypt(now, cfg.encryption_key, cfg.path)
    logger.info("Finished creating archive")

    try:
        logger.info("Uploading archive")
        upload(cfg.bucket_id, "{}-{}".format(cfg.filename, now), archive_file)
        logger.info("Finished uploading archive")
    finally:
        os.unlink(archive_file)

    logger.info("Trimming backups")
    trim_backups(cfg.bucket_id, "{}-".format(cfg.filename), cfg.keep)
    logger.info("Finished trimming backups")

    logger.info("Backup for %s finished", cfg.primary_section)


def do_all_backups(pattern):
    for config_file in glob(pattern):
        do_backup(config_file)


if __name__ == "__main__":
    do_all_backups("/srv/etc/backups/*.conf")
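
# A hypothetical config file, inferred from the options this script reads
# (the section name doubles as the directory to back up, and `filename`
# is the S3 key prefix):
#
#   [/srv/data/example]
#   access_key = AKIA...
#   secret_key = ...
#   bucket = example-backups
#   filename = example
#   encryption_key = a-long-random-passphrase
#   mark_caches = true
#   keep = 3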