hacking/report.py
1c1527c7
 #!/usr/bin/env python
 # PYTHON_ARGCOMPLETE_OK
 """A tool to aggregate data about Ansible source and testing into a sqlite DB for reporting."""
 
 from __future__ import (absolute_import, print_function)
 
 import argparse
7a61763f
 import json
1c1527c7
 import os
 import sqlite3
 import sys
 
 # sqlite database file written by the populate_* functions and opened by query_database
 DATABASE_PATH = os.path.expanduser('~/.ansible/report.db')
 # parent of the directory containing this script, as an absolute path with a trailing slash
 BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) + '/'
 # directory placed on sys.path so the `ansible` package can be imported below
 ANSIBLE_PATH = os.path.join(BASE_PATH, 'lib')
d651bda1
 # directory placed on sys.path so the `ansible_test` package can be imported below
 ANSIBLE_TEST_PATH = os.path.join(BASE_PATH, 'test/lib')
1c1527c7
 
 # Put both source directories at the front of sys.path (unless already listed)
 # before the `ansible` and `ansible_test` imports that follow.
 if ANSIBLE_PATH not in sys.path:
     sys.path.insert(0, ANSIBLE_PATH)
 
 if ANSIBLE_TEST_PATH not in sys.path:
     sys.path.insert(0, ANSIBLE_TEST_PATH)
 
7a61763f
 from ansible.module_utils.urls import open_url
1c1527c7
 from ansible.parsing.metadata import extract_metadata
7a61763f
 
d651bda1
 from ansible_test._internal.target import walk_integration_targets
1c1527c7
 
 
 def main():
     """Change into the source root, parse the command line and run the chosen sub-command."""
     os.chdir(BASE_PATH)
 
     # each sub-parser stores its handler in the `func` attribute of the namespace
     command = parse_args().func
     command()
 
 
 def parse_args():
     """Parse the command line and return the resulting namespace.
 
     Two sub-commands are registered, ``populate`` and ``query``; the function
     implementing the selected one is stored in the namespace's ``func``
     attribute.  Shell completion is enabled when the optional ``argcomplete``
     package can be imported.
     """
     try:
         import argcomplete
     except ImportError:
         argcomplete = None
 
     parser = argparse.ArgumentParser()
 
     subparsers = parser.add_subparsers(metavar='COMMAND')
     subparsers.required = True  # work-around for python 3 bug which makes subparsers optional
 
     # (command name, help text, handler) for every sub-command
     handlers = (
         ('populate', 'populate report database', populate_database),
         ('query', 'query report database', query_database),
     )
 
     for command, description, handler in handlers:
         subparsers.add_parser(command, help=description).set_defaults(func=handler)
 
     if argcomplete is not None:
         argcomplete.autocomplete(parser)
 
     return parser.parse_args()
 
 
 def query_database():
     """Open the report database in the ``sqlite3`` command line shell.
 
     Exits with an error message when the database file does not exist.
     Otherwise the current process is replaced by ``sqlite3``, so on success
     this function does not return.
     """
     database_exists = os.path.exists(DATABASE_PATH)
 
     if not database_exists:
         sys.exit('error: Database not found. Did you run `report.py populate` first?')
 
     command = ('sqlite3', DATABASE_PATH)
     os.execvp(command[0], command)
 
 
 def populate_database():
     """Populate the report database: modules, then coverage, then integration targets."""
     steps = (
         populate_modules,
         populate_coverage,
         populate_integration_targets,
     )
 
     for step in steps:
         step()
 
 
 def populate_modules():
     """Record every module under ``lib/ansible/modules/`` in the report database.
 
     Each ``.py`` file other than ``__init__.py`` is read and passed to
     ``extract_metadata``.  One ``modules`` row is produced per file and one
     ``module_statuses`` row per entry in the metadata's ``status`` list.
 
     Raises:
         Exception: when a module other than ``async_wrapper`` has no metadata.
     """
     module_dir = os.path.join(BASE_PATH, 'lib/ansible/modules/')
 
     modules_rows = []
     module_statuses_rows = []
 
     for root, _dir_names, file_names in os.walk(module_dir):
         # dotted form of the directory path relative to the modules directory
         namespace = root.replace(module_dir, '').replace('/', '.')
 
         for file_name in file_names:
             module, extension = os.path.splitext(file_name)
 
             if extension != '.py' or module == '__init__':
                 continue
 
             # a single leading underscore is not part of the recorded module name
             module = module[1:] if module.startswith('_') else module
 
             path = os.path.join(root, file_name)
 
             with open(path, 'rb') as module_fd:
                 module_data = module_fd.read()
 
             metadata = extract_metadata(module_data=module_data)[0]
 
             if not metadata:
                 # async_wrapper is the only module allowed to have no metadata
                 if module != 'async_wrapper':
                     raise Exception('no metadata for: %s' % path)
 
                 continue
 
             modules_rows.append({
                 'module': module,
                 'namespace': namespace,
                 'path': path.replace(BASE_PATH, ''),
                 'supported_by': metadata['supported_by'],
             })
 
             module_statuses_rows.extend(
                 {'module': module, 'status': status} for status in metadata['status']
             )
 
     tables = {
         'modules': {
             'rows': modules_rows,
             'schema': (
                 ('module', 'TEXT'),
                 ('namespace', 'TEXT'),
                 ('path', 'TEXT'),
                 ('supported_by', 'TEXT'),
             ),
         },
         'module_statuses': {
             'rows': module_statuses_rows,
             'schema': (
                 ('module', 'TEXT'),
                 ('status', 'TEXT'),
             ),
         },
     }
 
     populate_data(tables)
 
 
 def populate_coverage():
     """Download the codecov.io report for the devel branch and store it in the ``coverage`` table.
 
     One row is produced per file in the report, taken from that file's ``t``
     entry (presumably its totals -- confirm against the codecov API).
     """
     response = open_url('https://codecov.io/api/gh/ansible/ansible/tree/devel/?src=extension')
     report_files = json.load(response)['commit']['report']['files']
 
     coverage_rows = [
         {
             'path': path,
             'coverage': float(entry['t']['c']),
             'lines': entry['t']['n'],
             'hit': entry['t']['h'],
             'partial': entry['t']['p'],
             'missed': entry['t']['m'],
         }
         for path, entry in report_files.items()
     ]
 
     tables = {
         'coverage': {
             'rows': coverage_rows,
             'schema': (
                 ('path', 'TEXT'),
                 ('coverage', 'REAL'),
                 ('lines', 'INTEGER'),
                 ('hit', 'INTEGER'),
                 ('partial', 'INTEGER'),
                 ('missed', 'INTEGER'),
             ),
         },
     }
 
     populate_data(tables)
 
 
 def populate_integration_targets():
     """Store the integration test targets, their aliases and their modules in the report database.
 
     Targets come from ``walk_integration_targets``.  Three tables are written:
     one row per target, one row per (target, alias) pair and one row per
     (target, module) pair.
     """
     all_targets = list(walk_integration_targets())
 
     target_rows = [
         {
             'target': entry.name,
             'type': entry.type,
             'path': entry.path,
             'script_path': entry.script_path,
         }
         for entry in all_targets
     ]
 
     alias_rows = [
         {'target': entry.name, 'alias': alias}
         for entry in all_targets
         for alias in entry.aliases
     ]
 
     module_rows = [
         {'target': entry.name, 'module': module}
         for entry in all_targets
         for module in entry.modules
     ]
 
     tables = {
         'integration_targets': {
             'rows': target_rows,
             'schema': (
                 ('target', 'TEXT'),
                 ('type', 'TEXT'),
                 ('path', 'TEXT'),
                 ('script_path', 'TEXT'),
             ),
         },
         'integration_target_aliases': {
             'rows': alias_rows,
             'schema': (
                 ('target', 'TEXT'),
                 ('alias', 'TEXT'),
             ),
         },
         'integration_target_modules': {
             'rows': module_rows,
             'schema': (
                 ('target', 'TEXT'),
                 ('module', 'TEXT'),
             ),
         },
     }
 
     populate_data(tables)
 
 
 def create_table(cursor, name, columns):
     """Create table ``name``, first dropping any existing table with that name.
 
     Args:
         cursor: database cursor used to execute the statements.
         name: table name; interpolated directly into the SQL text.
         columns: iterable of ``(column_name, column_type)`` tuples, also
             interpolated directly into the SQL text.
     """
     definitions = ['%s %s' % column for column in columns]
 
     drop_statement = 'DROP TABLE IF EXISTS %s' % name
     create_statement = 'CREATE TABLE %s (%s)' % (name, ', '.join(definitions))
 
     cursor.execute(drop_statement)
     cursor.execute(create_statement)
 
 
 def populate_table(cursor, rows, name, columns):
     """Recreate table ``name`` and insert ``rows`` into it.
 
     Args:
         cursor: database cursor used to execute the statements.
         rows: iterable of dicts; each dict supplies a value for every column
             name listed in ``columns``.
         name: table name; interpolated directly into the SQL text.
         columns: sequence of ``(column_name, column_type)`` tuples.
     """
     create_table(cursor, name, columns)
 
     # one named placeholder per column, bound from the keys of each row dict
     placeholders = ', '.join(':%s' % column[0] for column in columns)
 
     # the statement is the same for every row: build it once and insert in bulk
     statement = 'INSERT INTO %s VALUES (%s)' % (name, placeholders)
 
     cursor.executemany(statement, rows)
 
 
 def populate_data(data):
     """Write the given tables to the report database and commit.
 
     Args:
         data: mapping of table name to a dict with two keys: ``rows`` (the
             rows to insert) and ``schema`` (a sequence of
             ``(column_name, column_type)`` tuples).  Each table is recreated
             before its rows are inserted.
     """
     database_dir = os.path.dirname(DATABASE_PATH)
 
     # sqlite3 creates a missing database file, but not a missing parent directory
     if database_dir and not os.path.isdir(database_dir):
         os.makedirs(database_dir)
 
     connection = sqlite3.connect(DATABASE_PATH)
 
     try:
         cursor = connection.cursor()
 
         for table, contents in data.items():
             populate_table(cursor, contents['rows'], table, contents['schema'])
 
         connection.commit()
     finally:
         # close the connection even when populating a table fails
         connection.close()
 
 
 # run the command line interface only when executed as a script, not when imported
 if __name__ == '__main__':
     main()