87815216 |
import threading
from queue import PriorityQueue |
620867de |
import json |
326d5ca8 |
from ThreadPool import ThreadPool |
d024e640 |
from constants import constants
from Logger import Logger |
45c9260c |
from SpecData import SPECS |
d024e640 |
class Scheduler(object): |
87815216 |
lock = threading.Lock() |
326d5ca8 |
listOfAlreadyBuiltPackages = set() |
87815216 |
listOfPackagesToBuild = [] |
326d5ca8 |
listOfPackagesCurrentlyBuilding = set() |
87815216 |
sortedList = []
listOfPackagesNextToBuild = PriorityQueue()
listOfFailedPackages = [] |
620867de |
alldependencyGraph = {}
dependencyGraph = {}
priorityMap = {} |
87815216 |
pkgWeights = {}
logger = None
event = None
stopScheduling = False
|
d024e640 |
@staticmethod
def setEvent(event): |
87815216 |
Scheduler.event = event
|
d024e640 |
@staticmethod |
87815216 |
def setLog(logName, logPath):
Scheduler.logger = Logger.getLogger(logName, logPath) |
620867de |
@staticmethod |
326d5ca8 |
def setParams(sortedList, listOfAlreadyBuiltPackages):
Scheduler.sortedList = sortedList
Scheduler.listOfAlreadyBuiltPackages = listOfAlreadyBuiltPackages
for x in Scheduler.sortedList:
if x not in Scheduler.listOfAlreadyBuiltPackages or x in constants.testForceRPMS:
Scheduler.listOfPackagesToBuild.append(x)
Scheduler.listOfPackagesCurrentlyBuilding = set()
Scheduler.listOfPackagesNextToBuild = PriorityQueue()
Scheduler.listOfFailedPackages = []
Scheduler._setPriorities()
@staticmethod
def notifyPackageBuildCompleted(package):
with Scheduler.lock:
if package in Scheduler.listOfPackagesCurrentlyBuilding:
Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
Scheduler.listOfAlreadyBuiltPackages.add(package)
@staticmethod
def notifyPackageBuildFailed(package):
with Scheduler.lock:
if package in Scheduler.listOfPackagesCurrentlyBuilding:
Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
Scheduler.listOfFailedPackages.append(package)
@staticmethod
def isAllPackagesBuilt():
if Scheduler.listOfPackagesToBuild:
return False
return True
@staticmethod
def isAnyPackagesFailedToBuild():
if Scheduler.listOfFailedPackages:
return True
return False
@staticmethod
def getNextPackageToBuild():
Scheduler.logger.info("Waiting to acquire scheduler lock")
with Scheduler.lock:
if Scheduler.stopScheduling:
return None
if not Scheduler.listOfPackagesToBuild:
if Scheduler.event is not None:
Scheduler.event.set()
if Scheduler.listOfPackagesNextToBuild.empty():
Scheduler._getListNextPackagesReadyToBuild()
if Scheduler.listOfPackagesNextToBuild.empty():
return None
packageTup = Scheduler.listOfPackagesNextToBuild.get()
package = packageTup[1]
Scheduler.logger.info("PackagesNextToBuild " + str(packageTup))
if Scheduler.listOfPackagesNextToBuild.qsize() > 0:
ThreadPool.activateWorkerThreads(
Scheduler.listOfPackagesNextToBuild.qsize())
Scheduler.listOfPackagesCurrentlyBuilding.add(package)
Scheduler.listOfPackagesToBuild.remove(package)
return package
@staticmethod
def _getBuildRequiredPackages(package): |
620867de |
listRequiredRPMPackages = [] |
45c9260c |
listRequiredRPMPackages.extend(SPECS.getData().getBuildRequiresForPackage(package)) |
620867de |
listRequiredPackages = []
for pkg in listRequiredRPMPackages: |
e45f5730 |
basePkg = SPECS.getData().getSpecName(pkg.package) |
620867de |
if basePkg not in listRequiredPackages:
listRequiredPackages.append(basePkg)
return listRequiredPackages
@staticmethod |
326d5ca8 |
def _getDependencies(package, parentPackage, k): |
620867de |
|
87815216 |
for node in list(Scheduler.alldependencyGraph[package].keys()): |
326d5ca8 |
Scheduler._getDependencies(node, package, k) |
620867de |
|
87815216 |
if parentPackage is None: |
620867de |
return
else:
for node in Scheduler.alldependencyGraph[package].keys():
try:
Scheduler.alldependencyGraph[parentPackage][node] = max(
Scheduler.alldependencyGraph[parentPackage][node],
Scheduler.alldependencyGraph[package][node] * k)
except KeyError: |
326d5ca8 |
Scheduler.alldependencyGraph[parentPackage][node] = (
Scheduler.alldependencyGraph[package][node] * k) |
620867de |
@staticmethod |
326d5ca8 |
def _makeGraph():
k = 2 |
620867de |
for package in Scheduler.sortedList: |
87815216 |
for child_pkg in list(Scheduler.dependencyGraph[package].keys()): |
326d5ca8 |
Scheduler._getDependencies(child_pkg, package, k) |
87815216 |
for node in list(Scheduler.alldependencyGraph[child_pkg].keys()): |
620867de |
try:
Scheduler.dependencyGraph[package][node] = max(
Scheduler.dependencyGraph[package][node],
Scheduler.alldependencyGraph[child_pkg][node] * k)
except KeyError: |
326d5ca8 |
Scheduler.dependencyGraph[package][node] = (
Scheduler.alldependencyGraph[child_pkg][node] * k) |
87815216 |
if constants.publishBuildDependencies:
dependencyLists = {}
for package in list(Scheduler.dependencyGraph.keys()):
dependencyLists[package] = []
for dependency in list(Scheduler.dependencyGraph[package].keys()):
dependencyLists[package].append(dependency) |
326d5ca8 |
with open(str(constants.logPath) + "/BuildDependencies.json", 'w') as graphfile:
graphfile.write(json.dumps(dependencyLists, sort_keys=True, indent=4)) |
620867de |
@staticmethod |
326d5ca8 |
def _parseWeights(): |
87815216 |
Scheduler.pkgWeights.clear() |
326d5ca8 |
with open(constants.packageWeightsPath, 'r') as weightFile:
Scheduler.pkgWeights = json.load(weightFile) |
620867de |
@staticmethod |
326d5ca8 |
def _getWeight(package): |
620867de |
try:
return float(Scheduler.pkgWeights[package])
except KeyError:
return 0
@staticmethod |
326d5ca8 |
def _setPriorities(): |
87815216 |
if constants.packageWeightsPath is None: |
510e37f4 |
Scheduler.logger.info("Priority Scheduler disabled") |
87815216 |
else: |
326d5ca8 |
Scheduler.logger.info("Priority Scheduler enabled")
Scheduler._parseWeights()
for package in Scheduler.sortedList:
Scheduler.dependencyGraph[package] = {}
Scheduler.alldependencyGraph[package] = {}
for child_package in Scheduler._getBuildRequiredPackages(package):
Scheduler.dependencyGraph[package][child_package] = 1
for child_package in Scheduler._getRequiredPackages(package):
Scheduler.alldependencyGraph[package][child_package] = 1
Scheduler._makeGraph()
for package in Scheduler.sortedList:
try:
Scheduler.priorityMap[package] = Scheduler._getWeight(package)
except KeyError:
Scheduler.priorityMap[package] = 0
for child_pkg in Scheduler.dependencyGraph[package].keys():
Scheduler.priorityMap[child_pkg] = (
Scheduler.priorityMap[child_pkg]
+ (Scheduler.dependencyGraph[package][child_pkg]
* (Scheduler._getWeight(package))))
Scheduler.logger.info("set Priorities: Priority of all packages")
Scheduler.logger.info(Scheduler.priorityMap) |
620867de |
|
510e37f4 |
|
d024e640 |
@staticmethod |
326d5ca8 |
def _getRequiredPackages(package): |
87815216 |
listRequiredRPMPackages = [] |
45c9260c |
listRequiredRPMPackages.extend(SPECS.getData().getBuildRequiresForPackage(package))
listRequiredRPMPackages.extend(SPECS.getData().getRequiresAllForPackage(package)) |
87815216 |
listRequiredPackages = [] |
b5e09fac |
|
d024e640 |
for pkg in listRequiredRPMPackages: |
e45f5730 |
basePkg = SPECS.getData().getSpecName(pkg.package) |
d024e640 |
if basePkg not in listRequiredPackages:
listRequiredPackages.append(basePkg) |
87815216 |
|
d024e640 |
return listRequiredPackages |
87815216 |
|
d024e640 |
@staticmethod |
326d5ca8 |
def _getListNextPackagesReadyToBuild(): |
d024e640 |
Scheduler.logger.info("Checking for next possible packages to build")
for pkg in Scheduler.listOfPackagesToBuild:
if pkg in Scheduler.listOfPackagesCurrentlyBuilding:
continue |
326d5ca8 |
listRequiredPackages = Scheduler._getRequiredPackages(pkg) |
87815216 |
canBuild = True |
d024e640 |
for reqPkg in listRequiredPackages:
if reqPkg not in Scheduler.listOfAlreadyBuiltPackages: |
87815216 |
canBuild = False |
d024e640 |
break
if canBuild: |
326d5ca8 |
Scheduler.listOfPackagesNextToBuild.put((-Scheduler._getWeight(pkg), pkg)) |
87815216 |
Scheduler.logger.info("Adding " + pkg + " to the schedule list") |