_celery_tasks.py 1.3 KB
Newer Older
1
from __future__ import absolute_import, unicode_literals
W
wangyong 已提交
2 3 4

from celery import shared_task

A
Administrator 已提交
5
from common import celery_app
baltery's avatar
baltery 已提交
6
from ops.utils.ansible_api import Options, ADHocRunner
W
wangyong 已提交
7

8 9

@shared_task(name="get_asset_hardware_info")
def get_asset_hardware_info(task_name, task_uuid, *assets):
    """Run the ``setup`` module against *assets* via an ad-hoc runner.

    Args:
        task_name: Forwarded unchanged to ``ADHocRunner.run``.
        task_uuid: Forwarded unchanged to ``ADHocRunner.run``.
        *assets: Forwarded unchanged to the ``ADHocRunner`` constructor.

    Returns:
        The ``(ext_code, result)`` pair unpacked from ``ADHocRunner.run``.
    """
    conf = Options()
    # Single-task play: only the 'setup' module is executed.
    play_source = {
        "name": "Get host hardware information",
        "hosts": "default",
        "gather_facts": "no",
        "tasks": [
            dict(action=dict(module='setup')),
        ],
    }
    hoc = ADHocRunner(conf, play_source, *assets)
    ext_code, result = hoc.run(task_name, task_uuid)
    return ext_code, result


@shared_task(name="asset_test_ping_check")
def asset_test_ping_check(task_name, task_uuid, *assets):
    """Run the ``ping`` module against *assets* via an ad-hoc runner.

    Args:
        task_name: Forwarded unchanged to ``ADHocRunner.run``.
        task_uuid: Forwarded unchanged to ``ADHocRunner.run``.
        *assets: Forwarded unchanged to the ``ADHocRunner`` constructor.

    Returns:
        The ``(ext_code, result)`` pair unpacked from ``ADHocRunner.run``.
    """
    conf = Options()
    # Single-task play: only the 'ping' module is executed.
    play_source = {
        "name": "Test host connection use ping",
        "hosts": "default",
        "gather_facts": "no",
        "tasks": [
            dict(action=dict(module='ping')),
        ],
    }
    hoc = ADHocRunner(conf, play_source, *assets)
    ext_code, result = hoc.run(task_name, task_uuid)
    return ext_code, result
39 40 41 42 43


@shared_task(name="add_user_to_assert")
def add_user_to_asset():
    """Placeholder task; currently does nothing and returns ``None``.

    NOTE(review): the registered task name ends in "assert" while the
    function is named "...asset" -- possibly a typo; confirm with callers
    before renaming, since the name is how the task is looked up.
    """
W
wangyong 已提交
44 45 46 47 48


@celery_app.task(name='hello-world')
def hello():
    """Print a fixed greeting to standard output."""
    greeting = 'hello world!'
    print(greeting)