From 4f7c446b853e333caaf1b28b668ffbc1b011b78f Mon Sep 17 00:00:00 2001 From: Mike Crute Date: Thu, 4 Feb 2010 23:26:32 -0500 Subject: Adding SQLite storage backend. --- kronos/model.py | 17 ++++ kronos/parser.py | 8 +- kronos/storage.py | 177 +++++++++++++++++++++++++++++++++++ kronos/tests/itest_sqlite_storage.py | 66 +++++++++++++ 4 files changed, 261 insertions(+), 7 deletions(-) create mode 100644 kronos/model.py create mode 100644 kronos/storage.py create mode 100644 kronos/tests/itest_sqlite_storage.py diff --git a/kronos/model.py b/kronos/model.py new file mode 100644 index 0000000..d2ac67c --- /dev/null +++ b/kronos/model.py @@ -0,0 +1,17 @@ +# vim: set filencoding=utf8 +""" +Kronos Model Objects + +@author: Mike Crute (mcrute@ag.com) +@organization: American Greetings Interactive +@date: February 04, 2010 +""" + + +class Activity(object): + + def __init__(self, activity, description=None, category=None): + self.activity = activity + self.description = description + self.category = category + self.tags = [] diff --git a/kronos/parser.py b/kronos/parser.py index f0543f9..147682f 100644 --- a/kronos/parser.py +++ b/kronos/parser.py @@ -7,13 +7,7 @@ Activity Parser @date: February 04, 2010 """ -class Activity(object): - - def __init__(self, activity, description=None, category=None): - self.activity = activity - self.description = description - self.category = category - self.tags = [] +from kronos.model import Activity def parse_activity(text): diff --git a/kronos/storage.py b/kronos/storage.py new file mode 100644 index 0000000..4add988 --- /dev/null +++ b/kronos/storage.py @@ -0,0 +1,177 @@ +# vim: set filencoding=utf8 +""" +Storage Back-ends For Kronos + +@author: Mike Crute (mcrute@ag.com) +@organization: American Greetings Interactive +@date: February 04, 2010 +""" + + +class StorageError(Exception): + """ + Base class for all other storage exceptions. + """ + + +class NotConnected(StorageError): + """ + Raised when a connection to the back-end is not avaiable. + """ + + +def _get_filtered_object_dict(dict_): + output = {} + for key, value in dict_.__dict__.items(): + if not key.startswith('_') and not callable(value): + output[key] = value + + return output + + +class BaseStorageBackEnd(object): + + def __init__(self): + self.load_engine() + + def get(self, model_obj, **kwargs): + """ + Get a model object instance from the storage back-end + where the criteria in the kwargs matches that in the + back-end. + """ + name = model_obj.__name__ + result = self.select(name, **kwargs) + + instance = model_obj() + for key, value in result.items(): + if key != 'id': + setattr(instance, key, value) + else: + instance.__db_id__ = value + + return instance + + def save(self, model_obj): + """ + Persist a model object with the storage back-end. + """ + name = model_obj.__class__.__name__ + id_ = getattr(model_obj, '__db_id__', None) + + values = _get_filtered_object_dict(model_obj) + + if id_: + self.update(name, id_, **values) + else: + self.insert(name, **values) + + def load_engine(self): + """ + Load a storage engine module that contains a function + called connect suitable for setting up the actual + connection. + """ + + def select(self, table, **kwargs): + """ + Select raw data from the storage engine and return + as a dictionary. + """ + + def insert(self, table, **values): + """ + Insert values into the back-end named table or + equivalent data structure. + """ + + def update(self, table, criteria, **values): + """ + Update values in the back-end table or equivalent + data structure based on the criteria. + """ + + def connect(self, database, *args, **kwargs): + """ + Create a connection to the back-end. + """ + + def create_table(self, obj, **columns): + """ + Create a table or equivalent data structure in the + storage back-end. + """ + + +class SQLiteBackEnd(BaseStorageBackEnd): + + def __init__(self): + super(SQLiteBackEnd, self).__init__() + self.connection = None + + def load_engine(self): + import sqlite3 + self.engine = sqlite3 + + def _check_connection(self): + if not self.connection: + raise NotConnected("Not connected to storage back-end!") + + def connect(self, database, *args, **kwargs): + self.connection = self.engine.connect(database) + + def select(self, table, **kwargs): + sql = "SELECT * FROM {0} WHERE ".format(table) + for key in kwargs.keys(): + sql += "{0}=? ".format(key) + + return self._get_normalized_results(sql, **kwargs)[0] + + def _get_normalized_results(self, sql, **kwargs): + self._check_connection() + cursor = self.connection.cursor() + + results = cursor.execute(sql, kwargs.values()) + return self._normalize_results(results) + + def _normalize_results(self, results): + col_names = [col[0] for col in results.description] + output = [] + for row in results.fetchall(): + output.append(dict(zip(col_names, row))) + + return output + + def insert(self, table, **values): + placeholder = ("?," * len(values)).rstrip(',') + col_spec = ','.join(values.keys()) + sql = "INSERT INTO {0}({1}) VALUES({2})".format(table, col_spec, + placeholder) + + self._execute_modification(sql, values.values()) + + def update(self, table, criteria, **values): + sql = "UPDATE {0} SET ".format(table) + for key in values.keys(): + sql += "{0}=?, ".format(key) + sql = sql[:-2] + "WHERE id=?" + + binds = values.values() + [criteria] + self._execute_modification(sql, binds) + + def create_table(self, obj, **columns): + table = obj.__name__ + + sql = "CREATE TABLE {0} (".format(table) + sql += "id INTEGER PRIMARY KEY" + for key, value in columns.items(): + sql += ", {0} {1}".format(key, value) + sql += ")" + + self._execute_modification(sql) + + def _execute_modification(self, sql, binds=()): + self._check_connection() + cursor = self.connection.cursor() + cursor.execute(sql, binds) + self.connection.commit() diff --git a/kronos/tests/itest_sqlite_storage.py b/kronos/tests/itest_sqlite_storage.py new file mode 100644 index 0000000..4628a23 --- /dev/null +++ b/kronos/tests/itest_sqlite_storage.py @@ -0,0 +1,66 @@ +# vim: set filencoding=utf8 +""" +SQLite Storage Back-End Test + +@author: Mike Crute (mcrute@ag.com) +@organization: American Greetings Interactive +@date: February 04, 2010 + +A basic integration test that exercies the entire +SQLite storage back-end. +""" + + +import unittest +from nose.tools import assert_equals, assert_raises +from kronos.storage import SQLiteBackEnd, NotConnected + + +class SampleModel(object): + + foo = None + bar = None + baz = None + + +class TestSQLitBackEnd(unittest.TestCase): + + def setUp(self): + self.storage = SQLiteBackEnd() + self.storage.connect(':memory:') + + self.storage.create_table(SampleModel, foo='text', + bar='text', baz='text') + + self.model1 = SampleModel() + self.model1.foo = '123' + self.model1.bar = '456' + self.model1.baz = '789' + + def test_no_connect_should_cause_error(self): + self.storage.connection = None + assert_raises(NotConnected, self.storage.save, self.model1) + + def test_save_and_select(self): + self.storage.save(self.model1) + results = self.storage.get(SampleModel, foo='123') + + assert isinstance(results, SampleModel) + assert_equals(results.foo, '123') + assert_equals(results.bar, '456') + assert_equals(results.baz, '789') + + def test_save_and_update(self): + self.storage.save(self.model1) + results = self.storage.get(SampleModel, foo='123') + results.foo = 'test' + self.storage.save(results) + + assert isinstance(results, SampleModel) + assert_equals(results.foo, 'test') + assert_equals(results.bar, '456') + assert_equals(results.baz, '789') + + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3