# Copyright: (c) 2019, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import fnmatch
import json
import operator
import os
import shutil
import tarfile
import tempfile
import threading
import time
import yaml

from contextlib import contextmanager
from distutils.version import LooseVersion, StrictVersion
from hashlib import sha256
from io import BytesIO
from yaml.error import YAMLError

try:
    import queue
except ImportError:
    import Queue as queue  # Python 2

import ansible.constants as C
from ansible.errors import AnsibleError
from ansible.galaxy import get_collections_galaxy_meta_info
from ansible.galaxy.api import CollectionVersionMetadata, GalaxyError
from ansible.module_utils import six
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.utils.collection_loader import AnsibleCollectionRef
from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash, secure_hash_s
from ansible.module_utils.urls import open_url

urlparse = six.moves.urllib.parse.urlparse
urllib_error = six.moves.urllib.error


display = Display()

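# Schema version stamped into the 'format' keys of MANIFEST.json and FILES.json.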
MANIFEST_FORMAT = 1


class CollectionRequirement:

    _FILE_MAPPING = [(b'MANIFEST.json', 'manifest_file'), (b'FILES.json', 'files_file')]

    def __init__(self, namespace, name, b_path, api, versions, requirement, force, parent=None, metadata=None,
                 files=None, skip=False):
        """
        Represents a collection requirement: the versions that are available to be installed as well as any
        dependencies the collection has.

        :param namespace: The collection namespace.
        :param name: The collection name.
        :param b_path: Byte str of the path to the collection tarball if it has already been downloaded.
        :param api: The GalaxyAPI to use if the collection is from Galaxy.
        :param versions: A list of versions of the collection that are available.
        :param requirement: The version requirement string used to verify the list of versions fit the requirements.
        :param force: Whether the force flag was applied to the collection.
        :param parent: The name of the parent the collection is a dependency of.
        :param metadata: The galaxy.api.CollectionVersionMetadata that has already been retrieved from the Galaxy
            server.
        :param files: The files that exist inside the collection. This is based on the FILES.json file inside the
            collection artifact.
        :param skip: Whether to skip installing the collection. Should be set if the collection is already installed
            and force is not set.
        """
        self.namespace = namespace
        self.name = name
        self.b_path = b_path
        self.api = api
        self.versions = set(versions)
        self.force = force
        self.skip = skip
        self.required_by = []

        self._metadata = metadata
        self._files = files

        self.add_requirement(parent, requirement)

    def __str__(self):
        return to_native("%s.%s" % (self.namespace, self.name))

    def __unicode__(self):
        return u"%s.%s" % (self.namespace, self.name)

    @property
    def latest_version(self):
        try:
            return max([v for v in self.versions if v != '*'], key=LooseVersion)
        except ValueError:  # ValueError: max() arg is an empty sequence
            return '*'

    @property
    def dependencies(self):
        if self._metadata:
            return self._metadata.dependencies
        elif len(self.versions) > 1:
            return None

        self._get_metadata()
        return self._metadata.dependencies

    def add_requirement(self, parent, requirement):
        self.required_by.append((parent, requirement))
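        # Narrow the candidate versions to those that also satisfy the new requirement; an
        # empty result means the new constraint conflicts with an earlier one (or with the
        # version that is already installed when skip is set).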
        new_versions = set(v for v in self.versions if self._meets_requirements(v, requirement, parent))
        if len(new_versions) == 0:
            if self.skip:
                force_flag = '--force-with-deps' if parent else '--force'
                version = self.latest_version if self.latest_version != '*' else 'unknown'
                msg = "Cannot meet requirement %s:%s as it is already installed at version '%s'. Use %s to overwrite" \
                      % (to_text(self), requirement, version, force_flag)
                raise AnsibleError(msg)
            elif parent is None:
                msg = "Cannot meet requirement %s for dependency %s" % (requirement, to_text(self))
            else:
                msg = "Cannot meet dependency requirement '%s:%s' for collection %s" \
                      % (to_text(self), requirement, parent)

            collection_source = to_text(self.b_path, nonstring='passthru') or self.api.api_server
            req_by = "\n".join(
                "\t%s - '%s:%s'" % (to_text(p) if p else 'base', to_text(self), r)
                for p, r in self.required_by
            )

            versions = ", ".join(sorted(self.versions, key=LooseVersion))
            raise AnsibleError(
                "%s from source '%s'. Available versions before last requirement added: %s\nRequirements from:\n%s"
                % (msg, collection_source, versions, req_by)
            )

        self.versions = new_versions

    def install(self, path, b_temp_path):
        if self.skip:
            display.display("Skipping '%s' as it is already installed" % to_text(self))
            return

        # Not already installed, or force was set, so perform the install
        collection_path = os.path.join(path, self.namespace, self.name)
        b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
        display.display("Installing '%s:%s' to '%s'" % (to_text(self), self.latest_version, collection_path))

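        # No local tarball was supplied, so download the artifact from the Galaxy server,
        # verifying it against the sha256 recorded in the collection's metadata.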
        if self.b_path is None:
            download_url = self._metadata.download_url
            artifact_hash = self._metadata.artifact_sha256
            headers = {}
            self.api._add_auth_token(headers, download_url, required=False)

            self.b_path = _download_file(download_url, b_temp_path, artifact_hash, self.api.validate_certs,
                                         headers=headers)

        if os.path.exists(b_collection_path):
            shutil.rmtree(b_collection_path)
        os.makedirs(b_collection_path)

        with tarfile.open(self.b_path, mode='r') as collection_tar:
            files_member_obj = collection_tar.getmember('FILES.json')
            with _tarfile_extract(collection_tar, files_member_obj) as files_obj:
                files = json.loads(to_text(files_obj.read(), errors='surrogate_or_strict'))

            _extract_tar_file(collection_tar, 'MANIFEST.json', b_collection_path, b_temp_path)
            _extract_tar_file(collection_tar, 'FILES.json', b_collection_path, b_temp_path)

            for file_info in files['files']:
                file_name = file_info['name']
                if file_name == '.':
                    continue

                if file_info['ftype'] == 'file':
                    _extract_tar_file(collection_tar, file_name, b_collection_path, b_temp_path,
                                      expected_hash=file_info['chksum_sha256'])
                else:
                    os.makedirs(os.path.join(b_collection_path, to_bytes(file_name, errors='surrogate_or_strict')))

    def set_latest_version(self):
        self.versions = set([self.latest_version])
        self._get_metadata()

    def _get_metadata(self):
        if self._metadata:
            return
        self._metadata = self.api.get_collection_version_metadata(self.namespace, self.name, self.latest_version)

    def _meets_requirements(self, version, requirements, parent):
        """
        Supports version identifiers can be '==', '!=', '>', '>=', '<', '<=', '*'. Each requirement is delimited by ','
        """
        op_map = {
            '!=': operator.ne,
            '==': operator.eq,
            '=': operator.eq,
            '>=': operator.ge,
            '>': operator.gt,
            '<=': operator.le,
            '<': operator.lt,
        }

        for req in list(requirements.split(',')):
            op_pos = 2 if len(req) > 1 and req[1] == '=' else 1
            op = op_map.get(req[:op_pos])

            requirement = req[op_pos:]
            if not op:
                requirement = req
                op = operator.eq

                # When checking a new requirement against a base requirement (parent != None) we can't accept
                # a version of '*' (unknown version) unless the requirement is also '*'.
                if parent and version == '*' and requirement != '*':
                    break
                elif requirement == '*' or version == '*':
                    continue

            if not op(LooseVersion(version), LooseVersion(requirement)):
                break
        else:
            return True

        # The loop was broken early, so the version does not meet all the requirements
        return False

    @staticmethod
    def from_tar(b_path, force, parent=None):
        if not tarfile.is_tarfile(b_path):
            raise AnsibleError("Collection artifact at '%s' is not a valid tar file." % to_native(b_path))

        info = {}
        with tarfile.open(b_path, mode='r') as collection_tar:
            for b_member_name, property_name in CollectionRequirement._FILE_MAPPING:
                n_member_name = to_native(b_member_name)
                try:
                    member = collection_tar.getmember(n_member_name)
                except KeyError:
                    raise AnsibleError("Collection at '%s' does not contain the required file %s."
                                       % (to_native(b_path), n_member_name))

                with _tarfile_extract(collection_tar, member) as member_obj:
                    try:
                        info[property_name] = json.loads(to_text(member_obj.read(), errors='surrogate_or_strict'))
                    except ValueError:
                        raise AnsibleError("Collection tar file member %s does not contain a valid json string."
                                           % n_member_name)

        meta = info['manifest_file']['collection_info']
        files = info['files_file']['files']

        namespace = meta['namespace']
        name = meta['name']
        version = meta['version']
        meta = CollectionVersionMetadata(namespace, name, version, None, None, meta['dependencies'])

        return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
                                     metadata=meta, files=files)

    @staticmethod
    def from_path(b_path, force, parent=None):
        info = {}
        for b_file_name, property_name in CollectionRequirement._FILE_MAPPING:
            b_file_path = os.path.join(b_path, b_file_name)
            if not os.path.exists(b_file_path):
                continue

            with open(b_file_path, 'rb') as file_obj:
                try:
                    info[property_name] = json.loads(to_text(file_obj.read(), errors='surrogate_or_strict'))
                except ValueError:
                    raise AnsibleError("Collection file at '%s' does not contain a valid json string."
                                       % to_native(b_file_path))

        if 'manifest_file' in info:
            manifest = info['manifest_file']['collection_info']
            namespace = manifest['namespace']
            name = manifest['name']
            version = manifest['version']
            dependencies = manifest['dependencies']
        else:
            display.warning("Collection at '%s' does not have a MANIFEST.json file, cannot detect version."
                            % to_text(b_path))
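            # Fall back to deriving the namespace and name from the on-disk layout:
            # .../<namespace>/<name>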
            parent_dir, name = os.path.split(to_text(b_path, errors='surrogate_or_strict'))
            namespace = os.path.split(parent_dir)[1]

            version = '*'
            dependencies = {}

        meta = CollectionVersionMetadata(namespace, name, version, None, None, dependencies)

        files = info.get('files_file', {}).get('files', {})

        return CollectionRequirement(namespace, name, b_path, None, [version], version, force, parent=parent,
                                     metadata=meta, files=files, skip=True)

    @staticmethod
    def from_name(collection, apis, requirement, force, parent=None):
        namespace, name = collection.split('.', 1)
        galaxy_meta = None

        for api in apis:
            try:
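                # An exact requirement ('1.2.3' or '=1.2.3') can be resolved with a single
                # metadata lookup; '*', ranges, and exclusions need the full version list.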
                if not (requirement == '*' or requirement.startswith('<') or requirement.startswith('>') or
                        requirement.startswith('!=')):
                    if requirement.startswith('='):
                        requirement = requirement.lstrip('=')

                    resp = api.get_collection_version_metadata(namespace, name, requirement)

                    galaxy_meta = resp
                    versions = [resp.version]
                else:
                    resp = api.get_collection_versions(namespace, name)

                    # Galaxy supports semver but ansible-galaxy does not. We ignore any versions that don't match
                    # StrictVersion (x.y.z) and only support pre-releases if an explicit version was set (done above).
                    versions = [v for v in resp if StrictVersion.version_re.match(v)]
            except GalaxyError as err:
                if err.http_code == 404:
                    display.vvv("Collection '%s' is not available from server %s %s"
                                % (collection, api.name, api.api_server))
                    continue
                raise

            display.vvv("Collection '%s' obtained from server %s %s" % (collection, api.name, api.api_server))
            break
        else:
            raise AnsibleError("Failed to find collection %s:%s" % (collection, requirement))

        req = CollectionRequirement(namespace, name, None, api, versions, requirement, force, parent=parent,
                                    metadata=galaxy_meta)
        return req


def build_collection(collection_path, output_path, force):
    """
    Creates the Ansible collection artifact in a .tar.gz file.

    :param collection_path: The path to the collection to build. This should be the directory that contains the
        galaxy.yml file.
    :param output_path: The path to create the collection build artifact. This should be a directory.
    :param force: Whether to overwrite an existing collection build artifact or fail.
    :return: The path to the collection build artifact.
    """
    b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
    b_galaxy_path = os.path.join(b_collection_path, b'galaxy.yml')
    if not os.path.exists(b_galaxy_path):
        raise AnsibleError("The collection galaxy.yml path '%s' does not exist." % to_native(b_galaxy_path))

    collection_meta = _get_galaxy_yml(b_galaxy_path)
    file_manifest = _build_files_manifest(b_collection_path, collection_meta['namespace'], collection_meta['name'])
    collection_manifest = _build_manifest(**collection_meta)

    collection_output = os.path.join(output_path, "%s-%s-%s.tar.gz" % (collection_meta['namespace'],
                                                                       collection_meta['name'],
                                                                       collection_meta['version']))

    b_collection_output = to_bytes(collection_output, errors='surrogate_or_strict')
    if os.path.exists(b_collection_output):
        if os.path.isdir(b_collection_output):
            raise AnsibleError("The output collection artifact '%s' already exists, "
                               "but is a directory - aborting" % to_native(collection_output))
        elif not force:
            raise AnsibleError("The file '%s' already exists. You can use --force to re-create "
                               "the collection artifact." % to_native(collection_output))

    _build_collection_tar(b_collection_path, b_collection_output, collection_manifest, file_manifest)

    return collection_output


def publish_collection(collection_path, api, wait, timeout):
    """
    Publish an Ansible collection tarball into an Ansible Galaxy server.

    :param collection_path: The path to the collection tarball to publish.
    :param api: A GalaxyAPI to publish the collection to.
    :param wait: Whether to wait until the import process is complete.
    :param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite.
    """
    import_uri = api.publish_collection(collection_path)

    if wait:
        # Galaxy returns a url fragment which differs between v2 and v3. The last non-empty path
        # segment is always the task_id, though.
        # v2: {"task": "https://galaxy-dev.ansible.com/api/v2/collection-imports/35573/"}
        # v3: {"task": "/api/automation-hub/v3/imports/collections/838d1308-a8f4-402c-95cb-7823f3806cd8/"}
        task_id = None
        for path_segment in reversed(import_uri.split('/')):
            if path_segment:
                task_id = path_segment
                break

        if not task_id:
            raise AnsibleError("Publishing the collection did not return valid task info. Cannot wait for task status. Returned task info: '%s'" % import_uri)

        display.display("Collection has been published to the Galaxy server %s %s" % (api.name, api.api_server))
        with _display_progress():
            api.wait_import_task(task_id, timeout)
        display.display("Collection has been successfully published and imported to the Galaxy server %s %s"
                        % (api.name, api.api_server))
    else:
        display.display("Collection has been pushed to the Galaxy server %s %s, not waiting until import has "
                        "completed due to --no-wait being set. Import task results can be found at %s"
                        % (api.name, api.api_server, import_uri))


def install_collections(collections, output_path, apis, validate_certs, ignore_errors, no_deps, force, force_deps):
    """
    Install Ansible collections to the path specified.

    :param collections: The collections to install, should be a list of tuples with (name, requirement, Galaxy server).
    :param output_path: The path to install the collections to.
    :param apis: A list of GalaxyAPIs to query when searching for a collection.
    :param validate_certs: Whether to validate the certificates if downloading a tarball.
    :param ignore_errors: Whether to ignore any errors when installing the collection.
    :param no_deps: Ignore any collection dependencies and only install the base requirements.
    :param force: Re-install a collection if it has already been installed.
    :param force_deps: Re-install a collection as well as its dependencies if they have already been installed.
    """
    existing_collections = _find_existing_collections(output_path)

    with _tempdir() as b_temp_path:
        display.display("Process install dependency map")
        with _display_progress():
            dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis,
                                                   validate_certs, force, force_deps, no_deps)

        display.display("Starting collection install process")
        with _display_progress():
            for collection in dependency_map.values():
                try:
                    collection.install(output_path, b_temp_path)
                except AnsibleError as err:
                    if ignore_errors:
                        display.warning("Failed to install collection %s but skipping due to --ignore-errors being set. "
                                        "Error: %s" % (to_text(collection), to_text(err)))
                    else:
                        raise


def validate_collection_name(name):
    """
    Validates that a collection name, as entered by the user or read from a requirements file,
    is in the expected <namespace>.<collection> format, with an optional version range specifier
    appended after a ':' (e.g. 'my_namespace.my_collection:>=1.0.0').

    :param name: The input name with optional range specifier split by ':'.
    :return: The input value, required for argparse validation.
    """
    collection, dummy, dummy = name.partition(':')
    if AnsibleCollectionRef.is_valid_collection_name(collection):
        return name

    raise AnsibleError("Invalid collection name '%s', name must be in the format <namespace>.<collection>." % name)


@contextmanager
def _tempdir():
    b_temp_path = tempfile.mkdtemp(dir=to_bytes(C.DEFAULT_LOCAL_TMP, errors='surrogate_or_strict'))
    try:
        yield b_temp_path
    finally:
        # Remove the temp dir even if the caller raised so it is never left behind
        shutil.rmtree(b_temp_path)


@contextmanager
def _tarfile_extract(tar, member):
    tar_obj = tar.extractfile(member)
    try:
        yield tar_obj
    finally:
        tar_obj.close()


@contextmanager
def _display_progress():
    def progress(display_queue, actual_display):
        actual_display.debug("Starting display_progress display thread")
        t = threading.current_thread()

        while True:
            for c in "|/-\\":
                actual_display.display(c + "\b", newline=False)
                time.sleep(0.1)

                # Display a message from the main thread
                while True:
                    try:
                        method, args, kwargs = display_queue.get(block=False, timeout=0.1)
                    except queue.Empty:
                        break
                    else:
                        func = getattr(actual_display, method)
                        func(*args, **kwargs)

                if getattr(t, "finish", False):
                    actual_display.debug("Received end signal for display_progress display thread")
                    return

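    # Proxy for the global display object: every attribute access returns a callable that
    # enqueues the call so the progress thread can replay it on the real display.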
    class DisplayThread(object):

        def __init__(self, display_queue):
            self.display_queue = display_queue

        def __getattr__(self, attr):
            def call_display(*args, **kwargs):
                self.display_queue.put((attr, args, kwargs))

            return call_display

    # Temporarily override the global display with a proxy which adds the calls to a queue for the thread to replay.
    global display
    old_display = display
    try:
        display_queue = queue.Queue()
        display = DisplayThread(display_queue)
        t = threading.Thread(target=progress, args=(display_queue, old_display))
        t.daemon = True
        t.start()

        try:
            yield
        finally:
            t.finish = True
            t.join()
    except Exception:
        # The exception is re-raised so we can be sure the thread is finished and not using the display anymore
        raise
    finally:
        display = old_display


def _get_galaxy_yml(b_galaxy_yml_path):
    meta_info = get_collections_galaxy_meta_info()

    mandatory_keys = set()
    string_keys = set()
    list_keys = set()
    dict_keys = set()

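    # Bucket each documented galaxy.yml key by its expected type so the values can be
    # validated and defaulted below.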
    for info in meta_info:
        if info.get('required', False):
            mandatory_keys.add(info['key'])

        key_list_type = {
            'str': string_keys,
            'list': list_keys,
            'dict': dict_keys,
        }[info.get('type', 'str')]
        key_list_type.add(info['key'])

    all_keys = frozenset(list(mandatory_keys) + list(string_keys) + list(list_keys) + list(dict_keys))

    try:
        with open(b_galaxy_yml_path, 'rb') as g_yaml:
            galaxy_yml = yaml.safe_load(g_yaml)
    except YAMLError as err:
        raise AnsibleError("Failed to parse the galaxy.yml at '%s' with the following error:\n%s"
                           % (to_native(b_galaxy_yml_path), to_native(err)))

    set_keys = set(galaxy_yml.keys())
    missing_keys = mandatory_keys.difference(set_keys)
    if missing_keys:
        raise AnsibleError("The collection galaxy.yml at '%s' is missing the following mandatory keys: %s"
                           % (to_native(b_galaxy_yml_path), ", ".join(sorted(missing_keys))))

    extra_keys = set_keys.difference(all_keys)
    if len(extra_keys) > 0:
        display.warning("Found unknown keys in collection galaxy.yml at '%s': %s"
                        % (to_text(b_galaxy_yml_path), ", ".join(extra_keys)))

    # Add the defaults if they have not been set
    for optional_string in string_keys:
        if optional_string not in galaxy_yml:
            galaxy_yml[optional_string] = None

    for optional_list in list_keys:
        list_val = galaxy_yml.get(optional_list, None)

        if list_val is None:
            galaxy_yml[optional_list] = []
        elif not isinstance(list_val, list):
            galaxy_yml[optional_list] = [list_val]

    for optional_dict in dict_keys:
        if optional_dict not in galaxy_yml:
            galaxy_yml[optional_dict] = {}

    # license is a builtin var in Python, to avoid confusion we just rename it to license_ids
    galaxy_yml['license_ids'] = galaxy_yml['license']
    del galaxy_yml['license']

    return galaxy_yml


def _build_files_manifest(b_collection_path, namespace, name):
    # Tuples of (b_pattern, root_only) where root_only means the pattern is only ignored in the root dir
    b_ignore_files = frozenset([(b'*.pyc', False), (b'*.retry', False),
                                (to_bytes('{0}-{1}-*.tar.gz'.format(namespace, name)), True)])
    b_ignore_dirs = frozenset([(b'CVS', False), (b'.bzr', False), (b'.hg', False), (b'.git', False), (b'.svn', False),
                               (b'__pycache__', False), (b'.tox', False)])

    entry_template = {
        'name': None,
        'ftype': None,
        'chksum_type': None,
        'chksum_sha256': None,
        'format': MANIFEST_FORMAT
    }
    manifest = {
        'files': [
            {
                'name': '.',
                'ftype': 'dir',
                'chksum_type': None,
                'chksum_sha256': None,
                'format': MANIFEST_FORMAT,
            },
        ],
        'format': MANIFEST_FORMAT,
    }

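    # Recursively walk the collection tree, appending one manifest entry per file or
    # directory that survives the ignore rules above.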
    def _walk(b_path, b_top_level_dir):
        is_root = b_path == b_top_level_dir

        for b_item in os.listdir(b_path):
            b_abs_path = os.path.join(b_path, b_item)
            b_rel_base_dir = b'' if b_path == b_top_level_dir else b_path[len(b_top_level_dir) + 1:]
            rel_path = to_text(os.path.join(b_rel_base_dir, b_item), errors='surrogate_or_strict')

            if os.path.isdir(b_abs_path):
                if any(b_item == b_dir_name for b_dir_name, root_only in b_ignore_dirs
                       if not root_only or root_only == is_root):
                    display.vvv("Skipping '%s' for collection build" % to_text(b_abs_path))
                    continue

                if os.path.islink(b_abs_path):
                    b_link_target = os.path.realpath(b_abs_path)

                    if not b_link_target.startswith(b_top_level_dir):
                        display.warning("Skipping '%s' as it is a symbolic link to a directory outside the collection"
                                        % to_text(b_abs_path))
                        continue

                manifest_entry = entry_template.copy()
                manifest_entry['name'] = rel_path
                manifest_entry['ftype'] = 'dir'

                manifest['files'].append(manifest_entry)

                _walk(b_abs_path, b_top_level_dir)
            else:
                if b_item == b'galaxy.yml':
                    continue
                elif any(fnmatch.fnmatch(b_item, b_pattern) for b_pattern, root_only in b_ignore_files
                         if not root_only or root_only == is_root):
                    display.vvv("Skipping '%s' for collection build" % to_text(b_abs_path))
                    continue

                manifest_entry = entry_template.copy()
                manifest_entry['name'] = rel_path
                manifest_entry['ftype'] = 'file'
                manifest_entry['chksum_type'] = 'sha256'
                manifest_entry['chksum_sha256'] = secure_hash(b_abs_path, hash_func=sha256)

                manifest['files'].append(manifest_entry)

    _walk(b_collection_path, b_collection_path)

    return manifest


def _build_manifest(namespace, name, version, authors, readme, tags, description, license_ids, license_file,
                    dependencies, repository, documentation, homepage, issues, **kwargs):
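    # Any extra galaxy.yml keys arrive via **kwargs and are intentionally dropped as they
    # have no corresponding MANIFEST.json field.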

    manifest = {
        'collection_info': {
            'namespace': namespace,
            'name': name,
            'version': version,
            'authors': authors,
            'readme': readme,
            'tags': tags,
            'description': description,
            'license': license_ids,
            'license_file': license_file if license_file else None,  # Handle galaxy.yml having an empty string (None)
            'dependencies': dependencies,
            'repository': repository,
            'documentation': documentation,
            'homepage': homepage,
            'issues': issues,
        },
        'file_manifest_file': {
            'name': 'FILES.json',
            'ftype': 'file',
            'chksum_type': 'sha256',
            'chksum_sha256': None,  # Filled out in _build_collection_tar
            'format': MANIFEST_FORMAT
        },
        'format': MANIFEST_FORMAT,
    }

    return manifest


def _build_collection_tar(b_collection_path, b_tar_path, collection_manifest, file_manifest):
    files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict')
    collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256)
    collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict')

    with _tempdir() as b_temp_path:
        b_tar_filepath = os.path.join(b_temp_path, os.path.basename(b_tar_path))

        with tarfile.open(b_tar_filepath, mode='w:gz') as tar_file:
            # Add the MANIFEST.json and FILES.json file to the archive
            for name, b in [('MANIFEST.json', collection_manifest_json), ('FILES.json', files_manifest_json)]:
                b_io = BytesIO(b)
                tar_info = tarfile.TarInfo(name)
                tar_info.size = len(b)
                tar_info.mtime = time.time()
                tar_info.mode = 0o0644
                tar_file.addfile(tarinfo=tar_info, fileobj=b_io)

            for file_info in file_manifest['files']:
                if file_info['name'] == '.':
                    continue

                # arcname expects a native string, cannot be bytes
                filename = to_native(file_info['name'], errors='surrogate_or_strict')
                b_src_path = os.path.join(b_collection_path, to_bytes(filename, errors='surrogate_or_strict'))

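                # Normalize ownership and permissions so the resulting artifact does not leak
                # build-host specific uid/gid or mode bits.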
                def reset_stat(tarinfo):
                    tarinfo.mode = 0o0755 if tarinfo.isdir() else 0o0644
                    tarinfo.uid = tarinfo.gid = 0
                    tarinfo.uname = tarinfo.gname = ''
                    return tarinfo

                tar_file.add(os.path.realpath(b_src_path), arcname=filename, recursive=False, filter=reset_stat)

        shutil.copy(b_tar_filepath, b_tar_path)
        collection_name = "%s.%s" % (collection_manifest['collection_info']['namespace'],
                                     collection_manifest['collection_info']['name'])
        display.display('Created collection for %s at %s' % (collection_name, to_text(b_tar_path)))


def _find_existing_collections(path):
    collections = []

    b_path = to_bytes(path, errors='surrogate_or_strict')
    for b_namespace in os.listdir(b_path):
        b_namespace_path = os.path.join(b_path, b_namespace)
        if os.path.isfile(b_namespace_path):
            continue

        for b_collection in os.listdir(b_namespace_path):
            b_collection_path = os.path.join(b_namespace_path, b_collection)
            if os.path.isdir(b_collection_path):
                req = CollectionRequirement.from_path(b_collection_path, False)
                display.vvv("Found installed collection %s:%s at '%s'" % (to_text(req), req.latest_version,
                                                                          to_text(b_collection_path)))
                collections.append(req)

    return collections


def _build_dependency_map(collections, existing_collections, b_temp_path, apis, validate_certs, force, force_deps,
                          no_deps):
    dependency_map = {}

    # First build the dependency map on the actual requirements
    for name, version, source in collections:
        _get_collection_info(dependency_map, existing_collections, name, version, source, b_temp_path, apis,
                             validate_certs, (force or force_deps))

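    # Iterate to a fixed point: keep expanding dependencies until every collection in the
    # map has been checked. Collections marked skip are already installed and act as leaves.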
    checked_parents = set([to_text(c) for c in dependency_map.values() if c.skip])
    while len(dependency_map) != len(checked_parents):
        while not no_deps:  # Only parse dependencies if no_deps was not set
            parents_to_check = set(dependency_map.keys()).difference(checked_parents)

            deps_exhausted = True
            for parent in parents_to_check:
                parent_info = dependency_map[parent]

                if parent_info.dependencies:
                    deps_exhausted = False
                    for dep_name, dep_requirement in parent_info.dependencies.items():
                        _get_collection_info(dependency_map, existing_collections, dep_name, dep_requirement,
                                             parent_info.api, b_temp_path, apis, validate_certs, force_deps,
                                             parent=parent)

                    checked_parents.add(parent)

            # No extra dependencies were resolved, exit loop
            if deps_exhausted:
                break

        # Now we have resolved the deps to our best extent, now select the latest version for collections with
        # multiple versions found and go from there
        deps_not_checked = set(dependency_map.keys()).difference(checked_parents)
        for collection in deps_not_checked:
            dependency_map[collection].set_latest_version()
            if no_deps or len(dependency_map[collection].dependencies) == 0:
                checked_parents.add(collection)

    return dependency_map


def _get_collection_info(dep_map, existing_collections, collection, requirement, source, b_temp_path, apis,
                         validate_certs, force, parent=None):
    dep_msg = ""
    if parent:
        dep_msg = " - as dependency of %s" % parent
    display.vvv("Processing requirement collection '%s'%s" % (to_text(collection), dep_msg))

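    # A requirement can be a path to a local tarball, a URL to a tarball, or a
    # '<namespace>.<name>' string to resolve against the Galaxy servers.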
    b_tar_path = None
    if os.path.isfile(to_bytes(collection, errors='surrogate_or_strict')):
        display.vvvv("Collection requirement '%s' is a tar artifact" % to_text(collection))
        b_tar_path = to_bytes(collection, errors='surrogate_or_strict')
    elif urlparse(collection).scheme:
        display.vvvv("Collection requirement '%s' is a URL to a tar artifact" % collection)
        b_tar_path = _download_file(collection, b_temp_path, None, validate_certs)

    if b_tar_path:
        req = CollectionRequirement.from_tar(b_tar_path, force, parent=parent)

        collection_name = to_text(req)
        if collection_name in dep_map:
            collection_info = dep_map[collection_name]
            collection_info.add_requirement(None, req.latest_version)
        else:
            collection_info = req
    else:
        validate_collection_name(collection)

        display.vvvv("Collection requirement '%s' is the name of a collection" % collection)
        if collection in dep_map:
            collection_info = dep_map[collection]
            collection_info.add_requirement(parent, requirement)
        else:
            apis = [source] if source else apis
            collection_info = CollectionRequirement.from_name(collection, apis, requirement, force, parent=parent)

    existing = [c for c in existing_collections if to_text(c) == to_text(collection_info)]
    if existing and not collection_info.force:
        # Test that the installed collection fits the requirement
        existing[0].add_requirement(to_text(collection_info), requirement)
        collection_info = existing[0]

    dep_map[to_text(collection_info)] = collection_info


def _download_file(url, b_path, expected_hash, validate_certs, headers=None):
    bufsize = 65536
    digest = sha256()

    urlsplit = os.path.splitext(to_text(url.rsplit('/', 1)[1]))
    b_file_name = to_bytes(urlsplit[0], errors='surrogate_or_strict')
    b_file_ext = to_bytes(urlsplit[1], errors='surrogate_or_strict')
    b_file_path = tempfile.NamedTemporaryFile(dir=b_path, prefix=b_file_name, suffix=b_file_ext, delete=False).name

    display.vvv("Downloading %s to %s" % (url, to_text(b_path)))
    # Galaxy redirects downloads to S3, which rejects the request if an Authorization header is attached,
    # so don't forward that header on the redirect
    resp = open_url(to_native(url, errors='surrogate_or_strict'), validate_certs=validate_certs, headers=headers,
                    unredirected_headers=['Authorization'])

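    # Stream the response to disk in 64 KiB chunks, updating the sha256 digest as we go so
    # the artifact can be verified without re-reading the file.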
    with open(b_file_path, 'wb') as download_file:
        data = resp.read(bufsize)
        while data:
            digest.update(data)
            download_file.write(data)
            data = resp.read(bufsize)

    if expected_hash:
        actual_hash = digest.hexdigest()
        display.vvvv("Validating downloaded file hash %s with expected hash %s" % (actual_hash, expected_hash))
        if expected_hash != actual_hash:
            raise AnsibleError("Mismatch artifact hash with downloaded file")

    return b_file_path


def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
    n_filename = to_native(filename, errors='surrogate_or_strict')
    try:
        member = tar.getmember(n_filename)
    except KeyError:
        raise AnsibleError("Collection tar at '%s' does not contain the expected file '%s'." % (to_native(tar.name),
                                                                                                n_filename))

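    # Extract to a temporary file and verify its checksum first so a corrupted member is
    # never moved into the destination collection directory.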
    with tempfile.NamedTemporaryFile(dir=b_temp_path, delete=False) as tmpfile_obj:
        bufsize = 65536
        sha256_digest = sha256()
        with _tarfile_extract(tar, member) as tar_obj:
            data = tar_obj.read(bufsize)
            while data:
                tmpfile_obj.write(data)
                tmpfile_obj.flush()
                sha256_digest.update(data)
                data = tar_obj.read(bufsize)

        actual_hash = sha256_digest.hexdigest()

        if expected_hash and actual_hash != expected_hash:
            raise AnsibleError("Checksum mismatch for '%s' inside collection at '%s'"
                               % (n_filename, to_native(tar.name)))

        b_dest_filepath = os.path.join(b_dest, to_bytes(filename, errors='surrogate_or_strict'))
        b_parent_dir = os.path.split(b_dest_filepath)[0]
        if not os.path.exists(b_parent_dir):
            # Seems like Galaxy does not validate that every file entry has a corresponding dir ftype entry. This
            # check makes sure we create the parent directory even if it wasn't set in the metadata.
            os.makedirs(b_parent_dir)

        shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath)