From f1f41f9c34de2d5909364e3702a6eebee47d6ecb Mon Sep 17 00:00:00 2001
From: Ian Dalton
Date: Fri, 15 Jun 2012 17:52:21 -0600
Subject: [PATCH 1/3] Fix some non-functional code

- Some variables were wrongly referenced as globals instead of as
  members of self.
- One line had the wrong indentation, so a variable was only defined
  under an irrelevant condition.
- Some methods were called on a connection object instead of on the
  result returned by that object's execute method.
---
 .gitignore          |  1 +
 pygrametl/tables.py | 30 +++++++++++++++---------------
 2 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/.gitignore b/.gitignore
index 94487b9..a15d4f8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 *.pyc
+/build/

diff --git a/pygrametl/tables.py b/pygrametl/tables.py
index 699f002..6e5fb2d 100644
--- a/pygrametl/tables.py
+++ b/pygrametl/tables.py
@@ -125,27 +125,27 @@ def _init_sql(self):
 
         # This gives "SELECT key FROM name WHERE lookupval1 = %(lookupval1)s
         # AND lookupval2 = %(lookupval2)s AND ..."
-        self.keylookupsql = "SELECT " + key + " FROM " + name + " WHERE " + \
+        self.keylookupsql = "SELECT " + self.key + " FROM " + self.name + " WHERE " + \
             " AND ".join(["%s = %%(%s)s" % (lv, lv) for lv in self.lookupatts])
 
         # This gives "SELECT key, att1, att2, ... FROM name WHERE key = %(key)s"
         self.rowlookupsql = "SELECT " + ", ".join(self.all) + \
-            " FROM %s WHERE %s = %%(%s)s" % (name, key, key)
+            " FROM %s WHERE %s = %%(%s)s" % (self.name, self.key, self.key)
 
         # This gives "INSERT INTO name(key, att1, att2, ...)
         # VALUES (%(key)s, %(att1)s, %(att2)s, ...)"
-        self.insertsql = "INSERT INTO " + name + "(%s" % (key,) + \
-            (attributes and ", " or "") + \
-            ", ".join(attributes) + ") VALUES (" + \
+        self.insertsql = "INSERT INTO " + self.name + "(%s" % (self.key,) + \
+            (self.attributes and ", " or "") + \
+            ", ".join(self.attributes) + ") VALUES (" + \
             ", ".join(["%%(%s)s" % (att,) for att in self.all]) + ")"
 
     def _get_idfinder(self):
-        self.targetconnection.execute("SELECT MAX(%s) FROM %s" % \
+        result = self.targetconnection.execute("SELECT MAX(%s) FROM %s" % \
             (self.key, self.name))
-        #self._maxid = self.targetconnection.fetchonetuple()[0]
-        self._maxid = self.targetconnection.fetchone()[0]
+        #self._maxid = result.fetchonetuple()[0]
+        self._maxid = result.fetchone()[0]
         if self._maxid is None:
             self._maxid = 0
         return self._getnextid
 
@@ -166,13 +166,13 @@ def lookup(self, row, namemapping=None):
 
         if namemapping and row:
             row = pygrametl.copy(row, **namemapping)
-            sql = self.keylookupsql % row
-        #self.targetconnection.execute(self.keylookupsql, row)
-        self.targetconnection.execute(sql)
+        sql = self.keylookupsql % row
+        #result = self.targetconnection.execute(self.keylookupsql, row)
+        result = self.targetconnection.execute(sql)
         #, namemapping)
 
-        #keyvalue = self.targetconnection.fetchonetuple()[0]
-        keyvalue = self.targetconnection.fetchone()[0]
+        #keyvalue = result.fetchonetuple()[0]
+        keyvalue = result.fetchone()[0]
 
         if keyvalue is None:
             keyvalue = self.defaultidvalue # most likely also None...
@@ -1378,8 +1378,8 @@ def _init_sql(self):
         # INSERT INTO name (key1, ..., keyn, meas1, ..., measn)
         # VALUES (%(key1)s, ..., %(keyn)s, %(meas1)s, ..., %(measn)s)
         self.insertsql = "INSERT INTO " + self.name + "(" + \
-            ", ".join(keyrefs) + (measures and ", " or "") + \
-            ", ".join(measures) + ") VALUES (" + \
+            ", ".join(self.keyrefs) + (self.measures and ", " or "") + \
+            ", ".join(self.measures) + ") VALUES (" + \
             ", ".join(["%%(%s)s" % (att,) for att in self.all]) + ")"
 
         # SELECT key1, ..., keyn, meas1, ..., measn FROM name

From ad1646c8dafb6595090ba15391a708a56da17b80 Mon Sep 17 00:00:00 2001
From: Ian Dalton
Date: Fri, 15 Jun 2012 17:53:27 -0600
Subject: [PATCH 2/3] Merge CachedDimension into Dimension

Instead of having a separate class for CachedDimension, it is better
that Dimension simply have extra flags for caching (enabled by
default).
---
 pygrametl/tables.py | 313 ++++++++++++++++++--------------------------
 1 file changed, 128 insertions(+), 185 deletions(-)

diff --git a/pygrametl/tables.py b/pygrametl/tables.py
index 6e5fb2d..4614035 100644
--- a/pygrametl/tables.py
+++ b/pygrametl/tables.py
@@ -54,16 +54,23 @@
 __author__ = "Christian Thomsen"
 __maintainer__ = "Christian Thomsen"
 __version__ = '0.2.0.3'
-__all__ = ['Dimension', 'CachedDimension', 'SlowlyChangingDimension',
+__all__ = ['Dimension', 'SlowlyChangingDimension',
            'SnowflakedDimension', 'FactTable', 'BatchFactTable',
            'BulkFactTable', 'SubprocessFactTable']
 
 
 class Dimension(object):
-    """A class for accessing a dimension. Does no caching."""
+    """A class for accessing a dimension. Does optional caching.
+
+    When caching, we assume that the DB doesn't change or add any
+    attribute values that are cached. For example, a DEFAULT value in
+    the DB can break this assumption.
+    """
 
     def __init__(self, name, key, attributes, lookupatts=(),
                  idfinder=None, defaultidvalue=None, rowexpander=None,
-                 targetconnection=None):
+                 targetconnection=None, caching=True,
+                 size=10000, prefill=False, cachefullrows=False,
+                 cacheoninsert=True):
         """Arguments:
         - name: the name of the dimension table in the DW
         - key: the name of the primary key in the DW
@@ -92,6 +99,16 @@ def __init__(self, name, key, attributes, lookupatts=(),
           done.
         - targetconnection: The ConnectionWrapper to use. If not given,
           the default target connection is used.
+        - caching: Whether to use caching. Default: True
+        - size: the maximum number of rows to cache. If less than or equal
+          to 0, unlimited caching is used. Default: 10000
+        - prefill: a flag deciding if the cache should be filled when
+          initialized. Default: False
+        - cachefullrows: a flag deciding if full rows should be
+          cached. If not, the cache only holds a mapping from
+          lookup attributes to key values. Default: False.
+        - cacheoninsert: a flag deciding if the cache should be updated
+          when insertions are done. Default: True
         """
         if not type(key) in types.StringTypes:
             raise ValueError, "Key argument must be a string"
@@ -120,6 +137,50 @@ def __init__(self, name, key, attributes, lookupatts=(),
         else:
             self.idfinder = self._get_idfinder()
 
+        ## Caching
+        self.caching = caching
+        if caching:
+            self.cacheoninsert = cacheoninsert
+            self.__prefill = prefill
+            self.__size = size
+            if size > 0:
+                if cachefullrows:
+                    self.__key2row = FIFODict(size)
+                self.__vals2key = FIFODict(size)
+            else:
+                # Use dictionaries as unlimited caches
+                if cachefullrows:
+                    self.__key2row = {}
+                self.__vals2key = {}
+
+            self.cachefullrows = cachefullrows
+
+            if prefill:
+                if cachefullrows:
+                    positions = tuple([self.all.index(att) \
+                                           for att in self.lookupatts])
+                    # select the key and all attributes
+                    sql = "SELECT %s FROM %s" % (", ".join(self.all), name)
+                else:
+                    # select the key and the lookup attributes
+                    sql = "SELECT %s FROM %s" % \
+                        (", ".join([key] + [l for l in self.lookupatts]), name)
+                    positions = range(1, len(self.lookupatts) + 1)
+
+                result = self.targetconnection.execute(sql)
+                if size <= 0:
+                    #data = self.targetconnection.fetchalltuples()
+                    data = result.fetchall()
+                else:
+                    #data = self.targetconnection.fetchmanytuples(size)
+                    data = result.fetchmany(size)
+
+                for rawrow in data:
+                    if cachefullrows:
+                        self.__key2row[rawrow[0]] = rawrow
+                    t = tuple([rawrow[i] for i in positions])
+                    self.__vals2key[t] = rawrow[0]
+
     def _init_sql(self):
         # Now create the SQL that we will need...
 
@@ -159,6 +220,18 @@ def lookup(self, row, namemapping=None):
         - row: a dict which must contain at least the lookup attributes
         - namemapping: an optional namemapping (see module's documentation)
         """
+        if self.caching:
+            res = self._before_lookup(row, namemapping)
+            if res is not None:
+                return res
+            if self.__prefill and self.cacheoninsert and \
+                    (self.__size <= 0 or len(self.__vals2key) < self.__size):
+                # Everything is cached. We don't have to look in the DB
+                return self.defaultidvalue
+        # Something is not cached so we have to use the classical lookup
+        return self.non_cached_lookup(row, namemapping)
+
+    def non_cached_lookup(self, row, namemapping=None):
         namemapping = namemapping or {}
         key = self._before_lookup(row, namemapping)
         if key is not None:
@@ -181,10 +254,17 @@ def lookup(self, row, namemapping=None):
 
 
     def _before_lookup(self, row, namemapping):
-        return None
+        if self.caching:
+            namesinrow = [(namemapping.get(a) or a) if namemapping else a
+                          for a in self.lookupatts]
+            searchtuple = tuple([row[n] for n in namesinrow])
+            return self.__vals2key.get(searchtuple, None)
 
     def _after_lookup(self, row, namemapping, resultkeyvalue):
-        pass
+        if self.caching and resultkeyvalue is not None:
+            namesinrow = [(namemapping.get(a) or a) for a in self.lookupatts]
+            searchtuple = tuple([row[n] for n in namesinrow])
+            self.__vals2key[searchtuple] = resultkeyvalue
 
     def getbykey(self, keyvalue):
         """Lookup and return the row with the given key value.
@@ -204,10 +284,17 @@ def getbykey(self, keyvalue):
         return row
 
     def _before_getbykey(self, keyvalue):
+        if self.caching and self.cachefullrows:
+            res = self.__key2row.get(keyvalue)
+            if res is not None:
+                return dict(zip(self.all, res))
         return None
 
     def _after_getbykey(self, keyvalue, resultrow):
-        pass
+        if self.caching and self.cachefullrows \
+                and resultrow[self.key] is not None:
+            # if resultrow[self.key] is None, no result was found in the db
+            self.__key2row[keyvalue] = tuple([resultrow[a] for a in self.all])
 
     def getbyvals(self, values, namemapping={}):
         """Return a list of all rows with values identical to the given.
@@ -273,7 +360,30 @@ def update(self, row, namemapping={}):
         self._after_update(row, namemapping)
 
     def _before_update(self, row, namemapping):
-        return None
+        if self.caching:
+            # We have to remove old values from the caches.
+            key = (namemapping.get(self.key) or self.key)
+            for att in self.lookupatts:
+                if ((att in namemapping and namemapping[att] in row)
+                    or att in row):
+                    # A lookup attribute is about to be changed and we should
+                    # make sure that the cache does not map from the old value.
+                    # Here, we can only see the new value, but we can get the
+                    # old lookup values by means of the key:
+                    oldrow = self.getbykey(row[key])
+                    namesinrow = [(namemapping.get(a) or a)
+                                  for a in self.lookupatts]
+                    searchtuple = tuple([oldrow[n] for n in namesinrow])
+                    if searchtuple in self.__vals2key:
+                        del self.__vals2key[searchtuple]
+                    break
+
+
+            if self.cachefullrows:
+                if row[key] in self.__key2row:
+                    # The cached row is now incorrect. We must make sure it is
+                    # not in the cache.
+                    del self.__key2row[row[key]]
 
     def _after_update(self, row, namemapping):
         pass
@@ -332,7 +442,16 @@ def _before_insert(self, row, namemapping):
         return None
 
     def _after_insert(self, row, namemapping, newkeyvalue):
-        pass
+        # After the insert, we can look the row up. Pretend that we
+        # did that. Then we get the new data cached.
+        # NB: Here we assume that the DB doesn't change or add anything.
+        # For example, a DEFAULT value in the DB breaks this assumption.
+        if self.caching and self.cacheoninsert:
+            self._after_lookup(row, namemapping, newkeyvalue)
+            if self.cachefullrows:
+                tmp = pygrametl.project(self.attributes, row, namemapping)
+                tmp[self.key] = newkeyvalue
+                self._after_getbykey(newkeyvalue, tmp)
 
 
 
@@ -381,7 +500,7 @@ def _get_idfinder(self):
 
         return self._getnextid
 
-    def lookup(self, row, namemapping=None):
+    def non_cached_lookup(self, row, namemapping=None):
         namemapping = namemapping or {}
         mapping = pygrametl.copy(row, **namemapping)
         select = sa.select([self.sa_table.c[self.key]],
@@ -406,182 +525,6 @@ def _insert(self, row, namemapping):
 
 
 
-class CachedDimension(Dimension):
-    """A class for accessing a dimension. Does caching.
-
-    We assume that the DB doesn't change or add any attribute
-    values that are cached.
-    For example, a DEFAULT value in the DB can break this assumption.
-    """
-
-    def __init__(self, name, key, attributes, lookupatts=(),
-                 idfinder=None, defaultidvalue=None, rowexpander=None,
-                 size=10000, prefill=False, cachefullrows=False,
-                 cacheoninsert=True, targetconnection=None):
-        """Arguments:
-        - name: the name of the dimension table in the DW
-        - key: the name of the primary key in the DW
-        - attributes: a sequence of the attribute names in the dimension
-          table. Should not include the name of the primary key which is
-          given in the key argument.
-        - lookupatts: A subset of the attributes that uniquely identify
-          a dimension members. These attributes are thus used for looking
-          up members. If not given, it is assumed that
-          lookupatts = attributes
-        - idfinder: A function(row, namemapping) -> key value that assigns
-          a value to the primary key attribute based on the content of the
-          row and namemapping. If not given, it is assumed that the primary
-          key is an integer, and the assigned key value is then the current
-          maximum plus one.
-        - defaultidvalue: An optional value to return when a lookup fails.
-          This should thus be the ID for a preloaded "Unknown" member.
-        - rowexpander: A function(row, namemapping) -> row. This function
-          is called by ensure before insertion if a lookup of the row fails.
-          This is practical if expensive calculations only have to be done
-          for rows that are not already present. For example, for a date
-          dimension where the full date is used for looking up rows, a
-          rowexpander can be set such that week day, week number, season,
-          year, etc. are only calculated for dates that are not already
-          represented. If not given, no automatic expansion of rows is
-          done.
-        - size: the maximum number of rows to cache. If less than or equal
-          to 0, unlimited caching is used. Default: 10000
-        - prefill: a flag deciding if the cache should be filled when
-          initialized. Default: False
-        - cachefullrows: a flag deciding if full rows should be
-          cached. If not, the cache only holds a mapping from
-          lookupattributes to key values. Default: False.
-        - cacheoninsert: a flag deciding if the cache should be updated
-          when insertions are done. Default: True
-        - targetconnection: The ConnectionWrapper to use. If not given,
-          the default target connection is used.
-        """
-
-        Dimension.__init__(self, name, key, attributes, lookupatts, idfinder,
-                           defaultidvalue, rowexpander, targetconnection)
-        self.cacheoninsert = cacheoninsert
-        self.__prefill = prefill
-        self.__size = size
-        if size > 0:
-            if cachefullrows:
-                self.__key2row = FIFODict(size)
-            self.__vals2key = FIFODict(size)
-        else:
-            # Use dictionaries as unlimited caches
-            if cachefullrows:
-                self.__key2row = {}
-            self.__vals2key = {}
-
-        self.cachefullrows = cachefullrows
-
-        if prefill:
-            if cachefullrows:
-                positions = tuple([self.all.index(att) \
-                                       for att in self.lookupatts])
-                # select the key and all attributes
-                sql = "SELECT %s FROM %s" % (", ".join(self.all), name)
-            else:
-                # select the key and the lookup attributes
-                sql = "SELECT %s FROM %s" % \
-                    (", ".join([key] + [l for l in self.lookupatts]), name)
-                positions = range(1, len(self.lookupatts) + 1)
-
-            self.targetconnection.execute(sql)
-            if size <= 0:
-                #data = self.targetconnection.fetchalltuples()
-                data = self.targetconnection.fetchall()
-            else:
-                #data = self.targetconnection.fetchmanytuples(size)
-                data = self.targetconnection.fetchmany(size)
-
-            for rawrow in data:
-                if cachefullrows:
-                    self.__key2row[rawrow[0]] = rawrow
-                t = tuple([rawrow[i] for i in positions])
-                self.__vals2key[t] = rawrow[0]
-
-    def lookup(self, row, namemapping={}):
-        if self.__prefill and self.cacheoninsert and \
-               (self.__size <= 0 or len(self.__vals2key) < self.__size):
-            # Everything is cached. We don't have to look in the DB
-            res = self._before_lookup(row, namemapping)
-            if res is not None:
-                return res
-            else:
-                return self.defaultidvalue
-        else:
-            # Something is not cached so we have to use the classical lookup
-            return Dimension.lookup(self, row, namemapping)
-
-    def _before_lookup(self, row, namemapping):
-        namesinrow =[(namemapping.get(a) or a) for a in self.lookupatts]
-        searchtuple = tuple([row[n] for n in namesinrow])
-        return self.__vals2key.get(searchtuple, None)
-
-    def _after_lookup(self, row, namemapping, resultkey):
-        if resultkey is not None:
-            namesinrow =[(namemapping.get(a) or a) for a in self.lookupatts]
-            searchtuple = tuple([row[n] for n in namesinrow])
-            self.__vals2key[searchtuple] = resultkey
-
-    def _before_getbykey(self, keyvalue):
-        if self.cachefullrows:
-            res = self.__key2row.get(keyvalue)
-            if res is not None:
-                return dict(zip(self.all, res))
-        return None
-
-    def _after_getbykey(self, keyvalue, resultrow):
-        if self.cachefullrows and resultrow[self.key] is not None:
-            # if resultrow[self.key] is None, no result was found in the db
-            self.__key2row[keyvalue] = tuple([resultrow[a] for a in self.all])
-
-    def _before_update(self, row, namemapping):
-        """ """
-        # We have to remove old values from the caches.
-        key = (namemapping.get(self.key) or self.key)
-        for att in self.lookupatts:
-            if ((att in namemapping and namemapping[att] in row) or att in row):
-                # A lookup attribute is about to be changed and we should make
-                # sure that the cache does not map from the old value. Here,
-                # we can only see the new value, but we can get the old lookup
-                # values by means of the key:
-                oldrow = self.getbykey(row[key])
-                namesinrow =[(namemapping.get(a) or a) for a in self.lookupatts]
-                searchtuple = tuple([oldrow[n] for n in namesinrow])
-                if searchtuple in self.__vals2key:
-                    del self.__vals2key[searchtuple]
-                break
-
-
-        if self.cachefullrows:
-            if row[key] in self.__key2row:
-                # The cached row is now incorrect. We must make sure it is
-                # not in the cache.
-                del self.__key2row[row[key]]
-
-        return None
-
-    def _after_insert(self, row, namemapping, newkeyvalue):
-        """ """
-        # After the insert, we can look the row up. Pretend that we
-        # did that. Then we get the new data cached.
-        # NB: Here we assume that the DB doesn't change or add anything.
-        # For example, a DEFAULT value in the DB breaks this assumption.
-        if self.cacheoninsert:
-            self._after_lookup(row, namemapping, newkeyvalue)
-            if self.cachefullrows:
-                tmp = pygrametl.project(self.all, row, namemapping)
-                tmp[self.key] = newkeyvalue
-                self._after_getbykey(newkeyvalue, tmp)
-
-
-
-
-
-
 
 class SlowlyChangingDimension(Dimension):
     """A class for accessing a slowly changing dimension. Does caching.
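
A note on usage: with this change, a cached dimension is configured
directly on Dimension. The sketch below is illustrative only; the
psycopg2 connection and the date table are assumptions made for the
sake of the example, not code from the patch:

    import psycopg2
    import pygrametl
    from pygrametl.tables import Dimension

    # Any PEP 249 connection works here; psycopg2 is just an example.
    pgconn = psycopg2.connect(database='dw')
    conn = pygrametl.ConnectionWrapper(pgconn)

    # The caching flags are the ones this patch moves from
    # CachedDimension into Dimension.__init__ (caching is on by default).
    datedim = Dimension(
        name='date',
        key='dateid',
        attributes=['day', 'month', 'year'],
        lookupatts=['day', 'month', 'year'],
        targetconnection=conn,
        caching=True,         # disable to get the old uncached behaviour
        size=10000,           # FIFO cache capacity; <= 0 means unlimited
        prefill=False,        # optionally fill the cache up front
        cachefullrows=False,  # cache only lookupatts -> key mappings
        cacheoninsert=True)   # keep the cache current when inserting

    # ensure() goes through the (possibly cached) lookup and only
    # inserts on a cache/DB miss.
    key = datedim.ensure({'day': 15, 'month': 6, 'year': 2012})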
From 974ffe5dbb6b60b009ee41b2f97d8dcd764c0892 Mon Sep 17 00:00:00 2001
From: Ian Dalton
Date: Wed, 20 Jun 2012 14:40:34 -0600
Subject: [PATCH 3/3] Fix typos

---
 pygrametl/tables.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pygrametl/tables.py b/pygrametl/tables.py
index 4614035..5320cd4 100644
--- a/pygrametl/tables.py
+++ b/pygrametl/tables.py
@@ -1372,9 +1372,9 @@ def lookup(self, keyvalues, namemapping={}):
         - keyvalues: a dict at least containing values for all keys
         - namemapping: an optional namemapping (see module's documentation)
         """
-        res = self._before_lookup(sefl, keyvalues, namemapping)
+        res = self._before_lookup(keyvalues, namemapping)
         if res:
-            returnself._emptyfacttonone(res)
+            return self._emptyfacttonone(res)
         self.targetconnection.execute(self.lookupsql, keyvalues, namemapping)
         res = self.targetconnection.fetchone(self.all)
         usedkeys = [key for key in self.keyrefs if res[key] is not None] #has values for keyrefs?
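
The repaired FactTable.lookup can then be exercised like this (again a
sketch: the sales fact table is an assumption, and conn is the wrapper
from the sketch above):

    from pygrametl.tables import FactTable

    facttbl = FactTable(
        name='sales',
        keyrefs=['dateid', 'productid'],
        measures=['amount'],
        targetconnection=conn)

    # Before this patch, lookup() raised a NameError ('sefl' and
    # 'returnself' are undefined names); with the fix, a hit from the
    # _before_lookup cache hook is returned via _emptyfacttonone.
    fact = facttbl.lookup({'dateid': key, 'productid': 42})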