From 4893c4664d7c8057874d428c09d5545582647624 Mon Sep 17 00:00:00 2001 From: ibuler Date: Fri, 22 Dec 2017 02:08:29 +0800 Subject: [PATCH] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9task=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E8=BF=90=E8=A1=8C=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/__init__.py | 4 +- apps/assets/tasks.py | 16 ++++--- apps/common/mixins.py | 1 - apps/common/utils.py | 1 + apps/jumpserver/settings.py | 6 +-- apps/ops/ansible/__init__.py | 2 +- apps/ops/ansible/exceptions.py | 4 ++ apps/ops/models.py | 78 +++++++++++++++++++++++++++++++--- apps/ops/tasks.py | 8 ++-- apps/ops/utils.py | 16 +++++++ config_example.py | 25 ++++++++--- requirements/requirements.txt | 2 + run_server.py | 66 +++++++++++++++++----------- 13 files changed, 175 insertions(+), 54 deletions(-) diff --git a/apps/__init__.py b/apps/__init__.py index f93d0bec7..5c66b357e 100644 --- a/apps/__init__.py +++ b/apps/__init__.py @@ -2,6 +2,4 @@ # -*- coding: utf-8 -*- # - -if __name__ == '__main__': - pass +__version__ = "0.5.0" diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index a46e697fc..494162eb4 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -8,10 +8,8 @@ from django.db.models.signals import post_save from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger -from common.celery import app as celery_app from .models import SystemUser, AdminUser, Asset from . import const -from .signals import on_app_ready FORKS = 10 @@ -402,22 +400,28 @@ def push_system_user_on_auth_change(sender, instance=None, update_fields=None, * push_system_user_to_cluster_assets.delay(instance, task_name) -celery_app.conf['CELERYBEAT_SCHEDULE'].update( +periodic_tasks = ( { 'update_assets_hardware_period': { 'task': 'assets.tasks.update_assets_hardware_period', - 'schedule': 60*60*24, + 'schedule': 60*60*60*24, 'args': (), }, 'test-admin-user-connectability_period': { 'task': 'assets.tasks.test_admin_user_connectability_period', - 'schedule': 60*60, + 'schedule': 60*60*60, 'args': (), }, 'push_system_user_period': { 'task': 'assets.tasks.push_system_user_period', - 'schedule': 60*60, + 'schedule': 60*60*60*24, 'args': (), } } ) + + +def initial_periodic_tasks(): + from ops.utils import create_periodic_tasks + create_periodic_tasks(periodic_tasks) + diff --git a/apps/common/mixins.py b/apps/common/mixins.py index 764d8e50a..8f6076e4a 100644 --- a/apps/common/mixins.py +++ b/apps/common/mixins.py @@ -7,7 +7,6 @@ from django.utils.timezone import now from django.utils.translation import ugettext_lazy as _ - class NoDeleteQuerySet(models.query.QuerySet): def delete(self): diff --git a/apps/common/utils.py b/apps/common/utils.py index e7b07860b..f1edce12e 100644 --- a/apps/common/utils.py +++ b/apps/common/utils.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # +import json import re from collections import OrderedDict from six import string_types diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index 040a92cc4..a6da1124a 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -27,9 +27,7 @@ sys.path.append(PROJECT_DIR) # Import project config setting try: - from config import config as env_config, env - - CONFIG = env_config.get(env, 'default')() + from config import config as CONFIG except ImportError: CONFIG = type('_', (), {'__getattr__': lambda arg1, arg2: None})() @@ -66,12 +64,12 @@ INSTALLED_APPS = [ 'django_filters', 'bootstrap3', 'captcha', + 'django_celery_beat', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', - ] MIDDLEWARE = [ diff --git a/apps/ops/ansible/__init__.py b/apps/ops/ansible/__init__.py index d59972354..a175387eb 100644 --- a/apps/ops/ansible/__init__.py +++ b/apps/ops/ansible/__init__.py @@ -3,4 +3,4 @@ from .callback import * from .inventory import * from .runner import * - +from .exceptions import * diff --git a/apps/ops/ansible/exceptions.py b/apps/ops/ansible/exceptions.py index e49afd34a..12061bf9d 100644 --- a/apps/ops/ansible/exceptions.py +++ b/apps/ops/ansible/exceptions.py @@ -1,6 +1,10 @@ # -*- coding: utf-8 -*- # +__all__ = [ + 'AnsibleError' +] + class AnsibleError(Exception): pass diff --git a/apps/ops/models.py b/apps/ops/models.py index e6b346334..b9ff7b460 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -4,10 +4,15 @@ import logging import json import uuid +import time from django.db import models +from django.utils import timezone from django.utils.translation import ugettext_lazy as _ +from django.core import serializers +from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask from common.utils import signer +from .ansible import AdHocRunner, AnsibleError __all__ = ["Task", "AdHoc", "AdHocRunHistory"] @@ -22,7 +27,17 @@ class Task(models.Model): """ id = models.UUIDField(default=uuid.uuid4, primary_key=True) name = models.CharField(max_length=128, unique=True, verbose_name=_('Name')) + interval = models.ForeignKey( + IntervalSchedule, on_delete=models.CASCADE, + null=True, blank=True, verbose_name=_('Interval'), + ) + crontab = models.ForeignKey( + CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True, + verbose_name=_('Crontab'), help_text=_('Use one of Interval/Crontab'), + ) + is_periodic = models.BooleanField(default=False) is_deleted = models.BooleanField(default=False) + comment = models.TextField(blank=True, verbose_name=_("Comment")) created_by = models.CharField(max_length=128, blank=True, null=True, default='') date_created = models.DateTimeField(auto_now_add=True) __latest_adhoc = None @@ -65,12 +80,32 @@ class Task(models.Model): def get_run_history(self): return self.history.all() - def run(self): + def run(self, record=True): if self.latest_adhoc: - return self.latest_adhoc.run() + return self.latest_adhoc.run(record=record) else: return {'error': 'No adhoc'} + def save(self, force_insert=False, force_update=False, using=None, + update_fields=None): + instance = super().save( + force_insert=force_insert, force_update=force_update, + using=using, update_fields=update_fields, + ) + + if instance.is_periodic: + PeriodicTask.objects.update_or_create( + interval=instance.interval, + crontab=instance.crontab, + name=self.name, + task='ops.run_task', + args=serializers.serialize('json', [instance]), + ) + else: + PeriodicTask.objects.filter(name=self.name).delete() + + return instance + def __str__(self): return self.name @@ -128,9 +163,42 @@ class AdHoc(models.Model): else: return {} - def run(self): - from .utils import run_adhoc_object - return run_adhoc_object(self, **self.options) + def run(self, record=True): + if record: + return self._run_and_record() + else: + return self._run_only() + + def _run_and_record(self): + history = AdHocRunHistory(adhoc=self, task=self.task) + time_start = time.time() + try: + result = self._run_only() + history.is_finished = True + if result.results_summary.get('dark'): + history.is_success = False + else: + history.is_success = True + history.result = result.results_raw + history.summary = result.results_summary + return result + finally: + history.date_finished = timezone.now() + history.timedelta = time.time() - time_start + history.save() + + def _run_only(self): + from .utils import get_adhoc_inventory + inventory = get_adhoc_inventory(self) + runner = AdHocRunner(inventory) + for k, v in self.options.items(): + runner.set_option(k, v) + + try: + result = runner.run(self.tasks, self.pattern, self.task.name) + return result + except AnsibleError as e: + logger.error("Failed run adhoc {}, {}".format(self.task.name, e)) @become.setter def become(self, item): diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index c5298377d..b2647c465 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -1,7 +1,6 @@ # coding: utf-8 from celery import shared_task - -from .utils import run_adhoc +from django.core import serializers def rerun_task(): @@ -9,5 +8,6 @@ def rerun_task(): @shared_task -def run_add_hoc_and_record_async(adhoc, **options): - return run_adhoc(adhoc, **options) +def run_task(tasks_json): + for task in serializers.deserialize('json', tasks_json): + task.object.run() diff --git a/apps/ops/utils.py b/apps/ops/utils.py index 3c09d598f..f015f8392 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -3,6 +3,7 @@ import time from django.utils import timezone from django.db import transaction +from django_celery_beat.models import PeriodicTask, IntervalSchedule from common.utils import get_logger, get_object_or_none, get_short_uuid_str from .ansible import AdHocRunner, CommandResultCallback @@ -131,4 +132,19 @@ def create_or_update_task( return task +def create_periodic_tasks(tasks): + for name, detail in tasks.items(): + schedule, _ = IntervalSchedule.objects.get_or_create( + every=detail['schedule'], + period=IntervalSchedule.SECONDS, + ) + + task = PeriodicTask.objects.create( + interval=schedule, + name=name, + task=detail['task'], + args=json.dumps(detail.get('args', [])), + kwargs=json.dumps(detail.get('kwargs', {})), + ) + print("Create periodic task: {}".format(task)) diff --git a/config_example.py b/config_example.py index 95c6414c6..10cf063e4 100644 --- a/config_example.py +++ b/config_example.py @@ -4,7 +4,7 @@ Jumpserver project setting file - :copyright: (c) 2014-2016 by Jumpserver Team. + :copyright: (c) 2014-2017 by Jumpserver Team :license: GPL v2, see LICENSE for more details. """ import os @@ -50,6 +50,11 @@ class Config: # DB_PASSWORD = '' # DB_NAME = 'jumpserver' + # When Django start it will bind this host and port + # ./manage.py runserver 127.0.0.1:8080 + HTTP_BIND_HOST = '0.0.0.0' + HTTP_LISTEN_PORT = 8080 + # Use Redis as broker for celery and web socket REDIS_HOST = '127.0.0.1' REDIS_PORT = 6379 @@ -101,8 +106,18 @@ class Config: return None -config = { - 'default': Config, -} +class DevelopmentConfig(Config): + pass + + +class TestConfig(Config): + pass + + +class ProductionConfig(Config): + pass + + +# Default using Config settings, you can write if/else for different env +config = Config() -env = 'default' diff --git a/requirements/requirements.txt b/requirements/requirements.txt index b25089107..552fd1802 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -57,3 +57,5 @@ uritemplate==3.0.0 urllib3==1.22 vine==1.1.4 gunicorn==19.7.1 +django_celery_beat==1.1.0 +ephem==3.7.6.0 diff --git a/run_server.py b/run_server.py index 1e9bed6a3..72905c0d6 100644 --- a/run_server.py +++ b/run_server.py @@ -1,48 +1,64 @@ #!/usr/bin/env python -# ~*~ coding: utf-8 ~*~ -from threading import Thread import os import subprocess +import time +from threading import Thread -try: - from config import config as env_config, env +from apps import __version__ - CONFIG = env_config.get(env, 'default')() +try: + from config import config as CONFIG except ImportError: CONFIG = type('_', (), {'__getattr__': None})() BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +APPS_DIR = os.path.join(BASE_DIR, 'apps') +HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1' +HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080 +LOG_LEVEL = CONFIG.LOG_LEVEL +WORKERS = 4 -apps_dir = os.path.join(BASE_DIR, 'apps') - -def start_django(): - http_host = CONFIG.HTTP_BIND_HOST or '127.0.0.1' - http_port = CONFIG.HTTP_LISTEN_PORT or '8080' - os.chdir(apps_dir) - print('start django') - subprocess.call('python ./manage.py runserver %s:%s' % (http_host, http_port), shell=True) +def start_gunicorn(): + print("- Start Gunicorn WSGI HTTP Server") + os.chdir(APPS_DIR) + cmd = "gunicorn jumpserver.wsgi -b {}:{} -w {}".format(HTTP_HOST, HTTP_PORT, WORKERS) + subprocess.call(cmd, shell=True) def start_celery(): - os.chdir(apps_dir) - os.environ.setdefault('C_FORCE_ROOT', '1') - os.environ.setdefault('PYTHONOPTIMIZE', '1') - print('start celery') - subprocess.call('celery -A common worker -B -s /tmp/celerybeat-schedule -l debug', shell=True) + print("- Start Celery as Distributed Task Queue") + os.chdir(APPS_DIR) + # os.environ.setdefault('PYTHONOPTIMIZE', '1') + cmd = 'celery -A common worker -l {}'.format(LOG_LEVEL.lower()) + subprocess.call(cmd, shell=True) -def main(): - t1 = Thread(target=start_django, args=()) - t2 = Thread(target=start_celery, args=()) +def start_beat(): + print("- Start Beat as Periodic Task Scheduler") + os.chdir(APPS_DIR) + # os.environ.setdefault('PYTHONOPTIMIZE', '1') + schduler = "django_celery_beat.schedulers:DatabaseScheduler" + cmd = 'celery -A common beat -l {} --scheduler {}'.format(LOG_LEVEL, schduler) + subprocess.call(cmd, shell=True) - t1.start() - t2.start() - t1.join() - t2.join() +def main(): + print(time.ctime()) + print('Jumpserver version {}, more see https://www.jumpserver.org'.format( + __version__)) + print('Quit the server with CONTROL-C.') + + threads = [] + for func in (start_gunicorn, start_celery, start_beat): + t = Thread(target=func, args=()) + threads.append(t) + t.start() + + for t in threads: + t.join() if __name__ == '__main__': -- GitLab