栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

ansible (2.4.2.0) API python调用重写

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ansible (2.4.2.0)  API python调用重写

转自 jumpserver 5.0 测试版本  下载地址

https://github.com/hequan2017/zabbix-models/tree/master/ansible_run

只是简单改了一下 能够单独使用。

目录结构

ansible_run/├── callback.py├── exceptions.py├── __init__.py├── inventory.py├── runner.py├── test_inventory.py└── test_runner.py

下面两个是演示文件

先  pip3 install  ansible==2.4.2.0   安装

例子


# -*- coding: utf-8 -*-

from  runner import AdHocRunner, CommandRunner

from  inventory import baseInventory

def  TestAdHocRunner():

        """

         以yml的形式 执行多个命令

        :return:

        """

        host_data = [

            {

                "hostname": "testserver",

                "ip": "192.168.10.93",

                "port": 22,

                "username": "root",

                "password": "password",

            },

        ]

        inventory = baseInventory(host_data)

        runner = AdHocRunner(inventory)

        tasks = [

            {"action": {"module": "cron","args": "name="sync time" minute="*/3" job="/usr/sbin/ntpdate time.nist.gov &> /dev/null"" }, "name": "run_cmd"},

            {"action": {"module": "shell", "args": "whoami"}, "name": "run_whoami"},

        ]

        ret = runner.run(tasks, "all")

        print(ret.results_summary)

        print(ret.results_raw)

def TestCommandRunner():

        """

        执行单个命令,返回结果

        :return:

        """

        host_data = [

            {

                "hostname": "testserver",

                "ip": "192.168.10.93",

                "port": 22,

                "username": "root",

                "password": "password",

            },

        ]

        inventory = baseInventory(host_data)

        runner = CommandRunner(inventory)

        res = runner.execute('pwd', 'all')

        print(res.results_command)

        print(res.results_raw)

        print(res.results_command['testserver']['stdout'])

if __name__ == "__main__":

    TestAdHocRunner()

    TestCommandRunner()




from inventory import baseInventory

def  Test():

        """

        返回主机信息,组信息,组内主机信息

        :return:

        """

        host_list = [{

            "hostname": "testserver1",

            "ip": "102.1.1.1",

            "port": 22,

            "username": "root",

            "password": "password",

            "private_key": "/tmp/private_key",

            "become": {

                "method": "sudo",

                "user": "root",

                "pass": None,

            },

            "groups": ["group1", "group2"],

            "vars": {"sexy": "yes"},

        }, {

            "hostname": "testserver2",

            "ip": "8.8.8.8",

            "port": 2222,

            "username": "root",

            "password": "password",

            "private_key": "/tmp/private_key",

            "become": {

                "method": "su",

                "user": "root",

                "pass": "123",

            },

            "groups": ["group3", "group4"],

            "vars": {"love": "yes"},

        }]

        inventory = baseInventory(host_list=host_list)

        print("#"*10 + "Hosts" + "#"*10)

        for host in inventory.hosts:

            print(host)

        print("#" * 10 + "Groups" + "#" * 10)

        for group in inventory.groups:

            print(group)

        print("#" * 10 + "all group hosts" + "#" * 10)

        group = inventory.get_group('all')

        print(group.hosts)

if __name__ == '__main__':

     Test()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/222926.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号