import threading from queue import PriorityQueue import json import ThreadPool from constants import constants from Logger import Logger from SpecData import SPECS class Scheduler(object): lock = threading.Lock() listOfAlreadyBuiltPackages = [] listOfPackagesToBuild = [] listOfPackagesCurrentlyBuilding = [] sortedList = [] listOfPackagesNextToBuild = PriorityQueue() listOfFailedPackages = [] alldependencyGraph = {} dependencyGraph = {} priorityMap = {} pkgWeights = {} isPriorityScheduler = 1 logger = None event = None stopScheduling = False @staticmethod def setEvent(event): Scheduler.event = event @staticmethod def setLog(logName, logPath): Scheduler.logger = Logger.getLogger(logName, logPath) @staticmethod def getBuildRequiredPackages(package): listRequiredRPMPackages = [] listRequiredRPMPackages.extend(SPECS.getData().getBuildRequiresForPackage(package)) listRequiredPackages = [] for pkg in listRequiredRPMPackages: basePkg = SPECS.getData().getSpecName(pkg) if basePkg not in listRequiredPackages: listRequiredPackages.append(basePkg) return listRequiredPackages @staticmethod def getDependencies(package, parentPackage, k): for node in list(Scheduler.alldependencyGraph[package].keys()): Scheduler.getDependencies(node, package, k) if parentPackage is None: return else: for node in Scheduler.alldependencyGraph[package].keys(): try: Scheduler.alldependencyGraph[parentPackage][node] = max( Scheduler.alldependencyGraph[parentPackage][node], Scheduler.alldependencyGraph[package][node] * k) except KeyError: Scheduler.alldependencyGraph[parentPackage][node] = \ Scheduler.alldependencyGraph[package][node] * k @staticmethod def makeGraph(): k = 3 for package in Scheduler.sortedList: for child_pkg in list(Scheduler.dependencyGraph[package].keys()): Scheduler.getDependencies(child_pkg, package, k) for node in list(Scheduler.alldependencyGraph[child_pkg].keys()): try: Scheduler.dependencyGraph[package][node] = max( Scheduler.dependencyGraph[package][node], Scheduler.alldependencyGraph[child_pkg][node] * k) except KeyError: Scheduler.dependencyGraph[package][node] = \ Scheduler.alldependencyGraph[child_pkg][node] * k if constants.publishBuildDependencies: dependencyLists = {} for package in list(Scheduler.dependencyGraph.keys()): dependencyLists[package] = [] for dependency in list(Scheduler.dependencyGraph[package].keys()): dependencyLists[package].append(dependency) graphfile = open(str(constants.logPath) + "/BuildDependencies.json", 'w') graphfile.write(json.dumps(dependencyLists, sort_keys=True, indent=4)) graphfile.close() @staticmethod def parseWeights(): Scheduler.pkgWeights.clear() weightFile = open(constants.packageWeightsPath, 'r') Scheduler.pkgWeights = json.load(weightFile) weightFile.close() @staticmethod def getWeight(package): try: return float(Scheduler.pkgWeights[package]) except KeyError: return 0 @staticmethod def setPriorities(): if constants.packageWeightsPath is None: Scheduler.logger.info("Priority Scheduler disabled") Scheduler.isPriorityScheduler = 0 else: Scheduler.parseWeights() for package in Scheduler.sortedList: Scheduler.dependencyGraph[package] = {} Scheduler.alldependencyGraph[package] = {} for child_package in Scheduler.getBuildRequiredPackages(package): Scheduler.dependencyGraph[package][child_package] = 1 for child_package in Scheduler.getRequiredPackages(package): Scheduler.alldependencyGraph[package][child_package] = 1 Scheduler.makeGraph() for package in Scheduler.sortedList: try: Scheduler.priorityMap[package] = Scheduler.getWeight(package) except KeyError: Scheduler.priorityMap[package] = 0 for child_pkg in Scheduler.dependencyGraph[package].keys(): Scheduler.priorityMap[child_pkg] = Scheduler.priorityMap[child_pkg] \ + (Scheduler.dependencyGraph[package][child_pkg] * (Scheduler.getWeight(package))) Scheduler.logger.info("set Priorities: Priority of all packages") Scheduler.logger.info(Scheduler.priorityMap) @staticmethod def setParams(sortedList, listOfAlreadyBuiltPackages): Scheduler.sortedList = sortedList Scheduler.listOfAlreadyBuiltPackages = listOfAlreadyBuiltPackages for x in Scheduler.sortedList: if x not in Scheduler.listOfAlreadyBuiltPackages or x in constants.testForceRPMS: Scheduler.listOfPackagesToBuild.append(x) Scheduler.listOfPackagesCurrentlyBuilding = [] Scheduler.listOfPackagesNextToBuild = [] Scheduler.listOfFailedPackages = [] Scheduler.setPriorities() @staticmethod def getRequiredPackages(package): listRequiredRPMPackages = [] listRequiredRPMPackages.extend(SPECS.getData().getBuildRequiresForPackage(package)) listRequiredRPMPackages.extend(SPECS.getData().getRequiresAllForPackage(package)) listRequiredPackages = [] for pkg in listRequiredRPMPackages: basePkg = SPECS.getData().getSpecName(pkg) if basePkg not in listRequiredPackages: listRequiredPackages.append(basePkg) return listRequiredPackages @staticmethod def __getListNextPackagesReadyToBuild(): listOfPackagesNextToBuild = PriorityQueue() Scheduler.logger.info("Checking for next possible packages to build") for pkg in Scheduler.listOfPackagesToBuild: if pkg in Scheduler.listOfPackagesCurrentlyBuilding: continue listRequiredPackages = Scheduler.getRequiredPackages(pkg) canBuild = True Scheduler.logger.info("Required packages for " + pkg + " are:") Scheduler.logger.info(listRequiredPackages) for reqPkg in listRequiredPackages: if reqPkg not in Scheduler.listOfAlreadyBuiltPackages: canBuild = False Scheduler.logger.info(reqPkg + " is not available. So we cannot build " + pkg + " at this moment.") break if canBuild: listOfPackagesNextToBuild.put((-Scheduler.priorityMap[pkg], pkg)) Scheduler.logger.info("Adding " + pkg + " to the schedule list") return listOfPackagesNextToBuild @staticmethod def getNextPackageToBuild(): Scheduler.logger.info("Waiting to acquire scheduler lock") Scheduler.lock.acquire() if Scheduler.stopScheduling: Scheduler.logger.info("Released scheduler lock") Scheduler.lock.release() return None if len(Scheduler.listOfPackagesToBuild) == 0: if Scheduler.event is not None: Scheduler.event.set() try: if Scheduler.listOfPackagesNextToBuild.qsize() == 0: listOfPackagesNextToBuild = Scheduler.__getListNextPackagesReadyToBuild() Scheduler.listOfPackagesNextToBuild = listOfPackagesNextToBuild except: if len(Scheduler.listOfPackagesNextToBuild) == 0: listOfPackagesNextToBuild = Scheduler.__getListNextPackagesReadyToBuild() Scheduler.listOfPackagesNextToBuild = listOfPackagesNextToBuild if Scheduler.listOfPackagesNextToBuild.qsize() == 0: Scheduler.logger.info("Released scheduler lock") Scheduler.lock.release() return None packageTup = Scheduler.listOfPackagesNextToBuild.get() if packageTup[0] == 0 and Scheduler.isPriorityScheduler == 1: listOfPackagesNextToBuild = Scheduler.__getListNextPackagesReadyToBuild() Scheduler.listOfPackagesNextToBuild = listOfPackagesNextToBuild if Scheduler.listOfPackagesNextToBuild.qsize() == 0: Scheduler.logger.info("Released scheduler lock") Scheduler.lock.release() return None packageTup = Scheduler.listOfPackagesNextToBuild.get() package = packageTup[1] Scheduler.logger.info("PackagesNextToBuild " + str(packageTup)) if Scheduler.listOfPackagesNextToBuild.qsize() > 0: ThreadPool.ThreadPool.activateWorkerThreads( Scheduler.listOfPackagesNextToBuild.qsize()) Scheduler.logger.info("Released scheduler lock") Scheduler.lock.release() Scheduler.listOfPackagesCurrentlyBuilding.append(package) Scheduler.listOfPackagesToBuild.remove(package) return package #can be synchronized TODO @staticmethod def notifyPackageBuildCompleted(package): if package in Scheduler.listOfPackagesCurrentlyBuilding: Scheduler.listOfPackagesCurrentlyBuilding.remove(package) Scheduler.listOfAlreadyBuiltPackages.append(package) #can be synchronized TODO @staticmethod def notifyPackageBuildFailed(package): if package in Scheduler.listOfPackagesCurrentlyBuilding: Scheduler.listOfPackagesCurrentlyBuilding.remove(package) Scheduler.listOfFailedPackages.append(package) @staticmethod def isAllPackagesBuilt(): if len(Scheduler.listOfPackagesToBuild) == 0: return True return False @staticmethod def isAnyPackagesFailedToBuild(): if len(Scheduler.listOfFailedPackages) != 0: return True return False