[yocto] [layerindex-web][PATCH] WIP solution to bug-11197: adding asynchronous task execution with celery and rabbitmq

Diana Thayer garbados at gmail.com
Thu Jun 15 19:35:12 PDT 2017


- updated readme to include RabbitMQ as a dependency
- updated settings.py to include RABBIT_BROKER and RABBIT_BACKEND
- added celery to requirements.txt
- defined celery tasks in layerindex/tasks.py
- separated layerindex/update#main into two functions: main (which parses sys.argv) and do_update (which accepts options as keyword arguments)
- updated layerindex/views.py to use the send_email task to asynchronously message anyone able to publish layers when a new layer is submitted
---
 README               |   5 +-
 layerindex/tasks.py  |  31 +++++++++++
 layerindex/update.py | 143 +++++++++++++++++++++++++++------------------------
 layerindex/views.py  |   7 ++-
 requirements.txt     |   1 +
 settings.py          |   4 ++
 6 files changed, 120 insertions(+), 71 deletions(-)
 create mode 100644 layerindex/tasks.py

diff --git a/README b/README
index 62f739d..cfcc37f 100644
--- a/README
+++ b/README
@@ -14,6 +14,7 @@ In order to make use of this application you will need:
 * Python 3.4+
 * Django 1.8.x - tested with 1.8.17; newer versions may work, but
   the application has not been tested with 1.9 or newer.
+* RabbitMQ 3.6.x - tested with 3.6.10.
 * For production usage, a web server set up to host Django applications
   (not needed for local-only testing)
 * A database supported by Django (SQLite, MySQL, etc.). Django takes
@@ -41,7 +42,9 @@ Setup instructions:
 1. Edit settings.py to specify a database, EMAIL_HOST, SECRET_KEY and
    other settings specific to your installation. Ensure you set
    LAYER_FETCH_DIR to an absolute path to a location with sufficient
-   space for fetching layer repositories.
+   space for fetching layer repositories. Modify RABBIT_BROKER
+   and RABBIT_BACKEND to reflect the settings used by your RabbitMQ
+   server.
 
 2. Run the following commands within the layerindex-web directory to
    initialise the database:
diff --git a/layerindex/tasks.py b/layerindex/tasks.py
new file mode 100644
index 0000000..9bb4701
--- /dev/null
+++ b/layerindex/tasks.py
@@ -0,0 +1,31 @@
+from celery import Celery
+from django.core.mail import EmailMessage
+import os
+import time
+
+try:
+  from update import do_update
+except ImportError:
+  from .update import do_update
+
+try:
+  import settings
+except ImportError:
+  # not in a full django env, so settings is inaccessible.
+  # setup django to access settings.
+  from utils import setup_django
+  setup_django()
+  import settings
+
+tasks = Celery('layerindex',
+    broker=settings.RABBIT_BROKER,
+    backend=settings.RABBIT_BACKEND)
+
+@tasks.task
+def update(**options):
+  return do_update(**options)
+
+@tasks.task
+def send_email(subject, text_content, from_email=settings.DEFAULT_FROM_EMAIL, to_emails=[]):
+  msg = EmailMessage(subject, text_content, from_email, to_emails)
+  msg.send()
diff --git a/layerindex/update.py b/layerindex/update.py
index d5c56cd..f1c5039 100755
--- a/layerindex/update.py
+++ b/layerindex/update.py
@@ -17,7 +17,11 @@ import subprocess
 import signal
 from datetime import datetime, timedelta
 from distutils.version import LooseVersion
-import utils
+
+try:
+    import utils
+except ImportError:
+    from . import utils
 
 import warnings
 warnings.filterwarnings("ignore", category=DeprecationWarning)
@@ -71,73 +75,32 @@ def prepare_update_layer_command(options, branch, layer, updatedeps=False):
     cmd = '%s update_layer.py -l %s -b %s' % (cmdprefix, layer.name, branch.name)
     if updatedeps:
         cmd += ' --update-dependencies'
-    if options.reload:
+    if options.get('reload'):
         cmd += ' --reload'
-    if options.fullreload:
+    if options.get('fullreload'):
         cmd += ' --fullreload'
-    if options.nocheckout:
+    if options.get('nocheckout'):
         cmd += ' --nocheckout'
-    if options.dryrun:
+    if options.get('dryrun'):
         cmd += ' -n'
-    if options.loglevel == logging.DEBUG:
+    if options.get('loglevel') == logging.DEBUG:
         cmd += ' -d'
-    elif options.loglevel == logging.ERROR:
+    elif options.get('loglevel') == logging.ERROR:
         cmd += ' -q'
     return cmd
 
-
-def main():
-    if LooseVersion(git.__version__) < '0.3.1':
-        logger.error("Version of GitPython is too old, please install GitPython (python-git) 0.3.1 or later in order to use this script")
-        sys.exit(1)
-
-
-    parser = optparse.OptionParser(
-        usage = """
-    %prog [options]""")
-
-    parser.add_option("-b", "--branch",
-            help = "Specify branch(es) to update (use commas to separate multiple). Default is all enabled branches.",
-            action="store", dest="branch", default='')
-    parser.add_option("-l", "--layer",
-            help = "Specify layers to update (use commas to separate multiple). Default is all published layers.",
-            action="store", dest="layers")
-    parser.add_option("-r", "--reload",
-            help = "Reload recipe data instead of updating since last update",
-            action="store_true", dest="reload")
-    parser.add_option("", "--fullreload",
-            help = "Discard existing recipe data and fetch it from scratch",
-            action="store_true", dest="fullreload")
-    parser.add_option("-n", "--dry-run",
-            help = "Don't write any data back to the database",
-            action="store_true", dest="dryrun")
-    parser.add_option("-x", "--nofetch",
-            help = "Don't fetch repositories",
-            action="store_true", dest="nofetch")
-    parser.add_option("", "--nocheckout",
-            help = "Don't check out branches",
-            action="store_true", dest="nocheckout")
-    parser.add_option("-d", "--debug",
-            help = "Enable debug output",
-            action="store_const", const=logging.DEBUG, dest="loglevel", default=logging.INFO)
-    parser.add_option("-q", "--quiet",
-            help = "Hide all output except error messages",
-            action="store_const", const=logging.ERROR, dest="loglevel")
-
-    options, args = parser.parse_args(sys.argv)
-    if len(args) > 1:
-        logger.error('unexpected argument "%s"' % args[1])
-        parser.print_help()
-        sys.exit(1)
-
+def do_update(**options):
+    """
+    Download layers and branches based on given metadata.
+    """
     utils.setup_django()
     import settings
     from layerindex.models import Branch, LayerItem, Update, LayerUpdate
 
-    logger.setLevel(options.loglevel)
+    logger.setLevel(options.get('loglevel', logging.INFO))
 
-    if options.branch:
-        branches = options.branch.split(',')
+    if options.get('branch'):
+        branches = options['branch'].split(',')
         for branch in branches:
             if not utils.get_branch(branch):
                 logger.error("Specified branch %s is not valid" % branch)
@@ -151,10 +114,10 @@ def main():
         logger.error("Please set LAYER_FETCH_DIR in settings.py")
         sys.exit(1)
 
-    if options.layers:
-        layerquery = LayerItem.objects.filter(classic=False).filter(name__in=options.layers.split(','))
+    if options.get('layers'):
+        layerquery = LayerItem.objects.filter(classic=False).filter(name__in=options['layers'].split(','))
         if layerquery.count() == 0:
-            logger.error('No layers matching specified query "%s"' % options.layers)
+            logger.error('No layers matching specified query "%s"' % options['layers'])
             sys.exit(1)
     else:
         # We deliberately exclude status == 'X' ("no update") here
@@ -174,11 +137,11 @@ def main():
 
     update = Update()
     update.started = datetime.now()
-    if options.fullreload or options.reload:
+    if options.get('fullreload') or options.get('reload'):
         update.reload = True
     else:
         update.reload = False
-    if not options.dryrun:
+    if not options.get('dryrun'):
         update.save()
     try:
         lockfn = os.path.join(fetchdir, "layerindex.lock")
@@ -189,7 +152,7 @@ def main():
         try:
             bitbakepath = os.path.join(fetchdir, 'bitbake')
 
-            if not options.nofetch:
+            if not options.get('nofetch'):
                 # Fetch latest metadata from repositories
                 for layer in layerquery:
                     # Handle multiple layers in a single repo
@@ -239,7 +202,7 @@ def main():
                             layerupdate.started = datetime.now()
                             layerupdate.finished = datetime.now()
                             layerupdate.log = 'ERROR: fetch failed: %s' % errmsg
-                            if not options.dryrun:
+                            if not options.get('dryrun'):
                                 layerupdate.save()
                         continue
 
@@ -260,7 +223,7 @@ def main():
                         last_rev[layerbranch] = layerbranch.vcs_last_rev
                         layerupdate.layerbranch = layerbranch
                         layerupdate.log = output
-                        if not options.dryrun:
+                        if not options.get('dryrun'):
                             layerupdate.save()
 
                     if ret == 254:
@@ -277,7 +240,7 @@ def main():
                 for layer in layerquery:
                     layerbranch = layer.get_layerbranch(branch)
                     if layerbranch:
-                        if not (options.reload or options.fullreload):
+                        if not (options.get('reload') or options.get('fullreload')):
                             # Skip layers that did not change.
                             layer_last_rev = last_rev.get(layerbranch, None)
                             if layer_last_rev is None or layer_last_rev == layerbranch.vcs_last_rev:
@@ -297,10 +260,10 @@ def main():
     finally:
         update.log = ''.join(listhandler.read())
         update.finished = datetime.now()
-        if not options.dryrun:
+        if not options.get('dryrun'):
             update.save()
 
-    if not options.dryrun:
+    if not options.get('dryrun'):
         # Purge old update records
         update_purge_days = getattr(settings, 'UPDATE_PURGE_DAYS', 30)
         Update.objects.filter(started__lte=datetime.now()-timedelta(days=update_purge_days)).delete()
@@ -308,5 +271,53 @@ def main():
     sys.exit(0)
 
 
+def main():
+    if LooseVersion(git.__version__) < '0.3.1':
+        logger.error("Version of GitPython is too old, please install GitPython (python-git) 0.3.1 or later in order to use this script")
+        sys.exit(1)
+
+
+    parser = optparse.OptionParser(
+        usage = """
+    %prog [options]""")
+
+    parser.add_option("-b", "--branch",
+            help = "Specify branch(es) to update (use commas to separate multiple). Default is all enabled branches.",
+            action="store", dest="branch", default='')
+    parser.add_option("-l", "--layer",
+            help = "Specify layers to update (use commas to separate multiple). Default is all published layers.",
+            action="store", dest="layers")
+    parser.add_option("-r", "--reload",
+            help = "Reload recipe data instead of updating since last update",
+            action="store_true", dest="reload")
+    parser.add_option("", "--fullreload",
+            help = "Discard existing recipe data and fetch it from scratch",
+            action="store_true", dest="fullreload")
+    parser.add_option("-n", "--dry-run",
+            help = "Don't write any data back to the database",
+            action="store_true", dest="dryrun")
+    parser.add_option("-x", "--nofetch",
+            help = "Don't fetch repositories",
+            action="store_true", dest="nofetch")
+    parser.add_option("", "--nocheckout",
+            help = "Don't check out branches",
+            action="store_true", dest="nocheckout")
+    parser.add_option("-d", "--debug",
+            help = "Enable debug output",
+            action="store_const", const=logging.DEBUG, dest="loglevel", default=logging.INFO)
+    parser.add_option("-q", "--quiet",
+            help = "Hide all output except error messages",
+            action="store_const", const=logging.ERROR, dest="loglevel")
+
+    options, args = parser.parse_args(sys.argv)
+    if len(args) > 1:
+        logger.error('unexpected argument "%s"' % args[1])
+        parser.print_help()
+        sys.exit(1)
+
+    options_dict = vars(options)
+    do_update(**options_dict)
+
+
 if __name__ == "__main__":
     main()
diff --git a/layerindex/views.py b/layerindex/views.py
index 65a536a..c7c20b8 100644
--- a/layerindex/views.py
+++ b/layerindex/views.py
@@ -19,7 +19,6 @@ from layerindex.forms import EditLayerForm, LayerMaintainerFormSet, EditNoteForm
 from django.db import transaction
 from django.contrib.auth.models import User, Permission
 from django.db.models import Q, Count, Sum
-from django.core.mail import EmailMessage
 from django.template.loader import get_template
 from django.template import Context
 from django.utils.decorators import method_decorator
@@ -28,6 +27,7 @@ from django.contrib import messages
 from reversion.models import Revision
 from . import utils
 from . import simplesearch
+from . import tasks
 import settings
 from django.dispatch import receiver
 import reversion
@@ -163,7 +163,7 @@ def edit_layer_view(request, template_name, branch='master', slug=None):
                     # Send email
                     plaintext = get_template('layerindex/submitemail.txt')
                     perm = Permission.objects.get(codename='publish_layer')
-                    users = User.objects.filter(Q(groups__permissions=perm) | Q(user_permissions=perm) ).distinct()
+                    users = User.objects.filter(Q(groups__permissions=perm) | Q(user_permissions=perm) | Q(is_superuser=True) ).distinct()
                     for user in users:
                         if user.first_name:
                             user_name = user.first_name
@@ -178,8 +178,7 @@ def edit_layer_view(request, template_name, branch='master', slug=None):
                         from_email = settings.SUBMIT_EMAIL_FROM
                         to_email = user.email
                         text_content = plaintext.render(d)
-                        msg = EmailMessage(subject, text_content, from_email, [to_email])
-                        msg.send()
+                        tasks.send_email(subject, text_content, from_email, [to_email])
                     return HttpResponseRedirect(reverse('submit_layer_thanks'))
             messages.success(request, 'Layer %s saved successfully.' % layeritem.name)
             if return_url:
diff --git a/requirements.txt b/requirements.txt
index f80e5d4..7cd741b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+celery==3.1.25
 Django==1.8.17
 django-cors-headers==1.1.0
 django-nvd3==0.9.7
diff --git a/settings.py b/settings.py
index 0ecf90b..e98236b 100644
--- a/settings.py
+++ b/settings.py
@@ -217,3 +217,7 @@ UPDATE_PURGE_DAYS = 30
 # Settings for layer submission feature
 SUBMIT_EMAIL_FROM = 'noreply@example.com'
 SUBMIT_EMAIL_SUBJECT = 'OE Layerindex layer submission'
+
+# RabbitMQ settings
+RABBIT_BROKER = 'amqp://admin:password@localhost:5672/admin_vhost'
+RABBIT_BACKEND = 'rpc://'
-- 
2.7.4




More information about the yocto mailing list