Source code for saruman.tasks.kernel.modprobe

# -*- coding: utf-8 -*-

"""
tasks.kernel.modprobe
=====================

Contains all the functionalities that help loading or unloading a kernel module
"""
from celery import Task
from celery.utils.log import get_task_logger

# noinspection PyUnresolvedReferences
from plumbum.cmd import lsmod, modprobe

from saruman.helpers.error_handling import error_handling
from saruman.helpers.exceptions import FirewallNotAllowedError

__all__ = ['Check', 'Add', 'AddWithArgs', 'Remove']

logger = get_task_logger(__name__)

allowed_mod = ['dummy']


[docs]class Check(Task): """ Tache de vérification de l'activation d'un module dans le kernel Vérifie si le module `module_name` est activé dans le kernel. Se réfère à une liste des modules autorisés (ainsi, l'utilisateur ne peut pas supprimer le module du filesystem par exemple). La tache tourne dans un context (:py:func:error_handling) qui gère les erreurs """ name = 'kernel.modules.check'
[docs] def run(self, module_name): """ :param str module_name: le nom du module à checker :return: oui si le module est activé, non sinon :rtype: bool """ with error_handling(logger): logger.info("Check pour voir si le module `{}` du kernel est activé".format(module_name)) if module_name not in allowed_mod: raise FirewallNotAllowedError("Le module `{}` du kernel n'est pas whitelisté".format(module_name)) logger.debug("Execution de lsmod") mod = lsmod() return module_name in mod
[docs]class Add(Task): """ Tache de vérification de l'activation d'un module dans le kernel Vérifie si le module `module_name` est activé dans le kernel. Se réfère à une liste des modules autorisés (ainsi, l'utilisateur ne peut pas supprimer le module du filesystem par exemple). La tache tourne dans un context (:py:func:error_handling) qui gère les erreurs """ name = 'kernel.modules.add'
[docs] def run(self, module_name): """ :param str module_name: le nom du module à checker """ with error_handling(logger): logger.info("Activation du module `{}` du kernel".format(module_name)) if module_name not in allowed_mod: raise FirewallNotAllowedError("Le module `{}` du kernel n'est pas whitelisté".format(module_name)) logger.debug("Execution de modprobe") result = modprobe[module_name]() logger.info("Le module `{}` du kernel est {}".format(module_name, "activé" if result else "désactivé"))
[docs]class AddWithArgs(Task): """ Tache de vérification de l'activation d'un module dans le kernel Vérifie si le module `module_name` est activé dans le kernel. Se réfère à une liste des modules autorisés (ainsi, l'utilisateur ne peut pas supprimer le module du filesystem par exemple). La tache tourne dans un context (:py:func:error_handling) qui gère les erreurs """ name = 'kernel.modules.addWithArgs'
[docs] def run(self, module_name, module_args): """ :param str module_name: le nom du module à checker :param dict module_args: un dictionnaire d'arguments """ with error_handling(logger): logger.info("Activation du module `{}` du kernel".format(module_name)) if module_name not in allowed_mod: raise FirewallNotAllowedError("Le module `{}` du kernel n'est pas whitelisté".format(module_name)) logger.debug("Execution de modprobe") arguments = ['{}={}'.format(k, v) for k, v in module_args.items()] arguments = [module_name].extend(arguments) result = modprobe(arguments) logger.info("Le module `{}` du kernel est {}".format(module_name, "activé" if result else "désactivé"))
[docs]class Remove(Task): name = 'kernel.modules.remove'
[docs] def run(self, module_name): with error_handling(logger): logger.info("Désactivation du module `{}` du kernel".format(module_name)) if module_name not in allowed_mod: raise FirewallNotAllowedError("Le module `{}` du kernel n'est pas whitelisté".format(module_name)) logger.debug("Execution de modprobe") result = modprobe["-r", module_name]() logger.info("Le module `{}` du kernel est {}".format(module_name, "activé" if result else "désactivé"))