support/package-builder/Scheduler.py
87815216
 import threading
 from queue import PriorityQueue
620867de
 import json
b37b4c63
 import ThreadPool
d024e640
 from constants import constants
 from Logger import Logger
45c9260c
 from SpecData import SPECS
d024e640
 
 class Scheduler(object):
87815216
 
     # All state is kept on the class and every method is a @staticmethod,
     # so there is one scheduler shared by every caller in the process.

     # Held by getNextPackageToBuild() while it picks a package.
     lock = threading.Lock()
     # Assigned by setParams(); notifyPackageBuildCompleted() appends to it.
     listOfAlreadyBuiltPackages = []
     # Filled by setParams(); getNextPackageToBuild() removes what it hands out.
     listOfPackagesToBuild = []
     # Packages handed out and not yet reported as completed or failed.
     listOfPackagesCurrentlyBuilding = []
     # Package list passed to setParams(); iterated when building the graphs.
     sortedList = []
     # Queue of (-priority, package) tuples that are ready to build.
     listOfPackagesNextToBuild = PriorityQueue()
     # notifyPackageBuildFailed() appends to it.
     listOfFailedPackages = []
620867de
     # package -> {required package: weight}, seeded from getRequiredPackages().
     alldependencyGraph = {}
     # package -> {required package: weight}, seeded from getBuildRequiredPackages().
     dependencyGraph = {}
     # package -> priority computed by setPriorities().
     priorityMap = {}
87815216
     # package -> weight, loaded by parseWeights() from constants.packageWeightsPath.
     pkgWeights = {}
     # Set to 0 by setPriorities() when constants.packageWeightsPath is None.
     isPriorityScheduler = 1
     # Assigned by setLog().
     logger = None
     # Assigned by setEvent(); set() is called on it once nothing is left to build.
     event = None
     # When True, getNextPackageToBuild() returns None immediately.
     stopScheduling = False
 
d024e640
     @staticmethod
     def setEvent(event):
87815216
         Scheduler.event = event
 
d024e640
     @staticmethod
     def setLog(logName, logPath):
         """Install the logger returned by ``Logger.getLogger(logName, logPath)``."""
         schedulerLogger = Logger.getLogger(logName, logPath)
         Scheduler.logger = schedulerLogger
620867de
 
     @staticmethod
     def getBuildRequiredPackages(package):
         """Return the spec names of the build-time requirements of *package*.

         Every entry reported by ``getBuildRequiresForPackage`` is mapped
         through ``getSpecName``; duplicates are dropped and the order of
         first appearance is kept.
         """
         buildRequires = list(SPECS.getData().getBuildRequiresForPackage(package))

         specNames = []
         for rpmName in buildRequires:
             specName = SPECS.getData().getSpecName(rpmName)
             if specName in specNames:
                 continue
             specNames.append(specName)

         return specNames
 
 
     @staticmethod
     def getDependencies(package, parentPackage, k):
 
87815216
         for node in list(Scheduler.alldependencyGraph[package].keys()):
             Scheduler.getDependencies(node, package, k)
620867de
 
87815216
         if parentPackage is None:
620867de
             return
         else:
             for node in Scheduler.alldependencyGraph[package].keys():
                 try:
                     Scheduler.alldependencyGraph[parentPackage][node] = max(
                         Scheduler.alldependencyGraph[parentPackage][node],
                         Scheduler.alldependencyGraph[package][node] * k)
                 except KeyError:
                     Scheduler.alldependencyGraph[parentPackage][node] = \
                         Scheduler.alldependencyGraph[package][node] * k
 
     @staticmethod
     def makeGraph():
         """Expand dependencyGraph with indirect dependencies.

         For every package in sortedList and each of its direct build
         dependencies, the dependency's own requirements (resolved through
         getDependencies) are merged into ``dependencyGraph[package]`` with
         their weight multiplied by ``k``; an existing edge keeps the larger
         weight.

         When ``constants.publishBuildDependencies`` is set, the resulting
         package -> dependency-name lists are written as JSON to
         ``<constants.logPath>/BuildDependencies.json``.
         """
         k = 3
         for package in Scheduler.sortedList:
             buildDeps = Scheduler.dependencyGraph[package]
             # Snapshot the keys: buildDeps grows while it is being walked.
             for child_pkg in list(buildDeps.keys()):
                 Scheduler.getDependencies(child_pkg, package, k)
                 for node in list(Scheduler.alldependencyGraph[child_pkg].keys()):
                     scaled = Scheduler.alldependencyGraph[child_pkg][node] * k
                     buildDeps[node] = max(buildDeps.get(node, scaled), scaled)

         if constants.publishBuildDependencies:
             dependencyLists = {
                 package: list(dependencies.keys())
                 for package, dependencies in Scheduler.dependencyGraph.items()
             }
             # "with" guarantees the file is closed even if serialisation fails.
             with open(str(constants.logPath) + "/BuildDependencies.json", 'w') as graphfile:
                 graphfile.write(json.dumps(dependencyLists, sort_keys=True, indent=4))
620867de
 
     @staticmethod
     def parseWeights():
         """Load the package weights from ``constants.packageWeightsPath``.

         The file is parsed as JSON and the result replaces
         ``Scheduler.pkgWeights``. The previous mapping is emptied first, as
         before, so anything still holding a reference to it sees it cleared.
         """
         Scheduler.pkgWeights.clear()
         # "with" closes the file even when json.load() raises.
         with open(constants.packageWeightsPath, 'r') as weightFile:
             Scheduler.pkgWeights = json.load(weightFile)
620867de
 
     @staticmethod
     def getWeight(package):
         try:
             return float(Scheduler.pkgWeights[package])
         except KeyError:
             return 0
 
     @staticmethod
     def setPriorities():
         """Build the dependency graphs and compute every package's priority.

         Weights are loaded through parseWeights() unless
         ``constants.packageWeightsPath`` is None, in which case priority
         scheduling is switched off (``isPriorityScheduler = 0``).

         A package's priority is its own weight plus, for every package that
         build-depends on it, that package's weight multiplied by the edge
         weight stored in dependencyGraph.
         """
         if constants.packageWeightsPath is None:
             Scheduler.logger.info("Priority Scheduler disabled")
             Scheduler.isPriorityScheduler = 0
         else:
             Scheduler.parseWeights()

         for package in Scheduler.sortedList:
             Scheduler.dependencyGraph[package] = {}
             Scheduler.alldependencyGraph[package] = {}
             for child_package in Scheduler.getBuildRequiredPackages(package):
                 Scheduler.dependencyGraph[package][child_package] = 1
             for child_package in Scheduler.getRequiredPackages(package):
                 Scheduler.alldependencyGraph[package][child_package] = 1
         Scheduler.makeGraph()

         # Seed every package with its own weight before accumulating, so the
         # result does not depend on dependencies preceding their dependents
         # in sortedList (getWeight already maps unknown packages to 0).
         for package in Scheduler.sortedList:
             Scheduler.priorityMap[package] = Scheduler.getWeight(package)

         for package in Scheduler.sortedList:
             weight = Scheduler.getWeight(package)
             for child_pkg in Scheduler.dependencyGraph[package].keys():
                 Scheduler.priorityMap[child_pkg] = \
                     Scheduler.priorityMap.get(child_pkg, 0) \
                     + (Scheduler.dependencyGraph[package][child_pkg] * weight)

         Scheduler.logger.info("set Priorities: Priority of all packages")
         Scheduler.logger.info(Scheduler.priorityMap)
 
510e37f4
 
d024e640
     @staticmethod
     def setParams(sortedList, listOfAlreadyBuiltPackages):
         """Initialise the scheduler for a build run.

         sortedList: packages to consider; iterated in the given order.
         listOfAlreadyBuiltPackages: packages that need no build. The list is
             stored as-is (not copied) and later appended to by
             notifyPackageBuildCompleted().

         Every package of sortedList that is not already built, or that is
         listed in ``constants.testForceRPMS``, is appended to
         listOfPackagesToBuild. Finally the priorities are computed.
         """
         Scheduler.sortedList = sortedList
         Scheduler.listOfAlreadyBuiltPackages = listOfAlreadyBuiltPackages
         # NOTE(review): listOfPackagesToBuild is appended to, not reset, so
         # leftovers from an earlier call stay queued - confirm this is intended.
         for x in Scheduler.sortedList:
             if x not in Scheduler.listOfAlreadyBuiltPackages or x in constants.testForceRPMS:
                 Scheduler.listOfPackagesToBuild.append(x)
         Scheduler.listOfPackagesCurrentlyBuilding = []
         # Must be a PriorityQueue like the class-level default: consumers
         # call qsize()/get() on it, which a plain list does not provide.
         Scheduler.listOfPackagesNextToBuild = PriorityQueue()
         Scheduler.listOfFailedPackages = []
         Scheduler.setPriorities()
87815216
 
d024e640
     @staticmethod
     def getRequiredPackages(package):
         """Return the spec names of everything *package* requires.

         Build-time requirements (``getBuildRequiresForPackage``) come first,
         followed by the entries from ``getRequiresAllForPackage``. Each entry
         is mapped through ``getSpecName``; duplicates are dropped and the
         order of first appearance is kept.
         """
         requiredRpms = list(SPECS.getData().getBuildRequiresForPackage(package))
         requiredRpms.extend(SPECS.getData().getRequiresAllForPackage(package))

         specNames = []
         for rpmName in requiredRpms:
             specName = SPECS.getData().getSpecName(rpmName)
             if specName in specNames:
                 continue
             specNames.append(specName)

         return specNames
87815216
 
d024e640
     @staticmethod
     def __getListNextPackagesReadyToBuild():
87815216
         listOfPackagesNextToBuild = PriorityQueue()
d024e640
         Scheduler.logger.info("Checking for next possible packages to build")
         for pkg in Scheduler.listOfPackagesToBuild:
             if pkg in Scheduler.listOfPackagesCurrentlyBuilding:
                 continue
87815216
             listRequiredPackages = Scheduler.getRequiredPackages(pkg)
             canBuild = True
             Scheduler.logger.info("Required packages for " + pkg + " are:")
d024e640
             Scheduler.logger.info(listRequiredPackages)
             for reqPkg in listRequiredPackages:
                 if reqPkg not in Scheduler.listOfAlreadyBuiltPackages:
87815216
                     canBuild = False
                     Scheduler.logger.info(reqPkg + " is not available. So we cannot build " +
                                           pkg + " at this moment.")
d024e640
                     break
             if canBuild:
510e37f4
                 listOfPackagesNextToBuild.put((-Scheduler.priorityMap[pkg], pkg))
87815216
                 Scheduler.logger.info("Adding " + pkg + " to the schedule list")
510e37f4
         return listOfPackagesNextToBuild
620867de
 
d024e640
     @staticmethod
     def getNextPackageToBuild():
4720acad
         Scheduler.logger.info("Waiting to acquire scheduler lock")
d024e640
         Scheduler.lock.acquire()
620867de
 
d024e640
         if Scheduler.stopScheduling:
4720acad
             Scheduler.logger.info("Released scheduler lock")
d024e640
             Scheduler.lock.release()
             return None
620867de
 
d024e640
         if len(Scheduler.listOfPackagesToBuild) == 0:
             if Scheduler.event is not None:
                 Scheduler.event.set()
620867de
 
510e37f4
         try:
             if Scheduler.listOfPackagesNextToBuild.qsize() == 0:
                 listOfPackagesNextToBuild = Scheduler.__getListNextPackagesReadyToBuild()
                 Scheduler.listOfPackagesNextToBuild = listOfPackagesNextToBuild
         except:
             if len(Scheduler.listOfPackagesNextToBuild) == 0:
                 listOfPackagesNextToBuild = Scheduler.__getListNextPackagesReadyToBuild()
                 Scheduler.listOfPackagesNextToBuild = listOfPackagesNextToBuild
 
         if Scheduler.listOfPackagesNextToBuild.qsize() == 0:
             Scheduler.logger.info("Released scheduler lock")
             Scheduler.lock.release()
             return None
620867de
 
87815216
         packageTup = Scheduler.listOfPackagesNextToBuild.get()
510e37f4
 
         if packageTup[0] == 0 and Scheduler.isPriorityScheduler == 1:
             listOfPackagesNextToBuild = Scheduler.__getListNextPackagesReadyToBuild()
             Scheduler.listOfPackagesNextToBuild = listOfPackagesNextToBuild
87815216
             if Scheduler.listOfPackagesNextToBuild.qsize() == 0:
                 Scheduler.logger.info("Released scheduler lock")
                 Scheduler.lock.release()
                 return None
510e37f4
             packageTup = Scheduler.listOfPackagesNextToBuild.get()
 
         package = packageTup[1]
         Scheduler.logger.info("PackagesNextToBuild " + str(packageTup))
         if Scheduler.listOfPackagesNextToBuild.qsize() > 0:
87815216
             ThreadPool.ThreadPool.activateWorkerThreads(
                 Scheduler.listOfPackagesNextToBuild.qsize())
4720acad
         Scheduler.logger.info("Released scheduler lock")
d024e640
         Scheduler.lock.release()
         Scheduler.listOfPackagesCurrentlyBuilding.append(package)
ba177d4a
         Scheduler.listOfPackagesToBuild.remove(package)
d024e640
         return package
87815216
 
d024e640
     #can be synchronized TODO
     @staticmethod
     def notifyPackageBuildCompleted(package):
         if package in Scheduler.listOfPackagesCurrentlyBuilding:
             Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
             Scheduler.listOfAlreadyBuiltPackages.append(package)
87815216
 
d024e640
     #can be synchronized TODO
     @staticmethod
     def notifyPackageBuildFailed(package):
         if package in Scheduler.listOfPackagesCurrentlyBuilding:
             Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
             Scheduler.listOfFailedPackages.append(package)
87815216
 
d024e640
     @staticmethod
     def isAllPackagesBuilt():
87815216
         if len(Scheduler.listOfPackagesToBuild) == 0:
d024e640
             return True
         return False
87815216
 
d024e640
     @staticmethod
     def isAnyPackagesFailedToBuild():
         if len(Scheduler.listOfFailedPackages) != 0:
             return True
         return False