diff --git a/CHANGELOG.md b/CHANGELOG.md index 76f393a..6a6d1e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +- **0.3.0** (2025-04-15): + - now collecting more data: tags, license, geo-accuracy + - re-downloading these data for existing records + - database schema versioning + - moved to PyPi TrustedPublisher auth + - migrated to pyproject.toml-only style + - **0.2.1** (2024-01-08): - migrate to Digital Geography Lab’s GitHub diff --git a/src/flickrhistory/__init__.py b/src/flickrhistory/__init__.py index b0dd233..fdbe103 100644 --- a/src/flickrhistory/__init__.py +++ b/src/flickrhistory/__init__.py @@ -15,4 +15,4 @@ except ImportError: pass -__version__ = "0.2.1" +__version__ = "0.3.0.dev0" diff --git a/src/flickrhistory/basicflickrhistorydownloader.py b/src/flickrhistory/basicflickrhistorydownloader.py index 6b5f33b..227a7a7 100644 --- a/src/flickrhistory/basicflickrhistorydownloader.py +++ b/src/flickrhistory/basicflickrhistorydownloader.py @@ -21,7 +21,9 @@ from .cache import Cache from .cacheupdaterthread import CacheUpdaterThread from .config import Config +from .licensedownloader import LicenseDownloader from .photodownloaderthread import PhotoDownloaderThread +from .photoupdaterthread import PhotoUpdaterThread from .sigtermreceivedexception import SigTermReceivedException from .timespan import TimeSpan from .userprofileupdaterthread import UserProfileUpdaterThread @@ -30,7 +32,7 @@ class BasicFlickrHistoryDownloader: """Download (all) georeferenced flickr posts.""" - NUM_WORKERS = multiprocessing.cpu_count() + 1 # 1 == user_profile_updater + NUM_WORKERS = multiprocessing.cpu_count() NUM_MANAGERS = 2 # main thread + cache_updater # if output into pipe (e.g. 
logger, systemd), then @@ -54,6 +56,8 @@ def __init__(self): def download(self): """Download all georeferenced flickr posts.""" + LicenseDownloader(self._api_key_manager).update_licenses() + for gap in self.gaps_in_download_history: self._todo_deque.append(gap) @@ -74,6 +78,14 @@ def download(self): worker.start() self._worker_threads.append(worker) + # start photo record updaters + for i in range(self.NUM_WORKERS): + worker = PhotoUpdaterThread( + self._api_key_manager, (i + 1, self.NUM_WORKERS) + ) + worker.start() + self._worker_threads.append(worker) + # start cache updater self._cache_updater_thread = CacheUpdaterThread(self._done_queue) self._cache_updater_thread.start() diff --git a/src/flickrhistory/cacheupdaterthread.py b/src/flickrhistory/cacheupdaterthread.py index 0daf7ab..28c3e93 100644 --- a/src/flickrhistory/cacheupdaterthread.py +++ b/src/flickrhistory/cacheupdaterthread.py @@ -35,7 +35,10 @@ def run(self): try: newly_downloaded = self._done_queue.get(timeout=0.1) with Cache() as cache: - cache["already downloaded"] += newly_downloaded + try: + cache["already downloaded"] += newly_downloaded + except KeyError: + cache["already downloaded"] = newly_downloaded self.status = f"added {newly_downloaded}" except queue.Empty: if self.shutdown.is_set(): diff --git a/src/flickrhistory/database/__init__.py b/src/flickrhistory/database/__init__.py new file mode 100644 index 0000000..900ebaf --- /dev/null +++ b/src/flickrhistory/database/__init__.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + + +"""Database-related classes.""" + + +__all__ = [ + "License", + "Photo", + "PhotoSaver", + "Session", + "User", + "UserSaver", +] + +from .engine import Session +from .models import License, Photo, User +from .photo_saver import PhotoSaver +from .user_saver import UserSaver diff --git a/src/flickrhistory/database/databaseschemaupdater.py b/src/flickrhistory/database/databaseschemaupdater.py new file mode 100644 index 0000000..ccc4792 --- /dev/null +++ 
# For now, schema updates are SQL only and work on PostgreSQL, only.
# GeoAlchemy2 does not really support SQLite, anyway.
#
# Keys are the schema version a script upgrades TO; every statement is written
# to be re-runnable (IF NOT EXISTS) so that an interrupted update can be retried.
SCHEMA_UPDATES = {
    # 0 -> 1: geo accuracy, licenses, tags
    1: """
        ALTER TABLE
            photos
        ADD COLUMN IF NOT EXISTS
            geo_accuracy SMALLINT;

        CREATE TABLE IF NOT EXISTS
            licenses (
                id INTEGER PRIMARY KEY,
                name TEXT,
                url TEXT
            );

        ALTER TABLE
            photos
        ADD COLUMN IF NOT EXISTS
            license_id INTEGER REFERENCES licenses(id);

        CREATE INDEX IF NOT EXISTS
            ix_photos_license_id
        ON
            photos (license_id);

        CREATE TABLE IF NOT EXISTS
            tags (
                tag TEXT PRIMARY KEY
            );

        CREATE TABLE IF NOT EXISTS
            tag_photo_associations (
                tag_tag TEXT REFERENCES tags(tag),
                photo_id BIGINT REFERENCES photos(id),
                PRIMARY KEY (tag_tag, photo_id)
            );
    """,
}


class DatabaseSchemaUpdater:
    """
    Update the database schema if necessary.

    Applied versions are recorded in the ``schema_versions`` table; the
    update scripts themselves are the values of ``SCHEMA_UPDATES``.
    """

    LATEST = "LATEST"  # 'magic' value, see set_schema_version()

    def __init__(self):
        """Create the ``schema_versions`` bookkeeping table if it is missing."""
        with engine.begin() as connection:
            connection.execute(
                sqlalchemy.text(
                    """
                    CREATE TABLE IF NOT EXISTS
                        schema_versions
                        (
                            update TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                            version INTEGER PRIMARY KEY
                        );
                    """
                )
            )

    @property
    def installed_version(self):
        """Return the highest recorded schema version, or 0 if none is recorded."""
        with engine.connect() as connection:
            installed_version = connection.execute(
                sqlalchemy.text(
                    """
                    SELECT
                        COALESCE(
                            MAX(version),
                            0
                        ) AS version
                    FROM
                        schema_versions;
                    """
                )
            ).scalar_one_or_none()
        return installed_version

    def update_to_latest(self):
        """
        Apply all outstanding update scripts, one version at a time.

        Each script and the record of its version are committed in the same
        transaction, so a failing script leaves neither a half-applied schema
        nor a version entry behind.
        """
        latest_version = max(SCHEMA_UPDATES)
        installed_version = self.installed_version
        while installed_version < latest_version:
            next_version = installed_version + 1
            print(
                "Updating database schema "
                f"(db version {installed_version:d}->{next_version:d})",
                file=sys.stderr,
                flush=True,  # so that we don't seem without work
            )
            with engine.begin() as connection:
                connection.execute(sqlalchemy.text(SCHEMA_UPDATES[next_version]))
                self._record_version(connection, next_version)
            installed_version = next_version

    @classmethod
    def set_schema_version(cls, version):
        """
        Set the schema version (without running update scripts).

        Pass ``DatabaseSchemaUpdater.LATEST`` to record the newest version
        known to ``SCHEMA_UPDATES``, e.g. right after creating all tables
        from the ORM models.
        """
        if version == cls.LATEST:
            version = max(SCHEMA_UPDATES)
        with engine.begin() as connection:
            cls._record_version(connection, version)

    @staticmethod
    def _record_version(connection, version):
        """Insert *version* into ``schema_versions`` using *connection*."""
        connection.execute(
            sqlalchemy.text(
                """
                INSERT INTO
                    schema_versions (version)
                VALUES (
                    :version
                )
                ON CONFLICT (version) DO NOTHING;
                """
            ),
            {"version": version},
        )
class License(Base):
    """ORM class to represent a flickr license."""

    # the numeric license id; PhotoSaver stores int(data["license"]) here
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    name = sqlalchemy.Column(sqlalchemy.Text)
    url = sqlalchemy.Column(sqlalchemy.Text)

    # Declared with back_populates on both sides (Photo.license names
    # "photos"), like the User<->Photo and Tag<->Photo relationships, so that
    # in-memory changes are mirrored in both directions.
    photos = sqlalchemy.orm.relationship("Photo", back_populates="license")
class Photo(Base):
    """ORM class to represent a flickr photo (posts)."""

    id = sqlalchemy.Column(sqlalchemy.BigInteger, primary_key=True)

    # server and secret are, together with id, the building blocks of photo_url
    server = sqlalchemy.Column(sqlalchemy.Integer)
    secret = sqlalchemy.Column(sqlalchemy.LargeBinary)

    title = sqlalchemy.Column(sqlalchemy.Text)
    description = sqlalchemy.Column(sqlalchemy.Text)

    date_taken = sqlalchemy.Column(sqlalchemy.DateTime(timezone=True))
    date_posted = sqlalchemy.Column(sqlalchemy.DateTime(timezone=True))

    # computed by the database from server, id and secret
    photo_url = sqlalchemy.Column(
        sqlalchemy.Text,
        sqlalchemy.Computed(
            "'https://live.staticflickr.com/' || server::TEXT || '/' || "
            + "id::TEXT || '_' || encode(secret, 'hex') || '_z.jpg'"
        ),
    )
    # computed by the database from the owner's NSID parts and the photo id
    page_url = sqlalchemy.Column(
        sqlalchemy.Text,
        sqlalchemy.Computed(
            "'https://www.flickr.com/photos/' || "
            + "user_id::TEXT || '@N0' || user_farm::TEXT || '/' || "
            + "id::TEXT || '/'"
        ),
    )

    geom = sqlalchemy.Column(geoalchemy2.Geometry("POINT", 4326))
    geo_accuracy = sqlalchemy.Column(sqlalchemy.SmallInteger)

    # composite reference to users(id, farm), see __table_args__
    user_id = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=False)
    user_farm = sqlalchemy.Column(sqlalchemy.SmallInteger, nullable=False)

    tags = sqlalchemy.orm.relationship(
        "Tag",
        secondary="tag_photo_associations",
        back_populates="photos",
    )

    license_id = sqlalchemy.Column(
        sqlalchemy.Integer,
        sqlalchemy.ForeignKey("licenses.id"),
        index=True,
    )
    license = sqlalchemy.orm.relationship("License", back_populates="photos")

    user = sqlalchemy.orm.relationship("User", back_populates="photos")

    __table_args__ = (
        # the third positional argument of ForeignKeyConstraint is the
        # constraint's name; spelled out here, value unchanged
        sqlalchemy.ForeignKeyConstraint(
            ["user_id", "user_farm"], ["users.id", "users.farm"], name="User"
        ),
    )

    @sqlalchemy.orm.validates("title", "description")
    def _drop_nul_from_strings(self, key, value):
        """
        Remove NUL characters from a text value before it is stored.

        ``None`` is passed through unchanged so that a title or description
        can be cleared without raising an AttributeError.
        """
        # NOTE(review): presumably needed because PostgreSQL text columns
        # cannot hold NUL characters - confirm
        if value is None:
            return None
        return value.replace("\x00", "")
sqlalchemy.Column(sqlalchemy.Text) + first_name = sqlalchemy.Column(sqlalchemy.Text) + last_name = sqlalchemy.Column(sqlalchemy.Text) + real_name = sqlalchemy.Column( + sqlalchemy.Text, sqlalchemy.Computed("first_name || ' ' || last_name") + ) + + city = sqlalchemy.Column(sqlalchemy.Text) + country = sqlalchemy.Column(sqlalchemy.Text) + hometown = sqlalchemy.Column(sqlalchemy.Text) + + occupation = sqlalchemy.Column(sqlalchemy.Text) + description = sqlalchemy.Column(sqlalchemy.Text) + + join_date = sqlalchemy.Column(sqlalchemy.DateTime(timezone=True)) + + website = sqlalchemy.Column(sqlalchemy.Text) + facebook = sqlalchemy.Column(sqlalchemy.Text) + twitter = sqlalchemy.Column(sqlalchemy.Text) + tumblr = sqlalchemy.Column(sqlalchemy.Text) + instagram = sqlalchemy.Column(sqlalchemy.Text) + pinterest = sqlalchemy.Column(sqlalchemy.Text) + + photos = sqlalchemy.orm.relationship("Photo", back_populates="user") + + __table_args__ = (sqlalchemy.PrimaryKeyConstraint("id", "farm"),) + + @classmethod + def from_raw_api_data_flickrphotossearch(cls, data): + """Initialise a new User with a flickr.photos.search data dict.""" + user_id, farm = data["owner"].split("@N0") + user_data = {"id": user_id, "farm": farm, "name": data["ownername"]} + return cls(**user_data) + + @classmethod + def from_raw_api_data_flickrprofilegetprofile(cls, data): + """Initialise a new User with a flickr.profile.getProfile data dict.""" + # the API does not always return all fields + + # "id" is the only field garantueed to be in the data + # (because we add it ourselves in databaseobjects.py in case parsing fails) + user_id, farm = data["id"].split("@N0") + + # "joindate" needs special attentation + try: + join_date = datetime.datetime.fromtimestamp( + int(data["join_date"]), tz=datetime.timezone.utc + ) + except KeyError: + join_date = None + + user_data = {"id": user_id, "farm": farm, "join_date": join_date} + + # all the other fields can be added as they are (if they exist) + for field in [ + 
class PhotoSaver:
    """Save a flickr photo to the database."""

    @staticmethod
    def _normalise(data):
        """
        Translate a raw flickr API photo dict into clean values.

        The API does not always return all fields, and it sometimes returns
        fairly weird data. Only ``id`` is required; every other field is
        skipped if it is missing or cannot be parsed, which also allows
        saving records from bad or reduced API responses.

        Returns a tuple ``(photo_data, tags, license_id)``:
        ``photo_data`` is a dict of column values for ``Photo``,
        ``tags`` is a list of unique tag strings (``None`` if the record has
        no ``tags`` field), and ``license_id`` is an int (``None`` if the
        record has no usable ``license`` field).

        Raises ``KeyError`` if ``data`` has no ``id``.
        """
        photo_data = {"id": data["id"]}

        # server and secret are kinda straight-forward
        try:
            photo_data["server"] = data["server"]
        except KeyError:
            pass

        try:
            photo_data["secret"] = bytes.fromhex(data["secret"])
        except (KeyError, TypeError, ValueError):  # missing, not a str, non-hex
            pass

        try:
            photo_data["title"] = data["title"]
        except KeyError:
            pass

        try:
            photo_data["description"] = data["description"]["_content"]
        except (KeyError, TypeError):  # missing, or not the expected dict
            pass

        # the dates need special attention
        # NOTE(review): "datetaken" is parsed as a naive datetime, so
        # astimezone() interprets it in the local time zone of this machine
        try:
            photo_data["date_taken"] = datetime.datetime.fromisoformat(
                data["datetaken"]
            ).astimezone(datetime.timezone.utc)
        except ValueError:
            # there is weirdly quite a lot of photos with
            # date_taken "0000-01-01 00:00:00"
            # Year 0 does not exist, there's 1BCE, then 1CE, nothing in between
            photo_data["date_taken"] = None
        except KeyError:
            # field does not exist in the dict we got
            pass

        try:
            photo_data["date_posted"] = datetime.datetime.fromtimestamp(
                int(data["dateupload"]), tz=datetime.timezone.utc
            )
        except (KeyError, TypeError, ValueError):
            pass

        # geometry
        try:
            longitude = float(data["longitude"])
            latitude = float(data["latitude"])
        except (
            KeyError,  # not contained in API dict
            TypeError,  # weird data returned
            ValueError,  # a string that is not a number
        ):
            pass
        else:
            # exactly 0°N/S, 0°W/E is bogus; a point that merely lies on the
            # equator or on the prime meridian is kept
            if longitude != 0 or latitude != 0:
                photo_data["geom"] = f"SRID=4326;POINT({longitude:f} {latitude:f})"

        try:
            photo_data["geo_accuracy"] = int(data["accuracy"])
        except (KeyError, TypeError, ValueError):
            pass

        try:
            license_id = int(data["license"])
        except (KeyError, TypeError, ValueError):
            license_id = None

        try:
            # tags arrive as one space-separated string; drop duplicates,
            # keep the order
            tags = list(dict.fromkeys(data["tags"].split()))
        except (KeyError, AttributeError):
            tags = None

        return photo_data, tags, license_id

    def save(self, data):
        """
        Save a flickr photo to the database.

        Creates or updates the ``Photo`` with the id found in ``data``,
        together with its owner (delegated to ``UserSaver``), its tags and
        its license. Tags and license are left untouched if ``data`` carries
        no information on them.

        Returns the saved ``Photo``, detached from the database session.
        """
        photo_data, tags, license_id = self._normalise(data)

        with Session() as session, session.begin():
            photo = session.get(Photo, photo_data["id"]) or Photo(id=photo_data["id"])
            photo.user = UserSaver().save(data)

            photo = session.merge(photo)
            photo.update(**photo_data)

            if tags is not None:
                photo.tags = [
                    session.merge(session.get(Tag, tag) or Tag(tag=tag))
                    for tag in tags
                ]

            if license_id is not None:
                photo.license = session.merge(
                    session.get(License, license_id) or License(id=license_id)
                )

            session.flush()
            # detach before the transaction commits, so that the attributes
            # stay loaded and readable for the caller
            session.expunge(photo)
        return photo
class UserSaver:
    """Save a flickr user to the database."""

    # fields of a flickr.profile.getProfile record that are stored as they are
    PROFILE_FIELDS = (
        "first_name",
        "last_name",
        "name",
        "city",
        "country",
        "hometown",
        "occupation",
        "description",
        "website",
        "facebook",
        "twitter",
        "tumblr",
        "instagram",
        "pinterest",
    )

    @staticmethod
    def _normalise(data):
        """
        Translate a raw flickr API dict into ``(user_id, farm, user_data)``.

        Raw data from two different API endpoints are accepted, that return
        different data in different ontologies:

        - photo records (they have an ``owner``), from which only the NSID
          and the ``ownername`` are used. In particular, the photo's own
          ``description`` is not mistaken for the user's.
        - profile records (identified by ``id``), from which all available
          profile fields are used; ``join_date`` is converted from a POSIX
          timestamp to a timezone-aware datetime, and skipped if it is
          missing or not a number.

        ``data`` itself is not modified. ``user_id`` and ``farm`` are the
        two parts of the NSID, as strings.
        """
        if "owner" in data:
            # -> from photos.search
            user_id, farm = data["owner"].split("@N0")
            user_data = {}
            try:
                user_data["name"] = data["ownername"]
            except KeyError:
                pass
            return user_id, farm, user_data

        # -> from profile.getprofile
        user_id, farm = data["id"].split("@N0")
        user_data = {
            field: data[field] for field in UserSaver.PROFILE_FIELDS if field in data
        }
        try:
            user_data["join_date"] = datetime.datetime.fromtimestamp(
                int(data["join_date"]), tz=datetime.timezone.utc
            )
        except (KeyError, TypeError, ValueError):
            pass
        return user_id, farm, user_data

    def save(self, data):
        """
        Save a flickr user to the database.

        Creates or updates the ``User`` described by ``data`` (see
        ``_normalise`` for the accepted records) and returns it, detached
        from the database session.
        """
        user_id, farm, user_data = self._normalise(data)

        with Session() as session, session.begin():
            user = session.get(User, (user_id, farm)) or User(id=user_id, farm=farm)
            user = session.merge(user)
            user.update(**user_data)

            session.flush()
            # detach before the transaction commits, so that the attributes
            # stay loaded and readable for the caller
            session.expunge(user)
        return user
sqlalchemy.Computed("id::TEXT || '@N0' || farm::TEXT") - ) - - name = sqlalchemy.Column(sqlalchemy.Text) - first_name = sqlalchemy.Column(sqlalchemy.Text) - last_name = sqlalchemy.Column(sqlalchemy.Text) - real_name = sqlalchemy.Column( - sqlalchemy.Text, sqlalchemy.Computed("first_name || ' ' || last_name") - ) - - city = sqlalchemy.Column(sqlalchemy.Text) - country = sqlalchemy.Column(sqlalchemy.Text) - hometown = sqlalchemy.Column(sqlalchemy.Text) - - occupation = sqlalchemy.Column(sqlalchemy.Text) - description = sqlalchemy.Column(sqlalchemy.Text) - - join_date = sqlalchemy.Column(sqlalchemy.DateTime(timezone=True)) - - website = sqlalchemy.Column(sqlalchemy.Text) - facebook = sqlalchemy.Column(sqlalchemy.Text) - twitter = sqlalchemy.Column(sqlalchemy.Text) - tumblr = sqlalchemy.Column(sqlalchemy.Text) - instagram = sqlalchemy.Column(sqlalchemy.Text) - pinterest = sqlalchemy.Column(sqlalchemy.Text) - - photos = sqlalchemy.orm.relationship("FlickrPhoto", back_populates="user") - - __table_args__ = (sqlalchemy.PrimaryKeyConstraint("id", "farm"),) - - @classmethod - def from_raw_api_data_flickrphotossearch(cls, data): - """Initialise a new FlickrUser with a flickr.photos.search data dict.""" - user_id, farm = data["owner"].split("@N0") - user_data = {"id": user_id, "farm": farm, "name": data["ownername"]} - return cls(**user_data) - - @classmethod - def from_raw_api_data_flickrprofilegetprofile(cls, data): - """Initialise a new FlickrUser with a flickr.profile.getProfile data dict.""" - # the API does not always return all fields - - # "id" is the only field garantueed to be in the data - # (because we add it ourselves in databaseobjects.py in case parsing fails) - user_id, farm = data["id"].split("@N0") - - # "joindate" needs special attentation - try: - join_date = datetime.datetime.fromtimestamp( - int(data["join_date"]), tz=datetime.timezone.utc - ) - except KeyError: - join_date = None - - user_data = {"id": user_id, "farm": farm, "join_date": join_date} - - 
# all the other fields can be added as they are (if they exist) - for field in [ - "first_name", - "last_name", - "city", - "country", - "hometown", - "occupation", - "description", - "website", - "facebook", - "twitter", - "tumblr", - "instagram", - "pinterest", - ]: - try: - user_data[field] = data[field] - except KeyError: - pass - - return cls(**user_data) - - def __str__(self): - """Return a str representation.""" - return f"" - - def __repr(self): - """Return a str representation.""" - return str(self) - - -class FlickrPhoto(Base): - """ORM class to represent a flickr photo (posts).""" - - __tablename__ = "photos" - - id = sqlalchemy.Column(sqlalchemy.BigInteger, primary_key=True) - - server = sqlalchemy.Column(sqlalchemy.Integer) - secret = sqlalchemy.Column(sqlalchemy.LargeBinary) - - title = sqlalchemy.Column(sqlalchemy.Text) - description = sqlalchemy.Column(sqlalchemy.Text) - - date_taken = sqlalchemy.Column(sqlalchemy.DateTime(timezone=True)) - date_posted = sqlalchemy.Column(sqlalchemy.DateTime(timezone=True)) - - photo_url = sqlalchemy.Column( - sqlalchemy.Text, - sqlalchemy.Computed( - "'https://live.staticflickr.com/' || server::TEXT || '/' || " - + "id::TEXT || '_' || encode(secret, 'hex') || '_z.jpg'" - ), - ) - page_url = sqlalchemy.Column( - sqlalchemy.Text, - sqlalchemy.Computed( - "'https://www.flickr.com/photos/' || " - + "user_id::TEXT || '@N0' || user_farm::TEXT || '/' || " - + "id::TEXT || '/'" - ), - ) - - geom = sqlalchemy.Column(geoalchemy2.Geometry("POINT", 4326)) - - user_id = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=False) - user_farm = sqlalchemy.Column(sqlalchemy.SmallInteger, nullable=False) - - user = sqlalchemy.orm.relationship("FlickrUser", back_populates="photos") - - __table_args__ = ( - sqlalchemy.ForeignKeyConstraint( - ["user_id", "user_farm"], ["users.id", "users.farm"], "FlickrUser" - ), - ) - - @classmethod - def from_raw_api_data_flickrphotossearch(cls, data): - """Initialise a new FlickrPhoto with a 
flickr.photos.search data dict.""" - # the API does not always return all fields - # we need to figure out which ones we can use - - # and do quite a lot of clean-up because the flickr API - # also returns fairly weird data, sometimes - - # another side effect is that we can initialise - # with incomplete data (only id needed), - # which helps with bad API responses - - photo_data = {} - - # "id" is the only field garantueed to be in the data - # (because we add it ourselves in databaseobjects.py in case parsing fails) - photo_data["id"] = data["id"] - - # server and secret are kinda straight-forward - try: - photo_data["server"] = data["server"] - except KeyError: - pass - - try: - photo_data["secret"] = bytes.fromhex(data["secret"]) - except (ValueError, KeyError): # some non-hex character - pass - - try: - photo_data["title"] = data["title"] - except KeyError: - pass - - try: - photo_data["description"] = data["description"]["_content"] - except KeyError: - pass - - # the dates need special attention - try: - photo_data["date_taken"] = datetime.datetime.fromisoformat( - data["datetaken"] - ).astimezone(datetime.timezone.utc) - except ValueError: - # there is weirdly quite a lot of photos with - # date_taken "0000-01-01 00:00:00" - # Year 0 does not exist, there’s 1BCE, then 1CE, nothing in between - photo_data["date_taken"] = None - except KeyError: - # field does not exist in the dict we got - pass - - try: - photo_data["date_posted"] = datetime.datetime.fromtimestamp( - int(data["dateupload"]), tz=datetime.timezone.utc - ) - except KeyError: - pass - - # geometry - try: - longitude = float(data["longitude"]) - latitude = float(data["latitude"]) - assert longitude != 0 and latitude != 0 - photo_data["geom"] = f"SRID=4326;POINT({longitude:f} {latitude:f})" - except ( - AssertionError, # lon/lat is at exactly 0°N/S, 0°W/E -> bogus - KeyError, # not contained in API dict - TypeError, # weird data returned - ): - pass - - # finally, the user - # (let’s just 
class LicenseDownloader:
    """Update the list of licenses."""

    API_ENDPOINT_URL = "https://api.flickr.com/services/rest/"

    # seconds to wait for the API before giving up on the request
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key_manager):
        """Remember the ``api_key_manager`` that API keys are requested from."""
        self._api_key_manager = api_key_manager

    @staticmethod
    def _parse_licenses(results):
        """
        Return ``(id, name, url)`` tuples from a licenses.getInfo response.

        Raises ``KeyError``, ``TypeError`` or ``ValueError`` if ``results``
        does not have the expected structure (e.g., an API error message).
        """
        return [
            (int(record["id"]), record["name"], record["url"])
            for record in results["licenses"]["license"]
        ]

    def update_licenses(self):
        """
        Download the list of licenses and save it to the database.

        Licenses that already exist in the database (for instance, as
        id-only records created while saving photos) get their name and url
        updated, unknown licenses are added.

        Raises ``ApiResponseError`` if the API cannot be reached or returns
        something that cannot be interpreted as a list of licenses.
        """
        query = {
            "method": "flickr.photos.licenses.getInfo",
            "format": "json",
            "nojsoncallback": True,
        }

        with self._api_key_manager.get_api_key() as api_key:
            params = {"api_key": api_key}
            params.update(query)

            try:
                with requests.get(
                    self.API_ENDPOINT_URL,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                ) as response:
                    results = response.json()
                licenses = self._parse_licenses(results)
            except (
                ConnectionError,
                json.decoder.JSONDecodeError,
                requests.exceptions.RequestException,
                urllib3.exceptions.HTTPError,
                KeyError,  # the three below: unexpected response structure
                TypeError,
                ValueError,
            ) as exception:
                raise ApiResponseError() from exception

        with Session() as session, session.begin():
            for license_id, license_name, license_url in licenses:
                # merge() copies name and url onto an existing row with this
                # id, or inserts a new one
                session.merge(
                    License(id=license_id, name=license_name, url=license_url)
                )
+ ) - yield photo + for photo in results["photos"]["photo"]: + # the flickr API is matching date_posted very fuzzily, + # let’s not waste time with duplicates + if ( + datetime.datetime.fromtimestamp( + int(photo["dateupload"]), tz=datetime.timezone.utc + ) + > self._timespan.end + ): + continue + + yield photo + + except KeyError: + pass # moving on to next page, if exists page += 1 if page > int(results["photos"]["pages"]): diff --git a/src/flickrhistory/photodownloaderthread.py b/src/flickrhistory/photodownloaderthread.py index 8df4bb2..8dee522 100644 --- a/src/flickrhistory/photodownloaderthread.py +++ b/src/flickrhistory/photodownloaderthread.py @@ -9,12 +9,8 @@ import threading -import time -import sqlalchemy - -from .config import Config -from .databaseobjects import FlickrPhoto +from .database import PhotoSaver from .exceptions import ApiResponseError, DownloadBatchIsTooLargeError from .photodownloader import PhotoDownloader @@ -44,11 +40,6 @@ def __init__(self, api_key_manager, todo_deque, done_queue): self.shutdown = threading.Event() - with Config() as config: - self._engine = sqlalchemy.create_engine( - config["database_connection_string"] - ) - def run(self): """Get TimeSpans off todo_deque and download photos.""" while not self.shutdown.is_set(): @@ -61,34 +52,14 @@ def run(self): try: for photo in photo_downloader.photos: - with sqlalchemy.orm.Session(self._engine) as session: - try: - with session.begin(): - flickr_photo = ( - FlickrPhoto.from_raw_api_data_flickrphotossearch( - photo - ) - ) - session.merge(flickr_photo) - except sqlalchemy.exc.IntegrityError: - # remedy race conditions - # TODO: find out how to avoid them - time.sleep(1.0) - with session.begin(): - session.flush() - flickr_photo = ( - FlickrPhoto.from_raw_api_data_flickrphotossearch( - photo - ) - ) - session.merge(flickr_photo) + photo = PhotoSaver().save(photo) self.count += 1 if self.shutdown.is_set(): # let’s only report back on how much we # in fact downloaded, not what our 
quota was - timespan.end = flickr_photo.date_posted + timespan.end = photo.date_posted break except ApiResponseError: diff --git a/src/flickrhistory/photoupdater.py b/src/flickrhistory/photoupdater.py new file mode 100644 index 0000000..7e2fb18 --- /dev/null +++ b/src/flickrhistory/photoupdater.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +"""Download all data covering a time span from the flickr API.""" + + +__all__ = ["PhotoUpdater"] + + +import json + +import requests +import urllib3 + +from .exceptions import ApiResponseError + + +class PhotoUpdater: + """ + Download photo data from the flickr API. + + Photo data downloaded with flickrhistory<0.3.0 do not contain information on + geo accuracy, license, tags. This re-fetches that information. + """ + + API_ENDPOINT_URL = "https://api.flickr.com/services/rest/" + + def __init__(self, api_key_manager): + """Intialize an PhotoUpdater.""" + self._api_key_manager = api_key_manager + + def get_info_for_photo_id(self, photo_id): + """Get profile data by photo_id.""" + query = { + "method": "flickr.photos.getInfo", + "format": "json", + "nojsoncallback": True, + "photo_id": photo_id, + } + + params = {} + with self._api_key_manager.get_api_key() as api_key: + params["api_key"] = api_key + params.update(query) + + try: + with requests.get(self.API_ENDPOINT_URL, params=params) as response: + results = response.json() + assert "photo" in results + + data = { + "id": photo_id, + "tags": " ".join( + [tag["_content"] for tag in results["photo"]["tags"]["tag"]] + ), + "license": int(results["photo"]["license"]), + "accuracy": int(results["photo"]["location"]["accuracy"]), + "owner": results["photo"]["owner"]["nsid"], + "ownername": results["photo"]["owner"]["realname"], + } + + except ( + ConnectionError, + json.decoder.JSONDecodeError, + requests.exceptions.RequestException, + urllib3.exceptions.HTTPError, + ) as exception: + # API hicups, let’s consider this batch + # unsuccessful and start over + 
raise ApiResponseError() from exception + + except AssertionError: + # if API hicups, return a stub data dict + data = {"id": photo_id} + + return data diff --git a/src/flickrhistory/photoupdaterthread.py b/src/flickrhistory/photoupdaterthread.py new file mode 100644 index 0000000..3849fdc --- /dev/null +++ b/src/flickrhistory/photoupdaterthread.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +"""Thread to complete missing data on photos.""" + + +__all__ = ["PhotoUpdaterThread"] + + +import threading +import time + +import sqlalchemy + +from .config import Config +from .database import Photo, PhotoSaver, Session +from .exceptions import ApiResponseError +from .photoupdater import PhotoUpdater + + +class PhotoUpdaterThread(threading.Thread): + """Finds incomplete photos and downloads missing data from the flickr API.""" + + def __init__(self, api_key_manager, partition=None): + """ + Intialize a PhotoUpdaterThread. + + Args: + api_key_manager: instance of an ApiKeyManager + partition (tuple of int): download the n-th of m parts of incomplete photos + + """ + super().__init__() + + self.count = 0 + + self._api_key_manager = api_key_manager + try: + part, number_of_partitions = partition + assert part > 0 + assert part <= number_of_partitions + self._bounds = ( + (part - 1) * 1.0 / number_of_partitions, + part * 1.0 / number_of_partitions, + ) + except (AssertionError, TypeError): + self._bounds = None + + self.shutdown = threading.Event() + + with Config() as config: + self._engine = sqlalchemy.create_engine( + config["database_connection_string"] + ) + + @property + def ids_of_photos_without_detailed_information(self): + """Find ids of incomplete photo profiles.""" + # Find id of incomplete photo records + # We use geo_accuracy IS NULL + with Session() as session: + if self._bounds is None: + ids_of_photos_without_detailed_information = session.query( + Photo.id + ).filter_by(geo_accuracy=None) + else: + bounds = ( + sqlalchemy.select( + 
sqlalchemy.sql.functions.percentile_disc(self._bounds[0]) + .within_group(Photo.id) + .label("lower"), + sqlalchemy.sql.functions.percentile_disc(self._bounds[1]) + .within_group(Photo.id) + .label("upper"), + ) + .select_from(Photo) + .filter_by(geo_accuracy=None) + .cte() + ) + ids_of_photos_without_detailed_information = ( + session.query(Photo.id) + .filter_by(geo_accuracy=None) + .where(Photo.id.between(bounds.c.lower, bounds.c.upper)) + .yield_per(1000) + ) + + for (id,) in ids_of_photos_without_detailed_information: + yield id + + def run(self): + """Get TimeSpans off todo_queue and download photos.""" + photo_updater = PhotoUpdater(self._api_key_manager) + + while not self.shutdown.is_set(): + for photo_id in self.ids_of_photos_without_detailed_information: + try: + PhotoSaver().save(photo_updater.get_info_for_photo_id(photo_id)) + self.count += 1 + + except ApiResponseError: + # API returned some bogus/none-JSON data, + # let’s try again later + continue + + if self.shutdown.is_set(): + break + + # once no incomplete photo profiles remain, + # wait for ten minutes before trying again; + # wake up every 1/10 sec to check whether we + # should shut down + for _ in range(10 * 60 * 10): + if self.shutdown.is_set(): + break + time.sleep(0.1) diff --git a/src/flickrhistory/userprofileupdaterthread.py b/src/flickrhistory/userprofileupdaterthread.py index fec56cd..ab89bcf 100644 --- a/src/flickrhistory/userprofileupdaterthread.py +++ b/src/flickrhistory/userprofileupdaterthread.py @@ -14,7 +14,7 @@ import sqlalchemy from .config import Config -from .databaseobjects import FlickrUser +from .database import User, UserSaver from .exceptions import ApiResponseError from .userprofiledownloader import UserProfileDownloader @@ -22,10 +22,6 @@ class UserProfileUpdaterThread(threading.Thread): """Finds incomplete user profiles and downloads missing data from the flickr API.""" - MAX_RETRIES = ( - 5 # once all users have been updated, retry this times (with 10 min breaks) - 
) - def __init__(self, api_key_manager, partition=None): """ Intialize a UserProfileUpdateThread. @@ -68,26 +64,26 @@ def nsids_of_users_without_detailed_information(self): with sqlalchemy.orm.Session(self._engine) as session: if self._bounds is None: nsids_of_users_without_detailed_information = session.query( - FlickrUser.nsid + User.nsid ).filter_by(join_date=None) else: bounds = ( sqlalchemy.select( sqlalchemy.sql.functions.percentile_disc(self._bounds[0]) - .within_group(FlickrUser.id) + .within_group(User.id) .label("lower"), sqlalchemy.sql.functions.percentile_disc(self._bounds[1]) - .within_group(FlickrUser.id) + .within_group(User.id) .label("upper"), ) - .select_from(FlickrUser) + .select_from(User) .filter_by(join_date=None) .cte() ) nsids_of_users_without_detailed_information = ( - session.query(FlickrUser.nsid) + session.query(User.nsid) .filter_by(join_date=None) - .where(FlickrUser.id.between(bounds.c.lower, bounds.c.upper)) + .where(User.id.between(bounds.c.lower, bounds.c.upper)) .yield_per(1000) ) @@ -98,22 +94,10 @@ def run(self): """Get TimeSpans off todo_queue and download photos.""" user_profile_downloader = UserProfileDownloader(self._api_key_manager) - retries = 0 - - while not (self.shutdown.is_set() or retries >= self.MAX_RETRIES): + while not self.shutdown.is_set(): for nsid in self.nsids_of_users_without_detailed_information: try: - with ( - sqlalchemy.orm.Session(self._engine) as session, - session.begin(), - ): - flickr_user = ( - FlickrUser.from_raw_api_data_flickrprofilegetprofile( - user_profile_downloader.get_profile_for_nsid(nsid) - ) - ) - session.merge(flickr_user) - + UserSaver().save(user_profile_downloader.get_profile_for_nsid(nsid)) self.count += 1 except ApiResponseError: @@ -132,4 +116,3 @@ def run(self): if self.shutdown.is_set(): break time.sleep(0.1) - retries += 1