[yocto] [layerindex-web][PATCH] WIP solution to bug 11197: add asynchronous task execution with Celery and RabbitMQ

Diana Thayer garbados at gmail.com
Mon Jun 19 11:16:31 PDT 2017


Hello!

This is a work-in-progress solution to issue 11197 within layerindex-web: adding asynchronous task execution.
I used RabbitMQ and Celery. I'm still working with Paul Eggleton on adding install instructions for RabbitMQ to
the site's Dockerfile.

Previously, when a user submitted a layer, Django would synchronously send emails to users able to publish layers,
such that if sending an email failed (for example, because the mail server rejected it), the user submitting the layer would see an error.
This patch sends those emails as asynchronous tasks, so any sending errors don't reach the user submitting a layer.

Thanks for reviewing this! Here's a brief changelog:

- updated readme to include RabbitMQ as a dependency
- updated settings.py to include RABBIT_BROKER and RABBIT_BACKEND
- added celery to requirements.txt
- defined celery tasks in layerindex/tasks.py
- split main() in layerindex/update.py into two functions: main (which parses sys.argv) and do_update (which accepts options as keyword arguments)
- updated layerindex/views.py to queue the send_email task, asynchronously messaging anyone able to publish layers (now including superusers) when a new layer is submitted
---
 README               |   5 +-
 layerindex/tasks.py  |  31 +++++++++++
 layerindex/update.py | 143 +++++++++++++++++++++++++++------------------------
 layerindex/views.py  |   7 ++-
 requirements.txt     |   1 +
 settings.py          |   4 ++
 6 files changed, 120 insertions(+), 71 deletions(-)
 create mode 100644 layerindex/tasks.py

diff --git a/README b/README
index 62f739d..cfcc37f 100644
--- a/README
+++ b/README
@@ -14,6 +14,7 @@ In order to make use of this application you will need:
 * Python 3.4+
 * Django 1.8.x - tested with 1.8.17; newer versions may work, but
   the application has not been tested with 1.9 or newer.
+* RabbitMQ 3.6.x - tested with 3.6.10.
 * For production usage, a web server set up to host Django applications
   (not needed for local-only testing)
 * A database supported by Django (SQLite, MySQL, etc.). Django takes
@@ -41,7 +42,9 @@ Setup instructions:
 1. Edit settings.py to specify a database, EMAIL_HOST, SECRET_KEY and
    other settings specific to your installation. Ensure you set
    LAYER_FETCH_DIR to an absolute path to a location with sufficient
-   space for fetching layer repositories.
+   space for fetching layer repositories. Set RABBIT_BROKER and
+   RABBIT_BACKEND to match your RabbitMQ server, and run a Celery
+   worker (celery -A layerindex.tasks:tasks worker) to process tasks.
 
 2. Run the following commands within the layerindex-web directory to
    initialise the database:
diff --git a/layerindex/tasks.py b/layerindex/tasks.py
new file mode 100644
index 0000000..9bb4701
--- /dev/null
+++ b/layerindex/tasks.py
@@ -0,0 +1,31 @@
+"""Celery tasks used to run layer index work asynchronously."""
+
+from celery import Celery
+from django.apps import apps
+from django.core.mail import EmailMessage
+
+try:
+    from update import do_update
+    from utils import setup_django
+except ImportError:
+    from .update import do_update
+    from .utils import setup_django
+
+if not apps.ready:
+    # Imported outside a running Django project (e.g. by a Celery worker):
+    # configure Django so that settings and the email backend are usable.
+    setup_django()
+import settings
+
+tasks = Celery('layerindex',
+    broker=settings.RABBIT_BROKER,
+    backend=settings.RABBIT_BACKEND)
+
+@tasks.task
+def update(**options):
+    return do_update(**options)
+
+@tasks.task
+def send_email(subject, text_content, from_email=None, to_emails=None):
+    # from_email=None makes EmailMessage fall back to DEFAULT_FROM_EMAIL.
+    EmailMessage(subject, text_content, from_email, to_emails).send()
diff --git a/layerindex/update.py b/layerindex/update.py
index d5c56cd..f1c5039 100755
--- a/layerindex/update.py
+++ b/layerindex/update.py
@@ -17,7 +17,11 @@ import subprocess
 import signal
 from datetime import datetime, timedelta
 from distutils.version import LooseVersion
-import utils
+
+try:
+    import utils
+except ImportError:
+    from . import utils
 
 import warnings
 warnings.filterwarnings("ignore", category=DeprecationWarning)
@@ -71,73 +75,32 @@ def prepare_update_layer_command(options, branch, layer, updatedeps=False):
     cmd = '%s update_layer.py -l %s -b %s' % (cmdprefix, layer.name, branch.name)
     if updatedeps:
         cmd += ' --update-dependencies'
-    if options.reload:
+    if options.get('reload'):
         cmd += ' --reload'
-    if options.fullreload:
+    if options.get('fullreload'):
         cmd += ' --fullreload'
-    if options.nocheckout:
+    if options.get('nocheckout'):
         cmd += ' --nocheckout'
-    if options.dryrun:
+    if options.get('dryrun'):
         cmd += ' -n'
-    if options.loglevel == logging.DEBUG:
+    if options.get('loglevel') == logging.DEBUG:
         cmd += ' -d'
-    elif options.loglevel == logging.ERROR:
+    elif options.get('loglevel') == logging.ERROR:
         cmd += ' -q'
     return cmd
 
-
-def main():
-    if LooseVersion(git.__version__) < '0.3.1':
-        logger.error("Version of GitPython is too old, please install GitPython (python-git) 0.3.1 or later in order to use this script")
-        sys.exit(1)
-
-
-    parser = optparse.OptionParser(
-        usage = """
-    %prog [options]""")
-
-    parser.add_option("-b", "--branch",
-            help = "Specify branch(es) to update (use commas to separate multiple). Default is all enabled branches.",
-            action="store", dest="branch", default='')
-    parser.add_option("-l", "--layer",
-            help = "Specify layers to update (use commas to separate multiple). Default is all published layers.",
-            action="store", dest="layers")
-    parser.add_option("-r", "--reload",
-            help = "Reload recipe data instead of updating since last update",
-            action="store_true", dest="reload")
-    parser.add_option("", "--fullreload",
-            help = "Discard existing recipe data and fetch it from scratch",
-            action="store_true", dest="fullreload")
-    parser.add_option("-n", "--dry-run",
-            help = "Don't write any data back to the database",
-            action="store_true", dest="dryrun")
-    parser.add_option("-x", "--nofetch",
-            help = "Don't fetch repositories",
-            action="store_true", dest="nofetch")
-    parser.add_option("", "--nocheckout",
-            help = "Don't check out branches",
-            action="store_true", dest="nocheckout")
-    parser.add_option("-d", "--debug",
-            help = "Enable debug output",
-            action="store_const", const=logging.DEBUG, dest="loglevel", default=logging.INFO)
-    parser.add_option("-q", "--quiet",
-            help = "Hide all output except error messages",
-            action="store_const", const=logging.ERROR, dest="loglevel")
-
-    options, args = parser.parse_args(sys.argv)
-    if len(args) > 1:
-        logger.error('unexpected argument "%s"' % args[1])
-        parser.print_help()
-        sys.exit(1)
-
+def do_update(**options):
+    """Update the index for the selected branches/layers; option keys are
+    main()'s optparse dests. Repositories are fetched unless 'nofetch'.
+    NOTE(review): ends via sys.exit(), i.e. raises SystemExit in callers."""
     utils.setup_django()
     import settings
     from layerindex.models import Branch, LayerItem, Update, LayerUpdate
 
-    logger.setLevel(options.loglevel)
+    logger.setLevel(options.get('loglevel', logging.INFO))
 
-    if options.branch:
-        branches = options.branch.split(',')
+    if options.get('branch'):
+        branches = options['branch'].split(',')
         for branch in branches:
             if not utils.get_branch(branch):
                 logger.error("Specified branch %s is not valid" % branch)
@@ -151,10 +114,10 @@ def main():
         logger.error("Please set LAYER_FETCH_DIR in settings.py")
         sys.exit(1)
 
-    if options.layers:
-        layerquery = LayerItem.objects.filter(classic=False).filter(name__in=options.layers.split(','))
+    if options.get('layers'):
+        layerquery = LayerItem.objects.filter(classic=False).filter(name__in=options['layers'].split(','))
         if layerquery.count() == 0:
-            logger.error('No layers matching specified query "%s"' % options.layers)
+            logger.error('No layers matching specified query "%s"' % options['layers'])
             sys.exit(1)
     else:
         # We deliberately exclude status == 'X' ("no update") here
@@ -174,11 +137,11 @@ def main():
 
     update = Update()
     update.started = datetime.now()
-    if options.fullreload or options.reload:
+    if options.get('fullreload') or options.get('reload'):
         update.reload = True
     else:
         update.reload = False
-    if not options.dryrun:
+    if not options.get('dryrun'):
         update.save()
     try:
         lockfn = os.path.join(fetchdir, "layerindex.lock")
@@ -189,7 +152,7 @@ def main():
         try:
             bitbakepath = os.path.join(fetchdir, 'bitbake')
 
-            if not options.nofetch:
+            if not options.get('nofetch'):
                 # Fetch latest metadata from repositories
                 for layer in layerquery:
                     # Handle multiple layers in a single repo
@@ -239,7 +202,7 @@ def main():
                             layerupdate.started = datetime.now()
                             layerupdate.finished = datetime.now()
                             layerupdate.log = 'ERROR: fetch failed: %s' % errmsg
-                            if not options.dryrun:
+                            if not options.get('dryrun'):
                                 layerupdate.save()
                         continue
 
@@ -260,7 +223,7 @@ def main():
                         last_rev[layerbranch] = layerbranch.vcs_last_rev
                         layerupdate.layerbranch = layerbranch
                         layerupdate.log = output
-                        if not options.dryrun:
+                        if not options.get('dryrun'):
                             layerupdate.save()
 
                     if ret == 254:
@@ -277,7 +240,7 @@ def main():
                 for layer in layerquery:
                     layerbranch = layer.get_layerbranch(branch)
                     if layerbranch:
-                        if not (options.reload or options.fullreload):
+                        if not (options.get('reload') or options.get('fullreload')):
                             # Skip layers that did not change.
                             layer_last_rev = last_rev.get(layerbranch, None)
                             if layer_last_rev is None or layer_last_rev == layerbranch.vcs_last_rev:
@@ -297,10 +260,10 @@ def main():
     finally:
         update.log = ''.join(listhandler.read())
         update.finished = datetime.now()
-        if not options.dryrun:
+        if not options.get('dryrun'):
             update.save()
 
-    if not options.dryrun:
+    if not options.get('dryrun'):
         # Purge old update records
         update_purge_days = getattr(settings, 'UPDATE_PURGE_DAYS', 30)
         Update.objects.filter(started__lte=datetime.now()-timedelta(days=update_purge_days)).delete()
@@ -308,5 +271,53 @@ def main():
     sys.exit(0)
 
 
+def main():
+    if LooseVersion(git.__version__) < '0.3.1':
+        logger.error("Version of GitPython is too old, please install GitPython (python-git) 0.3.1 or later in order to use this script")
+        sys.exit(1)
+
+
+    parser = optparse.OptionParser(
+        usage = """
+    %prog [options]""")
+
+    parser.add_option("-b", "--branch",
+            help = "Specify branch(es) to update (use commas to separate multiple). Default is all enabled branches.",
+            action="store", dest="branch", default='')
+    parser.add_option("-l", "--layer",
+            help = "Specify layers to update (use commas to separate multiple). Default is all published layers.",
+            action="store", dest="layers")
+    parser.add_option("-r", "--reload",
+            help = "Reload recipe data instead of updating since last update",
+            action="store_true", dest="reload")
+    parser.add_option("", "--fullreload",
+            help = "Discard existing recipe data and fetch it from scratch",
+            action="store_true", dest="fullreload")
+    parser.add_option("-n", "--dry-run",
+            help = "Don't write any data back to the database",
+            action="store_true", dest="dryrun")
+    parser.add_option("-x", "--nofetch",
+            help = "Don't fetch repositories",
+            action="store_true", dest="nofetch")
+    parser.add_option("", "--nocheckout",
+            help = "Don't check out branches",
+            action="store_true", dest="nocheckout")
+    parser.add_option("-d", "--debug",
+            help = "Enable debug output",
+            action="store_const", const=logging.DEBUG, dest="loglevel", default=logging.INFO)
+    parser.add_option("-q", "--quiet",
+            help = "Hide all output except error messages",
+            action="store_const", const=logging.ERROR, dest="loglevel")
+
+    options, args = parser.parse_args(sys.argv)
+    if len(args) > 1:
+        logger.error('unexpected argument "%s"' % args[1])
+        parser.print_help()
+        sys.exit(1)
+
+    options_dict = vars(options)
+    do_update(**options_dict)
+
+
 if __name__ == "__main__":
     main()
diff --git a/layerindex/views.py b/layerindex/views.py
index 65a536a..c7c20b8 100644
--- a/layerindex/views.py
+++ b/layerindex/views.py
@@ -19,7 +19,6 @@ from layerindex.forms import EditLayerForm, LayerMaintainerFormSet, EditNoteForm
 from django.db import transaction
 from django.contrib.auth.models import User, Permission
 from django.db.models import Q, Count, Sum
-from django.core.mail import EmailMessage
 from django.template.loader import get_template
 from django.template import Context
 from django.utils.decorators import method_decorator
@@ -28,6 +27,7 @@ from django.contrib import messages
 from reversion.models import Revision
 from . import utils
 from . import simplesearch
+from . import tasks
 import settings
 from django.dispatch import receiver
 import reversion
@@ -163,7 +163,7 @@ def edit_layer_view(request, template_name, branch='master', slug=None):
                     # Send email
                     plaintext = get_template('layerindex/submitemail.txt')
                     perm = Permission.objects.get(codename='publish_layer')
-                    users = User.objects.filter(Q(groups__permissions=perm) | Q(user_permissions=perm) ).distinct()
+                    users = User.objects.filter(Q(groups__permissions=perm) | Q(user_permissions=perm) | Q(is_superuser=True) ).distinct()
                     for user in users:
                         if user.first_name:
                             user_name = user.first_name
@@ -178,8 +178,7 @@ def edit_layer_view(request, template_name, branch='master', slug=None):
                         from_email = settings.SUBMIT_EMAIL_FROM
                         to_email = user.email
                         text_content = plaintext.render(d)
-                        msg = EmailMessage(subject, text_content, from_email, [to_email])
-                        msg.send()
+                        tasks.send_email.delay(subject, text_content, from_email, [to_email])  # queued; sent by a Celery worker
                     return HttpResponseRedirect(reverse('submit_layer_thanks'))
             messages.success(request, 'Layer %s saved successfully.' % layeritem.name)
             if return_url:
diff --git a/requirements.txt b/requirements.txt
index f80e5d4..7cd741b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+celery==3.1.25
 Django==1.8.17
 django-cors-headers==1.1.0
 django-nvd3==0.9.7
diff --git a/settings.py b/settings.py
index 0ecf90b..94d698b 100644
--- a/settings.py
+++ b/settings.py
@@ -217,3 +217,7 @@ UPDATE_PURGE_DAYS = 30
 # Settings for layer submission feature
 SUBMIT_EMAIL_FROM = 'noreply@example.com'
 SUBMIT_EMAIL_SUBJECT = 'OE Layerindex layer submission'
+
+# RabbitMQ settings (Celery broker URL and result backend URL)
+RABBIT_BROKER = 'amqp://'
+RABBIT_BACKEND = 'rpc://'
-- 
2.7.4




More information about the yocto mailing list