87815216 |
import threading
from queue import PriorityQueue |
620867de |
import json |
b37b4c63 |
import ThreadPool |
d024e640 |
from constants import constants
from Logger import Logger |
45c9260c |
from SpecData import SPECS |
d024e640 |
class Scheduler(object):
    """Decide which package should be built next.

    All state is kept in class attributes and every method is a
    ``@staticmethod``, so the scheduler state is shared by every caller in
    the process. ``setParams`` (re)initialises the state, worker code pulls
    packages with ``getNextPackageToBuild`` and reports the outcome through
    ``notifyPackageBuildCompleted`` / ``notifyPackageBuildFailed``.
    """

    # Serialises the selection of the next package in getNextPackageToBuild().
    lock = threading.Lock()
    # Packages whose build finished successfully (or that were built before).
    listOfAlreadyBuiltPackages = []
    # Packages that still have to be handed out to a builder.
    listOfPackagesToBuild = []
    # Packages handed out by getNextPackageToBuild() and not yet reported back.
    listOfPackagesCurrentlyBuilding = []
    # Full package list as passed to setParams().
    sortedList = []
    # Queue of (-priority, package) tuples for packages that are ready to build.
    listOfPackagesNextToBuild = PriorityQueue()
    # Packages reported through notifyPackageBuildFailed().
    listOfFailedPackages = []
    # package -> {required package: factor}; build + install requirements.
    alldependencyGraph = {}
    # package -> {required package: factor}; seeded from build requirements.
    dependencyGraph = {}
    # package -> accumulated priority used to order the ready queue.
    priorityMap = {}
    # package -> weight, loaded from constants.packageWeightsPath.
    pkgWeights = {}
    # 1 while weights are in use, 0 once setPriorities() finds no weights file.
    isPriorityScheduler = 1
    logger = None
    # Optional event object; set() is called once nothing is left to build.
    event = None
    # When true, getNextPackageToBuild() hands out no further packages.
    stopScheduling = False

    @staticmethod
    def setEvent(event):
        """Register the event that is set when no packages are left to build."""
        Scheduler.event = event

    @staticmethod
    def setLog(logName, logPath):
        """Create the scheduler logger via ``Logger.getLogger(logName, logPath)``."""
        Scheduler.logger = Logger.getLogger(logName, logPath)

    @staticmethod
    def _uniqueSpecNames(rpmPackages):
        """Map each entry to its spec name, dropping duplicates in first-seen order."""
        specNames = []
        seen = set()
        for pkg in rpmPackages:
            basePkg = SPECS.getData().getSpecName(pkg)
            if basePkg not in seen:
                seen.add(basePkg)
                specNames.append(basePkg)
        return specNames

    @staticmethod
    def getBuildRequiredPackages(package):
        """Return the distinct spec names *package* needs at build time.

        The names come from ``getBuildRequiresForPackage`` and are returned
        as a list in first-seen order.
        """
        listRequiredRPMPackages = []
        listRequiredRPMPackages.extend(
            SPECS.getData().getBuildRequiresForPackage(package))
        return Scheduler._uniqueSpecNames(listRequiredRPMPackages)

    @staticmethod
    def getDependencies(package, parentPackage, k):
        """Propagate the requirements of *package* into *parentPackage*.

        Recurses depth-first through ``alldependencyGraph`` starting at
        *package*. On the way back every requirement of *package* is copied
        into the entry of *parentPackage* with its factor multiplied by *k*;
        when the parent already has that requirement the larger factor wins.
        Nothing is copied when *parentPackage* is ``None``.

        Raises ``KeyError`` if a visited package has no graph entry.
        NOTE(review): there is no cycle detection, so the graph is presumably
        expected to be acyclic -- confirm.
        """
        # Snapshot the keys: recursive calls add entries to this very dict.
        for node in list(Scheduler.alldependencyGraph[package].keys()):
            Scheduler.getDependencies(node, package, k)

        if parentPackage is None:
            return

        childDeps = Scheduler.alldependencyGraph[package]
        parentDeps = Scheduler.alldependencyGraph[parentPackage]
        for node in childDeps:
            scaled = childDeps[node] * k
            parentDeps[node] = max(parentDeps.get(node, scaled), scaled)

    @staticmethod
    def makeGraph():
        """Expand ``dependencyGraph`` with indirect requirements.

        For each package in ``sortedList`` and each requirement it currently
        has, the requirement's own requirements (taken from
        ``alldependencyGraph`` after ``getDependencies`` ran) are merged in
        with their factor multiplied by ``k`` (3), keeping the larger factor
        on conflicts. When ``constants.publishBuildDependencies`` is true the
        resulting requirement names are written to
        ``<constants.logPath>/BuildDependencies.json``.
        """
        k = 3
        for package in Scheduler.sortedList:
            packageDeps = Scheduler.dependencyGraph[package]
            # Snapshot the keys: the loop body adds entries to packageDeps.
            for child_pkg in list(packageDeps.keys()):
                Scheduler.getDependencies(child_pkg, package, k)
                childDeps = Scheduler.alldependencyGraph[child_pkg]
                for node in list(childDeps.keys()):
                    scaled = childDeps[node] * k
                    packageDeps[node] = max(packageDeps.get(node, scaled), scaled)

        if constants.publishBuildDependencies:
            dependencyLists = {}
            for package in list(Scheduler.dependencyGraph.keys()):
                dependencyLists[package] = list(
                    Scheduler.dependencyGraph[package].keys())
            # "with" closes the file even if serialisation or the write fails.
            with open(str(constants.logPath) + "/BuildDependencies.json", 'w') as graphfile:
                graphfile.write(json.dumps(dependencyLists, sort_keys=True, indent=4))

    @staticmethod
    def parseWeights():
        """Load ``pkgWeights`` from the JSON file at ``constants.packageWeightsPath``."""
        Scheduler.pkgWeights.clear()
        # "with" closes the file even if the JSON is malformed.
        with open(constants.packageWeightsPath, 'r') as weightFile:
            Scheduler.pkgWeights = json.load(weightFile)

    @staticmethod
    def getWeight(package):
        """Return the weight of *package* as a float, or 0 if it has none."""
        try:
            return float(Scheduler.pkgWeights[package])
        except KeyError:
            return 0

    @staticmethod
    def setPriorities():
        """Rebuild both dependency graphs and compute ``priorityMap``.

        Without a weights file (``constants.packageWeightsPath is None``) the
        priority scheduler is flagged as disabled; the graphs and the map are
        still built, with every weight being 0.

        Each package starts at its own weight. It then adds
        ``factor * own weight`` to every package it requires.
        NOTE(review): a required package must already have a ``priorityMap``
        entry when it is incremented, otherwise ``KeyError`` is raised;
        presumably ``sortedList`` lists requirements before the packages that
        need them -- confirm.
        """
        if constants.packageWeightsPath is None:
            Scheduler.logger.info("Priority Scheduler disabled")
            Scheduler.isPriorityScheduler = 0
        else:
            Scheduler.parseWeights()

        for package in Scheduler.sortedList:
            Scheduler.dependencyGraph[package] = {}
            Scheduler.alldependencyGraph[package] = {}
            for child_package in Scheduler.getBuildRequiredPackages(package):
                Scheduler.dependencyGraph[package][child_package] = 1
            for child_package in Scheduler.getRequiredPackages(package):
                Scheduler.alldependencyGraph[package][child_package] = 1
        Scheduler.makeGraph()
        for package in Scheduler.sortedList:
            # getWeight() already maps unknown packages to 0.
            weight = Scheduler.getWeight(package)
            Scheduler.priorityMap[package] = weight
            for child_pkg in Scheduler.dependencyGraph[package].keys():
                Scheduler.priorityMap[child_pkg] = Scheduler.priorityMap[child_pkg] \
                    + (Scheduler.dependencyGraph[package][child_pkg] * weight)
        Scheduler.logger.info("set Priorities: Priority of all packages")
        Scheduler.logger.info(Scheduler.priorityMap)

    @staticmethod
    def setParams(sortedList, listOfAlreadyBuiltPackages):
        """Initialise the scheduler for a build run and compute priorities.

        A package from *sortedList* is queued for building when it is not in
        *listOfAlreadyBuiltPackages* or when it is listed in
        ``constants.testForceRPMS``. Note that ``listOfPackagesToBuild`` is
        appended to, not reset.
        """
        Scheduler.sortedList = sortedList
        Scheduler.listOfAlreadyBuiltPackages = listOfAlreadyBuiltPackages
        for x in Scheduler.sortedList:
            if x not in Scheduler.listOfAlreadyBuiltPackages or x in constants.testForceRPMS:
                Scheduler.listOfPackagesToBuild.append(x)
        Scheduler.listOfPackagesCurrentlyBuilding = []
        # Must be a queue: every consumer uses qsize()/get() on it.
        Scheduler.listOfPackagesNextToBuild = PriorityQueue()
        Scheduler.listOfFailedPackages = []
        Scheduler.setPriorities()

    @staticmethod
    def getRequiredPackages(package):
        """Return the distinct spec names *package* needs to build and install.

        Combines ``getBuildRequiresForPackage`` and
        ``getRequiresAllForPackage`` (in that order) and returns the spec
        names as a list in first-seen order.
        """
        listRequiredRPMPackages = []
        listRequiredRPMPackages.extend(
            SPECS.getData().getBuildRequiresForPackage(package))
        listRequiredRPMPackages.extend(
            SPECS.getData().getRequiresAllForPackage(package))
        return Scheduler._uniqueSpecNames(listRequiredRPMPackages)

    @staticmethod
    def __getListNextPackagesReadyToBuild():
        """Return a fresh queue of packages whose requirements are all built.

        Packages that are currently building are skipped. Entries are
        ``(-priority, package)`` tuples, so the queue (lowest entry first)
        yields the highest priority first.
        """
        readyQueue = PriorityQueue()
        Scheduler.logger.info("Checking for next possible packages to build")
        for pkg in Scheduler.listOfPackagesToBuild:
            if pkg in Scheduler.listOfPackagesCurrentlyBuilding:
                continue
            listRequiredPackages = Scheduler.getRequiredPackages(pkg)
            Scheduler.logger.info("Required packages for " + pkg + " are:")
            Scheduler.logger.info(listRequiredPackages)
            for reqPkg in listRequiredPackages:
                if reqPkg not in Scheduler.listOfAlreadyBuiltPackages:
                    Scheduler.logger.info(reqPkg + " is not available. So we cannot build " +
                                          pkg + " at this moment.")
                    break
            else:
                # No requirement is missing: the package can be scheduled.
                readyQueue.put((-Scheduler.priorityMap[pkg], pkg))
                Scheduler.logger.info("Adding " + pkg + " to the schedule list")
        return readyQueue

    @staticmethod
    def _selectNextPackage():
        """Pick the next package; the caller must hold ``Scheduler.lock``."""
        if Scheduler.stopScheduling:
            return None

        if not Scheduler.listOfPackagesToBuild and Scheduler.event is not None:
            Scheduler.event.set()

        if Scheduler.listOfPackagesNextToBuild.qsize() == 0:
            Scheduler.listOfPackagesNextToBuild = \
                Scheduler.__getListNextPackagesReadyToBuild()
        if Scheduler.listOfPackagesNextToBuild.qsize() == 0:
            return None

        packageTup = Scheduler.listOfPackagesNextToBuild.get()
        # The best candidate has priority 0: recompute the ready set before
        # choosing. NOTE(review): the reason for this refresh is not visible
        # here -- confirm the intent.
        if packageTup[0] == 0 and Scheduler.isPriorityScheduler == 1:
            Scheduler.listOfPackagesNextToBuild = \
                Scheduler.__getListNextPackagesReadyToBuild()
            if Scheduler.listOfPackagesNextToBuild.qsize() == 0:
                return None
            packageTup = Scheduler.listOfPackagesNextToBuild.get()

        package = packageTup[1]
        Scheduler.logger.info("PackagesNextToBuild " + str(packageTup))
        remaining = Scheduler.listOfPackagesNextToBuild.qsize()
        if remaining > 0:
            ThreadPool.ThreadPool.activateWorkerThreads(remaining)

        # Record the hand-out while still holding the lock so that no other
        # thread can recompute the ready set and select the same package.
        Scheduler.listOfPackagesCurrentlyBuilding.append(package)
        Scheduler.listOfPackagesToBuild.remove(package)
        return package

    @staticmethod
    def getNextPackageToBuild():
        """Return the next package to build, or ``None`` if none is available.

        ``None`` is returned when scheduling was stopped or when no package
        has all of its requirements built yet. The returned package is moved
        from ``listOfPackagesToBuild`` to ``listOfPackagesCurrentlyBuilding``.
        When more ready packages remain, ``ThreadPool.activateWorkerThreads``
        is called with their count.

        The whole selection runs under ``Scheduler.lock``; the lock is
        released even when selection raises.
        """
        Scheduler.logger.info("Waiting to acquire scheduler lock")
        with Scheduler.lock:
            try:
                return Scheduler._selectNextPackage()
            finally:
                Scheduler.logger.info("Released scheduler lock")

    # TODO: can be synchronized
    @staticmethod
    def notifyPackageBuildCompleted(package):
        """Move *package* from "currently building" to "already built".

        Does nothing if the package is not currently building.
        """
        if package in Scheduler.listOfPackagesCurrentlyBuilding:
            Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
            Scheduler.listOfAlreadyBuiltPackages.append(package)

    # TODO: can be synchronized
    @staticmethod
    def notifyPackageBuildFailed(package):
        """Move *package* from "currently building" to the failed list.

        Does nothing if the package is not currently building.
        """
        if package in Scheduler.listOfPackagesCurrentlyBuilding:
            Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
            Scheduler.listOfFailedPackages.append(package)

    @staticmethod
    def isAllPackagesBuilt():
        """Return True when no package is left in ``listOfPackagesToBuild``."""
        return len(Scheduler.listOfPackagesToBuild) == 0

    @staticmethod
    def isAnyPackagesFailedToBuild():
        """Return True when at least one package was reported as failed."""
        return len(Scheduler.listOfFailedPackages) != 0