Tutoriales

Cómo escribir un script de Python para crear un inventario dinámico de Ansible

En mi artículo anterior, expliqué por qué no es muy conveniente usar manifiestos estáticos para trabajar con sus playbooks de Ansible. Este es el por qué:

  • El inventario puede ser muy grande.

  • Los dispositivos en el inventario que requieren automatización pueden cambiar con frecuencia (p. ej., un grupo de computadoras portátiles Linux que usan DHCP).

En mi último artículo, utilicé host_list y el complemento Nmap para generar manifiestos dinámicos para cerrar esta brecha. Este artículo describe cómo escribir sus propios scripts de manifiesto dinámico en Python, siguiendo las buenas prácticas para empaquetar herramientas, usar entornos virtuales y realizar pruebas unitarias de su código.

Ingrese al mundo del inventario dinámico

La documentación de Ansible explica varias formas de generar un inventario dinámico; decidí escribir un script de Python simple como interfaz para el comando Nmap.

¿Por qué elegirías escribir tu propio guión dinámico?

  • Tiene código heredado escrito en lenguajes distintos a Python (Java, Perl, Bash, Ruby, Go, sin límite) y desea reutilizar esa lógica para generar la lista de hosts.
  • Usted o su equipo dominan un idioma específico (por ejemplo, Bash o Ruby). Las secuencias de comandos dinámicas de Ansible son lo suficientemente flexibles como para que pueda escribir complementos en el idioma de su elección.

Para ilustrar esto, escribiré un script que use Nmap para obtener hosts.

establecer la fundación

El primer paso es construir la base de código, ejecutar la interfaz de línea de comandos de Nmap y analizar los resultados en formato XML.

La base es un contenedor para el comando Nmap de la siguiente manera:

  1. NmapRunner Ejecute el comando Nmap con las banderas deseadas y capture la salida XML.
  2. analizador de salida Analice el XML y devuelva solo las direcciones IP requeridas.
  3. NmapRunner Se implementa un iterador para que pueda procesar cada dirección de la forma que considere adecuada.

Aquí está el código de Python:

import os
import shlex
import shutil
import subprocess
from typing import List, Dict
from xml.etree import ElementTree


class OutputParser:
    def __init__(self, xml: str):
        self.xml = xml

    def get_addresses(self) -> List[Dict[str, str]]:
        """
        Several things need to happen for an address to be included:
        1. Host is up
        2. Port is TCP 22
        3. Port status is open
        Otherwise the iterator will not be filled
        :return:
        """
        addresses = []
        root = ElementTree.fromstring(self.xml)
        for host in root.findall('host'):
            name = None
            for hostnames in host.findall('hostnames'):
                for hostname in hostnames:
                    name = hostname.attrib['name']
                    break
            if not name:
                continue
            is_up = True
            for status in host.findall('status'):
                if status.attrib['state'] == 'down':
                    is_up = False
                    break
            if not is_up:
                continue
            port_22_open = False
            for ports in host.findall('ports'):
                for port in ports.findall('port'):
                    if port.attrib['portid'] == '22':
                        for state in port.findall('state'):
                            if state.attrib['state'] == "open":  # Up not the same as open, we want SSH access!
                                port_22_open = True
                                break
            if not port_22_open:
                continue
            address = None
            for address_data in host.findall('address'):
                address = address_data.attrib['addr']
                break
            addresses.append({name: address})
        return addresses


class NmapRunner:

    def __init__(self, hosts: str):
        self.nmap_report_file = None
        found_nmap = shutil.which('nmap', mode=os.F_OK | os.X_OK)
        if not found_nmap:
            raise ValueError(f"Nmap is missing!")
        self.nmap = found_nmap
        self.hosts = hosts

    def __iter__(self):
        command = [self.nmap]
        command.extend(__NMAP__FLAGS__)
        command.append(self.hosts)
        completed = subprocess.run(
            command,
            capture_output=True,
            shell=False,
            check=True
        )
        completed.check_returncode()
        out_par = OutputParser(completed.stdout.decode('utf-8'))
        self.addresses = out_par.get_addresses()
        return self

    def __next__(self):
        try:
            return self.addresses.pop()
        except IndexError:
            raise StopIteration


"""
Convert the args for proper usage on the Nmap CLI
Also, do not use the -n flag. We need to resolve IP addresses to hostname, even if we sacrifice a little bit of speed
"""
NMAP_DEFAULT_FLAGS = {
    '-p22': 'Port 22 scanning',
    '-T4': 'Aggressive timing template',
    '-PE': 'Enable this echo request behavior. Good for internal networks',
    '--disable-arp-ping': 'No ARP or ND Ping',
    '--max-hostgroup 50': 'Hostgroup (batch of hosts scanned concurrently) size',
    '--min-parallelism 50': 'Number of probes that may be outstanding for a host group',
    '--osscan-limit': 'Limit OS detection to promising targets',
    '--max-os-tries 1': 'Maximum number of OS detection tries against a target',
    '-oX -': 'Send XML output to STDOUT, avoid creating a temp file'
}
__NMAP__FLAGS__ = shlex.split(" ".join(NMAP_DEFAULT_FLAGS.keys()))

Por ejemplo, puedes usar NmapRunner Me gusta esto:

import pprint
def test_iter():
    for hosts_data in NmapRunner("192.168.1.0/24"):
        pprint.print(hosts_data)

Lo crea o no, esta es la parte más desafiante de escribir scripts de Python. La siguiente sección requiere escribir un script que cumpla con los requisitos de Ansible para los scripts de inventario dinámico.

Escribir guiones de inventario

La documentación de Ansible es muy clara sobre los requisitos para los scripts de inventario:

  1. debe apoyar --list y --host Logotipo no incluido.

  2. Debe devolver JSON en un formato que Ansible pueda entender.
  3. Se pueden agregar otras banderas, pero Ansible no las usará.

Pero por favor espera.No dice que Ansible proporciona una red para escanear hosts, por lo que ¿Cómo te inyectas?

¡sencillo!El script leerá un archivo de configuración YAML desde una ubicación predefinida, p. /home/josevnz/.ansible/plugins/cliconf/nmap_plugin.yaml Usa el siguiente código:

# Sample configuration file. Suspiciously similar to the official Nmap plugin configuration file
---
plugin: nmap_plugin
address: 192.168.1.0/24

La clase para leer un archivo de configuración YAML es muy simple:

"""
Using a configuration file in YAML format, so it can be reused by the plugin.
Init file with ConfigParser is more convenient, trying to keep Ansible happy :wink:
"""
import os
from yaml import safe_load

try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

def load_config(config_file: str = os.path.expanduser("~/.ansible/plugins/cliconf/nmap_inventory.cfg")):
    """
    Where to copy the configuration file:
    ```shell
    [josevnz@dmaf5 EnableSysadmin]$ ansible-config dump |grep DEFAULT_CLICONF_PLUGIN_PATH
    DEFAULT_CLICONF_PLUGIN_PATH(default) = ['/home/josevnz/.ansible/plugins/cliconf', '/usr/share/ansible/plugins/cliconf']
    ```
    :param config_file:
    :return:
    """
    with open(config_file, 'r') as stream:
        data = safe_load(stream)
        return data

muy bien. Aquí está el código de secuencia de comandos de inventario dinámico actual:

!/usr/bin/env python
"""
# nmap_inventory.py - Generates an Ansible dynamic inventory using NMAP
# Author
Jose Vicente Nunez Zuleta ([email protected])
"""
import json
import os.path
import argparse
from configparser import ConfigParser, MissingSectionHeaderError

from inventories.nmap import NmapRunner

def load_config() -> ConfigParser:
    cp = ConfigParser()
    try:
        config_file = os.path.expanduser("~/.config/nmap_inventory.cfg")
        cp.read(config_file)
        if not cp.has_option('DEFAULT', 'Addresses'):
            raise ValueError("Missing configuration option: DEFAULT -> Addresses")
    except MissingSectionHeaderError as mhe:
        raise ValueError("Invalid or missing configuration file:", mhe)
    return cp


def get_empty_vars():
    return json.dumps({})


def get_list(search_address: str, pretty=False) -> str:
    """
    All group is always returned
    Ungrouped at least contains all the names found
    IP addresses are added as vars in the __meta tag, for efficiency as mentioned in the Ansible documentation.
    Note than we can add logic here to put machines in custom groups, will keep it simple for now.
    :param search_address: Results of the scan with Nmap
    :param pretty: Indentation
    :return: JSON string
    """
    found_data = list(NmapRunner(search_address))
    hostvars = {}
    ungrouped = []
    for host_data in found_data:
        for name, address in host_data.items():
            if name not in ungrouped:
                ungrouped.append(name)
            if name not in hostvars:
                hostvars[name] = {'ip': []}
            hostvars[name]['ip'].append(address)
    data = {
        '_meta': {
          'hostvars': hostvars
        },
        'all': {
            'children': [
                'ungrouped'
            ]
        },
        'ungrouped': {
            'hosts': ungrouped
        }
    }
    return json.dumps(data, indent=pretty)

if __name__ == '__main__':

    arg_parser = argparse.ArgumentParser(
        description=__doc__,
        prog=__file__
    )
    arg_parser.add_argument(
        '--pretty',
        action='store_true',
        default=False,
        help="Pretty print JSON"
    )
    mandatory_options = arg_parser.add_mutually_exclusive_group()
    mandatory_options.add_argument(
        '--list',
        action='store',
        nargs="*",
        default="dummy",
        help="Show JSON of all managed hosts"
    )
    mandatory_options.add_argument(
        '--host',
        action='store',
        help="Display vars related to the host"
    )

    try:
        config = load_config()
        addresses = config.get('DEFAULT', 'Addresses')

        args = arg_parser.parse_args()
        if args.host:
            print(get_empty_vars())
        elif len(args.list) >= 0:
            print(get_list(addresses, args.pretty))
        else:
            raise ValueError("Expecting either --host $HOSTNAME or --list")

    except ValueError:
        raise

Es posible que hayas notado algunas cosas:

  1. Además de representar JSON, la mayor parte del código de este script está dedicado a manejar parámetros y cargar la configuración.
  2. Puede agregar lógica de agrupación a get_listAhora estoy completando los dos grupos predeterminados requeridos.

[ Looking for more on system automation? Get started with The Automated Enterprise, a free book from Red Hat. ]

intentalo

Es hora de patear los neumáticos. Instale el código primero:

$ git clone [email protected]:josevnz/ExtendingAnsibleWithPython.git
$ cd ExtendingAnsibleWithPython/Inventory
$ python3 -m venv ~/virtualenv/ExtendingAnsibleWithPythonInventory
. ~/virtualenv/ExtendingAnsibleWithPythonInventory/bin/activate
$ pip install wheel
$ pip install --upgrade pip
$ pip install build
$ python setup.py bdist_wheel
$ pip install dist/*

El entorno virtual ahora debería estar activo. Vea si obtiene un resultado de información de host vacío (usando el nombre de la máquina en la red):

$ ansible-inventory --inventory scripts/nmap_inventory.py --host raspberrypi
{}

ok, se espera JSON vacío porque no implementaste --host $HOSTNAME método.acerca de --list?

$ ansible-inventory --inventory scripts/nmap_inventory.py --list
{
    "_meta": {
        "hostvars": {
            "dmaf5.home": {
                "ip": [
                    "192.168.1.26",
                    "192.168.1.25"
                ]
            },
            "macmini2": {
                "ip": [
                    "192.168.1.16"
                ]
            },
            "raspberrypi": {
                "ip": [
                    "192.168.1.11"
                ]
            }
        }
    },
    "all": {
        "children": [
            "ungrouped"
        ]
    },
    "ungrouped": {
        "hosts": [
            "dmaf5.home",
            "macmini2",
            "raspberrypi"
        ]
    }
}

Finalmente, pruebe el nuevo stock. ping Módulo:

$ ansible --inventory scripts/nmap_inventory.py --user josevnz -m ping all
dmaf5.home | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
raspberrypi | SUCCESS => {
    "changed": false,
    "ping": "pong"
}
macmini2 | SUCCESS => {
    "changed": false,
    "ping": "pong"
}

¿Que sigue?

Este artículo cubre un mucho material He aquí un resumen de su trabajo:

  • Escribir clases de utilidad para llamar a Nmap y analizar los resultados del escaneo
  • Reutilice estas clases en scripts que cumplan con los requisitos de inventario de Ansible para que pueda usarse para crear un inventario dinámico.

Esta es probablemente la opción más flexible para crear manifiestos dinámicos en términos de codificación, ya que los requisitos son muy flexibles y se pueden realizar en cualquier idioma.

¿Pero es este el camino correcto? En la última parte de esta serie de artículos, le mostraré por qué sería mejor escribir un complemento de Ansible en lugar de usar un script de inventario.

¡Recuerda que puedes descargar el código y experimentar! La mejor manera de aprender es a través de la práctica y los errores.

LEER  Cómo instalar el último cliente BitTorrent de Deluge en Linux

Publicaciones relacionadas

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Botón volver arriba