1c1527c7 |
os.execvp('sqlite3', ('sqlite3', DATABASE_PATH))
def populate_database():
populate_modules()
populate_coverage()
populate_integration_targets()
def populate_modules():
module_dir = os.path.join(BASE_PATH, 'lib/ansible/modules/')
modules_rows = []
module_statuses_rows = []
for root, dir_names, file_names in os.walk(module_dir):
for file_name in file_names:
module, extension = os.path.splitext(file_name)
if module == '__init__' or extension != '.py':
continue
if module.startswith('_'):
module = module[1:]
namespace = os.path.join(root.replace(module_dir, '')).replace('/', '.')
path = os.path.join(root, file_name)
with open(path, 'rb') as module_fd:
module_data = module_fd.read()
result = extract_metadata(module_data=module_data)
metadata = result[0]
if not metadata:
if module == 'async_wrapper':
continue
raise Exception('no metadata for: %s' % path)
modules_rows.append(dict(
module=module,
namespace=namespace,
path=path.replace(BASE_PATH, ''),
supported_by=metadata['supported_by'],
))
for status in metadata['status']:
module_statuses_rows.append(dict(
module=module,
status=status,
))
populate_data(dict(
modules=dict(
rows=modules_rows,
schema=(
('module', 'TEXT'),
('namespace', 'TEXT'),
('path', 'TEXT'),
('supported_by', 'TEXT'),
)),
module_statuses=dict(
rows=module_statuses_rows,
schema=(
('module', 'TEXT'),
('status', 'TEXT'),
)),
))
def populate_coverage():
response = requests.get('https://codecov.io/api/gh/ansible/ansible/tree/devel/?src=extension')
data = response.json()
files = data['commit']['report']['files']
coverage_rows = []
for path, data in files.items():
report = data['t']
coverage_rows.append(dict(
path=path,
coverage=float(report['c']),
lines=report['n'],
hit=report['h'],
partial=report['p'],
missed=report['m'],
))
populate_data(dict(
coverage=dict(
rows=coverage_rows,
schema=(
('path', 'TEXT'),
('coverage', 'REAL'),
('lines', 'INTEGER'),
('hit', 'INTEGER'),
('partial', 'INTEGER'),
('missed', 'INTEGER'),
)),
))
def populate_integration_targets():
targets = list(walk_integration_targets())
integration_targets_rows = [dict(
target=target.name,
type=target.type,
path=target.path,
script_path=target.script_path,
) for target in targets]
integration_target_aliases_rows = [dict(
target=target.name,
alias=alias,
) for target in targets for alias in target.aliases]
integration_target_modules_rows = [dict(
target=target.name,
module=module,
) for target in targets for module in target.modules]
populate_data(dict(
integration_targets=dict(
rows=integration_targets_rows,
schema=(
('target', 'TEXT'),
('type', 'TEXT'),
('path', 'TEXT'),
('script_path', 'TEXT'),
)),
integration_target_aliases=dict(
rows=integration_target_aliases_rows,
schema=(
('target', 'TEXT'),
('alias', 'TEXT'),
)),
integration_target_modules=dict(
rows=integration_target_modules_rows,
schema=(
('target', 'TEXT'),
('module', 'TEXT'),
)),
))
def create_table(cursor, name, columns):
schema = ', '.join('%s %s' % column for column in columns)
cursor.execute('DROP TABLE IF EXISTS %s' % name)
cursor.execute('CREATE TABLE %s (%s)' % (name, schema))
def populate_table(cursor, rows, name, columns):
create_table(cursor, name, columns)
values = ', '.join([':%s' % column[0] for column in columns])
for row in rows:
cursor.execute('INSERT INTO %s VALUES (%s)' % (name, values), row)
def populate_data(data):
connection = sqlite3.connect(DATABASE_PATH)
cursor = connection.cursor()
for table in data:
populate_table(cursor, data[table]['rows'], table, data[table]['schema'])
connection.commit()
connection.close()
if __name__ == '__main__':
main() |