# vim: set filencoding=utf8
"""
Mercurial Repository Management Library
@author: Mike Crute (mcrute@gmail.com)
@organization: SoftGroup Hosting
@date: February 23, 2011
"""
import os.path
import logging
from StringIO import StringIO
from ConfigParser import SafeConfigParser
class Adornments(object):
BASE_CSS = ("padding: 0px 4px;" "font-size: 10px;"
"font-weight: normal;" "border: 1px solid;"
"background-color: #fafafa;"
"border-color: #ffffff #f0f0f0 #f0f0f0 #ffffff;")
CSS_YELLOW = ("background-color: #ffe500;" "color: #222222;"
"border-color: #fff066 #a89e43 #a89e43 #fff066;")
CSS_BROWN = ("background-color: #815555;" "color: #ffffff;"
"border-color: #ff3347 #541118 #541118 #ff3347;")
CSS_GREEN = ("background-color: #11a800;" "color: #ffffff;"
"border-color: #5eff4c #3ea832 #3ea832 #5eff4c;")
CSS_BLUE = ("background-color: #0099ff;" "color: #ffffff;"
"border-color: #4cb8ff #3279a8 #3279a8 #4cb8ff;")
CSS_RED = ("background-color: #ff0000;" "color: #ffffff;"
"border-color: #ff6666 #000000 #000000 #ff6666;")
BASE_HTML = '{{tag}} '.format(BASE_CSS)
LINK_CSS_DARK = 'color: #ffffff; text-decoration: none;'
LINK_CSS_LIGHT = 'color: #000000; text-decoration: none;'
def __init__(self, repo):
self.repo = repo
def __str__(self):
adornments = [self.repo.description]
if self.repo.moved_to:
tag = 'MOVED'
tag = tag.format(link_css=self.LINK_CSS_DARK,
url=self.repo.moved_to)
adornments.insert(0,
self.BASE_HTML.format(css=self.CSS_BLUE, tag=tag))
if self.repo.defunct:
adornments.insert(0, '')
adornments.append('')
if self.repo.private:
adornments.insert(0,
self.BASE_HTML.format(css=self.CSS_RED, tag="PRIVATE"))
if self.repo.upstream:
tag = 'FORK'
tag = tag.format(link_css=self.LINK_CSS_LIGHT,
url=self.repo.upstream)
adornments.insert(0,
self.BASE_HTML.format(css=self.CSS_YELLOW, tag=tag))
if self.repo.maintained:
adornments.insert(0,
self.BASE_HTML.format(css=self.CSS_GREEN, tag="MAINTAINED"))
return " ".join(adornments)
class Repository(object):
def __init__(self, path, description=None, contact=None):
self.path = path
self.description = description
self.contact = contact
self.readers = []
self.writers = []
self.defunct = False
self.maintained = True
self.private = False
self.moved_to = None
self.repo_path = None
self.upstream = None
self.lock_script = None
def can_be_read_by(self, user):
return user in self.readers
def can_be_written_by(self, user):
return user in self.writers
@property
def exists(self):
return os.path.exists(self.full_path)
@property
def full_path(self):
return os.path.join(self.repo_path, self.path)
@property
def hgrc_path(self):
return os.path.join(self.full_path, '.hg', 'hgrc')
@classmethod
def from_config(cls, config, repo_path, lock_script=None):
self = cls(config['path'], config.get('description', None),
config.get('contact', None))
self.readers = ConfigLoader.as_list(config.get('read', ''))
self.writers = ConfigLoader.as_list(config.get('write', ''))
self.defunct = ConfigLoader.as_bool(config.get('defunct', 'no'))
self.private = ConfigLoader.as_bool(config.get('private', 'no'))
self.maintained = ConfigLoader.as_bool(config.get('maintained', 'no'))
self.moved_to = config.get('moved_to', None)
self.upstream = config.get('upstream', None)
self.repo_path = repo_path
self.lock_script = lock_script
return self
def build_hgrc(self, users):
buf = StringIO()
buf.write("# This file is generated by sync-repo-config\n")
buf.write("# Changes are over-written on each run\n\n")
if self.contact or self.description or self.writers:
buf.write("[web]\n")
if self.contact:
buf.write("contact = {0}\n".format(users[self.contact]))
if self.description:
buf.write("description = {adorned_name}\n".format(
adorned_name=Adornments(self)))
if self.writers:
buf.write("allow_push = {0}\n".format(",".join(self.writers)))
buf.write("\n")
if self.lock_script:
buf.write("[hooks]\n")
buf.write("prechangegroup.verify-permission = {0}\n".format(
self.lock_script))
buf.write("\n")
buf.write("[ssh]\n")
buf.write("readers = {0}\n".format(",".join(self.readers)))
buf.write("writers = {0}\n".format(",".join(self.writers)))
return buf.getvalue()
def write_hgrc(self, users):
if not self.exists:
raise IOError("Repository {0!r} does not exist".format(
self.full_path))
with open(self.hgrc_path, 'w') as hgrc:
hgrc.write(self.build_hgrc(users))
def load_hgrc(self):
hgrc = SafeConfigParser()
with open(self.hgrc_path, 'r') as fp:
hgrc.readfp(fp)
return hgrc
def load_from_hgrc(self):
hgrc = self.load_hgrc()
self.description = hgrc.get("web", "description")
self.contact = hgrc.get("web", "contact")
self.readers = ConfigLoader.as_list(hgrc.get("ssh", "readers"))
self.writers = ConfigLoader.as_list(hgrc.get("ssh", "writers"))
class User(object):
def __init__(self, username, name, email):
self.username = username
self.name = name
self.email = email
self.ssh_key = None
self.login_script = None
def __str__(self):
return "{self.name} <{self.email}>".format(self=self)
@classmethod
def from_config(cls, config, login_script):
self = cls(config['username'], config['name'], config['email'])
self.ssh_key = config.get('ssh_key', None)
self.login_script = login_script
return self
@property
def ssh_line(self):
if not self.ssh_key:
return ""
return ('command="{self.login_script} {self.username}",'
'no-port-forwarding,no-X11-forwarding,no-agent-forwarding'
' {self.ssh_key} {self.email}\n').format(self=self)
class ConfigLoader(object):
def __init__(self, cfg_file):
self.config = SafeConfigParser()
with open(cfg_file, 'r') as config_file:
self.config.readfp(config_file)
self.login_script = self.config.get('system', 'login_script')
self.repo_path = self.config.get('system', 'repo_path')
self.repo_user = self.config.get('system', 'repo_user')
self.lock_script = self.config.get('system', 'lock_script')
def _filtered_sections(self, section_name):
for section in self.config.sections():
if section.startswith(section_name + ":"):
yield section
@staticmethod
def as_bool(value):
return value.lower() in ('yes', 'true', 'on')
@staticmethod
def as_list(value):
return [item.strip() for item in value.split(",")
if value]
@staticmethod
def clean_section_name(name):
return name.split(':')[1]
@property
def repos(self):
for section in self._filtered_sections('repo'):
values = dict(self.config.items(section))
values['path'] = self.clean_section_name(section)
yield Repository.from_config(values, self.repo_path,
self.lock_script)
@property
def users(self):
for section in self._filtered_sections('user'):
values = dict(self.config.items(section))
values['username'] = self.clean_section_name(section)
yield User.from_config(values, self.login_script)
@property
def user_dict(self):
return dict((user.username, user) for user in self.users)
@property
def repo_user_authorized_keys(self):
home_dir = os.path.expanduser('~' + self.repo_user)
if '~' in home_dir:
raise ValueError("User {0!r} doesn't exist".format(self.repo_user))
return os.path.join(home_dir, '.ssh', 'authorized_keys')
def get_logger(name):
format = "%(levelname)s: %(message)s"
logging.basicConfig(level=logging.DEBUG, format=format,
filename="/srv/hg/scripts.log")
stream = logging.StreamHandler()
stream.setLevel(logging.WARNING)
stream.setFormatter(logging.Formatter(format))
logger = logging.getLogger(name)
logger.addHandler(stream)
return logger