hacking/test-module.py
33d27288
 #!/usr/bin/env python
 
 # (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
 #
 # This file is part of Ansible
 #
 # Ansible is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # Ansible is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
 #
 
 # this script is for testing modules without running through the
 # entire guts of ansible, and is very helpful for when developing
 # modules
 #
 # example:
 #    ./hacking/test-module.py -m lib/ansible/modules/commands/command.py -a "/bin/sleep 3"
 #    ./hacking/test-module.py -m lib/ansible/modules/commands/command.py -a "/bin/sleep 3" --debugger /usr/bin/pdb
 #    ./hacking/test-module.py -m lib/ansible/modules/files/lineinfile.py -a "dest=/etc/exports line='/srv/home hostname1(rw,sync)'" --check
 #    ./hacking/test-module.py -m lib/ansible/modules/commands/command.py -a "echo hello" -n -o "test_hello"
 
f510d599
 import glob
33d27288
 import optparse
 import os
 import subprocess
 import sys
 import traceback
 import shutil
 
 from ansible.release import __version__
 import ansible.utils.vars as utils_vars
 from ansible.parsing.dataloader import DataLoader
 from ansible.parsing.utils.jsonify import jsonify
 from ansible.parsing.splitter import parse_kv
 import ansible.executor.module_common as module_common
 import ansible.constants as C
 from ansible.module_utils._text import to_native, to_text
 from ansible.template import Templar
 
 import json
 
 
 def parse():
     """parse command line
 
     :return : (options, args)"""
     parser = optparse.OptionParser()
 
     parser.usage = "%prog -[options] (-h for help)"
 
     parser.add_option('-m', '--module-path', dest='module_path',
                       help="REQUIRED: full path of module source to execute")
     parser.add_option('-a', '--args', dest='module_args', default="",
                       help="module argument string")
     parser.add_option('-D', '--debugger', dest='debugger',
                       help="path to python debugger (e.g. /usr/bin/pdb)")
     parser.add_option('-I', '--interpreter', dest='interpreter',
                       help="path to interpreter to use for this module"
                       " (e.g. ansible_python_interpreter=/usr/bin/python)",
                       metavar='INTERPRETER_TYPE=INTERPRETER_PATH',
                       default="ansible_python_interpreter=%s" %
                       (sys.executable if sys.executable else '/usr/bin/python'))
     parser.add_option('-c', '--check', dest='check', action='store_true',
                       help="run the module in check mode")
     parser.add_option('-n', '--noexecute', dest='execute', action='store_false',
                       default=True, help="do not run the resulting module")
     parser.add_option('-o', '--output', dest='filename',
                       help="Filename for resulting module",
                       default="~/.ansible_module_generated")
     options, args = parser.parse_args()
     if not options.module_path:
         parser.print_help()
         sys.exit(1)
     else:
         return options, args
 
 
 def write_argsfile(argstring, json=False):
     """ Write args to a file for old-style module's use. """
     argspath = os.path.expanduser("~/.ansible_test_module_arguments")
     argsfile = open(argspath, 'w')
     if json:
         args = parse_kv(argstring)
         argstring = jsonify(args)
     argsfile.write(argstring)
     argsfile.close()
     return argspath
 
 
 def get_interpreters(interpreter):
     result = dict()
     if interpreter:
         if '=' not in interpreter:
             print("interpreter must by in the form of ansible_python_interpreter=/usr/bin/python")
             sys.exit(1)
         interpreter_type, interpreter_path = interpreter.split('=')
         if not interpreter_type.startswith('ansible_'):
             interpreter_type = 'ansible_%s' % interpreter_type
         if not interpreter_type.endswith('_interpreter'):
             interpreter_type = '%s_interpreter' % interpreter_type
         result[interpreter_type] = interpreter_path
     return result
 
 
 def boilerplate_module(modfile, args, interpreters, check, destfile):
     """ simulate what ansible does with new style modules """
 
     # module_fh = open(modfile)
     # module_data = module_fh.read()
     # module_fh.close()
 
     # replacer = module_common.ModuleReplacer()
     loader = DataLoader()
 
     # included_boilerplate = module_data.find(module_common.REPLACER) != -1 or module_data.find("import ansible.module_utils") != -1
 
     complex_args = {}
 
     # default selinux fs list is pass in as _ansible_selinux_special_fs arg
     complex_args['_ansible_selinux_special_fs'] = C.DEFAULT_SELINUX_SPECIAL_FS
     complex_args['_ansible_tmpdir'] = C.DEFAULT_LOCAL_TMP
     complex_args['_ansible_keep_remote_files'] = C.DEFAULT_KEEP_REMOTE_FILES
     complex_args['_ansible_version'] = __version__
 
     if args.startswith("@"):
         # Argument is a YAML file (JSON is a subset of YAML)
         complex_args = utils_vars.combine_vars(complex_args, loader.load_from_file(args[1:]))
         args = ''
     elif args.startswith("{"):
         # Argument is a YAML document (not a file)
         complex_args = utils_vars.combine_vars(complex_args, loader.load(args))
         args = ''
 
     if args:
         parsed_args = parse_kv(args)
         complex_args = utils_vars.combine_vars(complex_args, parsed_args)
 
     task_vars = interpreters
 
     if check:
         complex_args['_ansible_check_mode'] = True
 
     modname = os.path.basename(modfile)
     modname = os.path.splitext(modname)[0]
     (module_data, module_style, shebang) = module_common.modify_module(
         modname,
         modfile,
         complex_args,
         Templar(loader=loader),
         task_vars=task_vars
     )
 
     if module_style == 'new' and '_ANSIBALLZ_WRAPPER = True' in to_native(module_data):
         module_style = 'ansiballz'
 
     modfile2_path = os.path.expanduser(destfile)
     print("* including generated source, if any, saving to: %s" % modfile2_path)
     if module_style not in ('ansiballz', 'old'):
         print("* this may offset any line numbers in tracebacks/debuggers!")
     modfile2 = open(modfile2_path, 'wb')
     modfile2.write(module_data)
     modfile2.close()
     modfile = modfile2_path
 
     return (modfile2_path, modname, module_style)
 
 
 def ansiballz_setup(modfile, modname, interpreters):
     os.system("chmod +x %s" % modfile)
 
     if 'ansible_python_interpreter' in interpreters:
         command = [interpreters['ansible_python_interpreter']]
     else:
         command = []
     command.extend([modfile, 'explode'])
 
     cmd = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     out, err = cmd.communicate()
     out, err = to_text(out, errors='surrogate_or_strict'), to_text(err)
     lines = out.splitlines()
     if len(lines) != 2 or 'Module expanded into' not in lines[0]:
         print("*" * 35)
         print("INVALID OUTPUT FROM ANSIBALLZ MODULE WRAPPER")
         print(out)
         sys.exit(err)
     debug_dir = lines[1].strip()
 
f510d599
     # All the directories in an AnsiBallZ that modules can live
     core_dirs = glob.glob(os.path.join(debug_dir, 'ansible/modules'))
     collection_dirs = glob.glob(os.path.join(debug_dir, 'ansible_collections/*/*/plugins/modules'))
 
     # There's only one module in an AnsiBallZ payload so look for the first module and then exit
     for module_dir in core_dirs + collection_dirs:
         for dirname, directories, filenames in os.walk(module_dir):
             for filename in filenames:
                 if filename == modname + '.py':
                     modfile = os.path.join(dirname, filename)
                     break
 
33d27288
     argsfile = os.path.join(debug_dir, 'args')
 
     print("* ansiballz module detected; extracted module source to: %s" % debug_dir)
     return modfile, argsfile
 
 
 def runtest(modfile, argspath, modname, module_style, interpreters):
     """Test run a module, piping it's output for reporting."""
     invoke = ""
     if module_style == 'ansiballz':
         modfile, argspath = ansiballz_setup(modfile, modname, interpreters)
         if 'ansible_python_interpreter' in interpreters:
             invoke = "%s " % interpreters['ansible_python_interpreter']
 
     os.system("chmod +x %s" % modfile)
 
     invoke = "%s%s" % (invoke, modfile)
     if argspath is not None:
         invoke = "%s %s" % (invoke, argspath)
 
     cmd = subprocess.Popen(invoke, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     (out, err) = cmd.communicate()
     out, err = to_text(out), to_text(err)
 
     try:
         print("*" * 35)
         print("RAW OUTPUT")
         print(out)
         print(err)
         results = json.loads(out)
     except Exception:
         print("*" * 35)
         print("INVALID OUTPUT FORMAT")
         print(out)
         traceback.print_exc()
         sys.exit(1)
 
     print("*" * 35)
     print("PARSED OUTPUT")
     print(jsonify(results, format=True))
 
 
 def rundebug(debugger, modfile, argspath, modname, module_style, interpreters):
     """Run interactively with console debugger."""
 
     if module_style == 'ansiballz':
         modfile, argspath = ansiballz_setup(modfile, modname, interpreters)
 
     if argspath is not None:
         subprocess.call("%s %s %s" % (debugger, modfile, argspath), shell=True)
     else:
         subprocess.call("%s %s" % (debugger, modfile), shell=True)
 
 
 def main():
 
     options, args = parse()
     interpreters = get_interpreters(options.interpreter)
     (modfile, modname, module_style) = boilerplate_module(options.module_path, options.module_args, interpreters, options.check, options.filename)
 
     argspath = None
     if module_style not in ('new', 'ansiballz'):
         if module_style in ('non_native_want_json', 'binary'):
             argspath = write_argsfile(options.module_args, json=True)
         elif module_style == 'old':
             argspath = write_argsfile(options.module_args, json=False)
         else:
             raise Exception("internal error, unexpected module style: %s" % module_style)
 
     if options.execute:
         if options.debugger:
             rundebug(options.debugger, modfile, argspath, modname, module_style, interpreters)
         else:
             runtest(modfile, argspath, modname, module_style, interpreters)
 
 
 if __name__ == "__main__":
     try:
         main()
     finally:
         shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)