From cb5c357647bd30ac940b8cd9162107182e77d1e7 Mon Sep 17 00:00:00 2001 From: Peter Nerlich Date: Tue, 18 Nov 2025 15:12:51 +0100 Subject: [PATCH] add celery support --- linkcheck/celery.py | 26 +++++++ linkcheck/linkcheck_settings.py | 1 + linkcheck/listeners.py | 120 +++++++++++--------------------- linkcheck/worker_tasks.py | 80 +++++++++++++++++++++ 4 files changed, 148 insertions(+), 79 deletions(-) create mode 100644 linkcheck/celery.py create mode 100644 linkcheck/worker_tasks.py diff --git a/linkcheck/celery.py b/linkcheck/celery.py new file mode 100644 index 0000000..3ca406c --- /dev/null +++ b/linkcheck/celery.py @@ -0,0 +1,26 @@ +from celery import shared_task +from django.apps import apps + +from .worker_tasks import do_check_instance_links, do_instance_post_save + + +@shared_task +def do_check_link(app: str, model: str, id: int, **kwargs): + sender = apps.get_app_config(app).get_model(model) + + linklist_cls = sender._linklist + + instance = sender.objects.get(pk=id) + + do_check_instance_links(sender, instance, linklist_cls) + + +@shared_task +def do_post_save(app: str, model: str, id: int, **kwargs): + sender = apps.get_app_config(app).get_model(model) + + linklist_cls = sender._linklist + + instance = sender.objects.get(pk=id) + + do_instance_post_save(sender, instance, linklist_cls, **kwargs) diff --git a/linkcheck/linkcheck_settings.py b/linkcheck/linkcheck_settings.py index 617aef4..07347e0 100644 --- a/linkcheck/linkcheck_settings.py +++ b/linkcheck/linkcheck_settings.py @@ -58,6 +58,7 @@ RESULTS_PER_PAGE = getattr(settings, 'LINKCHECK_RESULTS_PER_PAGE', 500) SITE_DOMAINS = getattr(settings, 'LINKCHECK_SITE_DOMAINS', []) DISABLE_LISTENERS = getattr(settings, 'LINKCHECK_DISABLE_LISTENERS', False) +LINKCHECK_IN_CELERY = getattr(settings, 'LINKCHECK_IN_CELERY', False) TOLERATE_BROKEN_ANCHOR = getattr(settings, 'LINKCHECK_TOLERATE_BROKEN_ANCHOR', True) PROXIES = getattr(settings, 'LINKCHECK_PROXIES', {}) TRUST_PROXY_SSL = getattr(settings, 
'LINKCHECK_TRUST_PROXY_SSL', False) diff --git a/linkcheck/listeners.py b/linkcheck/listeners.py index 0642aeb..0a510c5 100644 --- a/linkcheck/listeners.py +++ b/linkcheck/listeners.py @@ -1,17 +1,26 @@ import logging import sys -import time from contextlib import contextmanager +from functools import partial from queue import Empty, LifoQueue from threading import Thread from django.apps import apps +from django.db import transaction from django.db.models import signals as model_signals from linkcheck.models import Link, Url -from . import filebrowser, update_lock -from .linkcheck_settings import MAX_URL_LENGTH +from . import filebrowser +from .linkcheck_settings import LINKCHECK_IN_CELERY +from .worker_tasks import ( + do_check_instance_links, + do_instance_post_save, + do_instance_pre_delete, +) + +if LINKCHECK_IN_CELERY: + from .celery import do_check_link, do_post_save logger = logging.getLogger(__name__) @@ -65,57 +74,24 @@ def check_instance_links(sender, instance, **kwargs): """ linklist_cls = sender._linklist - def do_check_instance_links(sender, instance, wait=False): - # On some installations, this wait time might be enough for the - # thread transaction to account for the object change (GH #41). - # A candidate for the future post_commit signal. 
- - global worker_running # noqa - - if wait: - time.sleep(0.1) - with update_lock: - content_type = linklist_cls.content_type() - new_links = [] - old_links = Link.objects.filter(content_type=content_type, object_id=instance.pk) - - linklists = linklist_cls().get_linklist(extra_filter={'pk': instance.pk}) - - if not linklists: - # This object is no longer watched by linkcheck according to object_filter - links = [] - else: - linklist = linklists[0] - links = linklist['urls']+linklist['images'] - - for link in links: - # url structure = (field, link text, url) - url = link[2] - if url.startswith('#'): - url = instance.get_absolute_url() + url - - if len(url) > MAX_URL_LENGTH: - # We cannot handle url longer than MAX_URL_LENGTH at the moment - logger.warning('URL exceeding max length will be skipped: %s', url) - continue - - u, created = Url.objects.get_or_create(url=url) - l, created = Link.objects.get_or_create( - url=u, field=link[0], text=link[1], content_type=content_type, object_id=instance.pk - ) - new_links.append(l.id) - u.check_url() - - gone_links = old_links.exclude(id__in=new_links) - gone_links.delete() - # Don't run in a separate thread if we are running tests if tests_running: - do_check_instance_links(sender, instance) + do_check_instance_links(sender, instance, linklist_cls) + elif LINKCHECK_IN_CELERY: + # We're not working in the same db/transaction context, + # so we need to ensure the task is only run after any transaction is committed + transaction.on_commit(partial( + do_check_link.apply_async, + kwargs={ + "app": sender._meta.app_label, + "model": sender.__name__, + "id": instance.pk, + }, + )) else: tasks_queue.put({ 'target': do_check_instance_links, - 'args': (sender, instance, True), + 'args': (sender, instance, linklist_cls, True), 'kwargs': {} }) start_worker() @@ -155,44 +131,30 @@ def instance_post_save(sender, instance, **kwargs): if kwargs.get('raw'): return - def do_instance_post_save(sender, instance, **kwargs): - current_url = 
instance.get_absolute_url() - previous_url = getattr(instance, '__previous_url', None) - # We assume returning None from get_absolute_url means that this instance doesn't have a URL - # Not sure if we should do the same for '' as this could refer to '/' - if current_url is not None and current_url != previous_url: - linklist_cls = sender._linklist - active = linklist_cls.objects().filter(pk=instance.pk).count() - - if kwargs['created'] or (not active): - new_urls = Url.objects.filter(url__startswith=current_url) - else: - new_urls = Url.objects.filter(status=False).filter(url__startswith=current_url) - - if new_urls: - for url in new_urls: - url.check_url() + linklist_cls = sender._linklist if tests_running: - do_instance_post_save(sender, instance, **kwargs) + do_instance_post_save(sender, instance, linklist_cls, **kwargs) + elif LINKCHECK_IN_CELERY: + # We're not working in the same db/transaction context, + # so we need to ensure the task is only run after any transaction is committed + transaction.on_commit(partial( + do_post_save.apply_async, + kwargs={ + "app": sender._meta.app_label, + "model": sender.__name__, + "id": instance.pk, + } | ({"created": kwargs["created"]} if "created" in kwargs else {}), + )) else: tasks_queue.put({ 'target': do_instance_post_save, - 'args': (sender, instance), + 'args': (sender, instance, linklist_cls), 'kwargs': kwargs }) start_worker() -def instance_pre_delete(sender, instance, **kwargs): - instance.linkcheck_deleting = True - deleted_url = instance.get_absolute_url() - if deleted_url: - old_urls = Url.objects.filter(url__startswith=deleted_url).exclude(status=False) - if old_urls: - old_urls.update(status=False, message='Broken internal link') - - def register_listeners(): # 1. 
register listeners for the objects that contain Links for linklist_name, linklist_cls in apps.get_app_config('linkcheck').all_linklists.items(): @@ -204,7 +166,7 @@ def register_listeners(): if getattr(linklist_cls.model, 'get_absolute_url', None): model_signals.pre_save.connect(instance_pre_save, sender=linklist_cls.model) model_signals.post_save.connect(instance_post_save, sender=linklist_cls.model) - model_signals.pre_delete.connect(instance_pre_delete, sender=linklist_cls.model) + model_signals.pre_delete.connect(do_instance_pre_delete, sender=linklist_cls.model) filebrowser.register_listeners() @@ -220,7 +182,7 @@ def unregister_listeners(): if getattr(linklist_cls.model, 'get_absolute_url', None): model_signals.pre_save.disconnect(instance_pre_save, sender=linklist_cls.model) model_signals.post_save.disconnect(instance_post_save, sender=linklist_cls.model) - model_signals.pre_delete.disconnect(instance_pre_delete, sender=linklist_cls.model) + model_signals.pre_delete.disconnect(do_instance_pre_delete, sender=linklist_cls.model) filebrowser.unregister_listeners() diff --git a/linkcheck/worker_tasks.py b/linkcheck/worker_tasks.py new file mode 100644 index 0000000..5e990d2 --- /dev/null +++ b/linkcheck/worker_tasks.py @@ -0,0 +1,80 @@ +import logging +import time + +from linkcheck.models import Link, Url + +from . import update_lock +from .linkcheck_settings import MAX_URL_LENGTH + +logger = logging.getLogger(__name__) + + +def do_check_instance_links(sender, instance, linklist_cls, wait=False): + # On some installations, this wait time might be enough for the + # thread transaction to account for the object change (GH #41). + # A candidate for the future post_commit signal. 
+ + if wait: + time.sleep(0.1) + with update_lock: + content_type = linklist_cls.content_type() + new_links = [] + old_links = Link.objects.filter(content_type=content_type, object_id=instance.pk) + + linklists = linklist_cls().get_linklist(extra_filter={'pk': instance.pk}) + + if not linklists: + # This object is no longer watched by linkcheck according to object_filter + links = [] + else: + linklist = linklists[0] + links = linklist['urls']+linklist['images'] + + for link in links: + # url structure = (field, link text, url) + url = link[2] + if url.startswith('#'): + url = instance.get_absolute_url() + url + + if len(url) > MAX_URL_LENGTH: + # We cannot handle url longer than MAX_URL_LENGTH at the moment + logger.warning('URL exceeding max length will be skipped: %s', url) + continue + + u, created = Url.objects.get_or_create(url=url) + l, created = Link.objects.get_or_create( + url=u, field=link[0], text=link[1], content_type=content_type, object_id=instance.pk + ) + new_links.append(l.id) + u.check_url() + + gone_links = old_links.exclude(id__in=new_links) + gone_links.delete() + + +def do_instance_post_save(sender, instance, linklist_cls, **kwargs): + current_url = instance.get_absolute_url() + previous_url = getattr(instance, '__previous_url', None) + # We assume returning None from get_absolute_url means that this instance doesn't have a URL + # Not sure if we should do the same for '' as this could refer to '/' + if current_url is not None and current_url != previous_url: + linklist_cls = sender._linklist + active = linklist_cls.objects().filter(pk=instance.pk).count() + + if kwargs['created'] or (not active): + new_urls = Url.objects.filter(url__startswith=current_url) + else: + new_urls = Url.objects.filter(status=False).filter(url__startswith=current_url) + + if new_urls: + for url in new_urls: + url.check_url() + + +def do_instance_pre_delete(sender, instance, **kwargs): + instance.linkcheck_deleting = True + deleted_url = instance.get_absolute_url() 
+ if deleted_url: + old_urls = Url.objects.filter(url__startswith=deleted_url).exclude(status=False) + if old_urls: + old_urls.update(status=False, message='Broken internal link')