Browse code

Changed busywait logic to worker/scheduler thread model

Divya Thaluru authored on 2015/05/30 08:06:01
Showing 4 changed files
... ...
@@ -7,6 +7,10 @@ from PackageBuilder import PackageBuilder
7 7
 import os
8 8
 from PackageUtils import PackageUtils
9 9
 from ToolChainUtils import ToolChainUtils
10
+from Scheduler import Scheduler
11
+from ThreadPool import ThreadPool
12
+from WorkerThread import WorkerThread
13
+import subprocess
10 14
 
11 15
 class PackageManager(object):
12 16
     
... ...
@@ -168,8 +172,84 @@ class PackageManager(object):
168 168
             return False
169 169
         
170 170
         return True
171
-        
171
+    
172
+    def calculatePossibleNumWorkerThreads(self):
173
+        process = subprocess.Popen(["df" ,"/mnt"],shell=True,stdout=subprocess.PIPE)
174
+        retval = process.wait()
175
+        if retval != 0:
176
+            self.logger.error("Unable to check free space. Unknown error.")
177
+            return False
178
+        output = process.communicate()[0]
179
+        device, size, used, available, percent, mountpoint = output.split("\n")[1].split()
180
+        c =  int(available)/600000
181
+        numChroots=int(c)
182
+        self.logger.info("Updated number of chroots:"+str(numChroots))
183
+        return numChroots
184
+    
172 185
     def buildPackages (self, listPackages):
186
+
187
+        if not self.buildToolChain():
188
+            return False
189
+
190
+        returnVal=self.calculateParams(listPackages)
191
+        if not returnVal:
192
+            self.logger.error("Unable to set paramaters. Terminating the package manager.")
193
+            return False
194
+        
195
+        statusEvent=threading.Event()
196
+        
197
+        Scheduler.setLog(self.logName, self.logPath)
198
+        Scheduler.setParams(self.sortedPackageList, self.listOfPackagesAlreadyBuilt)
199
+        Scheduler.setEvent(statusEvent)
200
+        
201
+        numWorkerThreads=self.calculatePossibleNumWorkerThreads()
202
+        if numWorkerThreads == 0:
203
+            return False
204
+         
205
+        ThreadPool.clear()
206
+        i=0
207
+        while i < numWorkerThreads:
208
+            workerName="WorkerThread"+str(i)
209
+            w = WorkerThread(statusEvent,workerName,self.mapPackageToCycle,self.listAvailableCyclicPackages,self.logger)
210
+            ThreadPool.addWorkerThread(workerName, w)
211
+            w.start()
212
+            i = i + 1
213
+        
214
+        statusEvent.wait()
215
+        
216
+        Scheduler.stopScheduling=True
217
+        
218
+        setFailFlag=False
219
+        allPackagesBuilt=False
220
+        if Scheduler.isAnyPackagesFailedToBuild():
221
+            setFailFlag=True
222
+        if Scheduler.isAllPackagesBuilt():
223
+            allPackagesBuilt=True
224
+        
225
+        if setFailFlag:
226
+            self.logger.error("Some of the packages failed:")
227
+            self.logger.error(Scheduler.listOfFailedPackages)
228
+        
229
+        if not setFailFlag:
230
+            if allPackagesBuilt:
231
+                self.logger.info("All packages built successfully")
232
+            else:
233
+                self.logger.error("Build stopped unexpectedly.Unknown error.")
234
+        
235
+        self.logger.info("Waiting for all remaining worker threads")
236
+        listWorkerObjs=ThreadPool.getAllWorkerObjects()
237
+        for w in listWorkerObjs:
238
+            w.join()
239
+        
240
+        self.logger.info("Terminated")
241
+        
242
+        
243
+        
244
+            
245
+        
246
+
247
+    
248
+    def buildPackages1 (self, listPackages):
173 249
         
174 250
         if not self.buildToolChain():
175 251
             return False
176 252
new file mode 100644
... ...
@@ -0,0 +1,127 @@
0
+from ThreadPool import ThreadPool
1
+from constants import constants
2
+from Logger import Logger
3
+import threading 
4
+
5
+class Scheduler(object):
6
+    
7
+    lock=threading.Lock()
8
+    listOfAlreadyBuiltPackages=[]
9
+    listOfPackagesToBuild=[]
10
+    listOfPackagesCurrentlyBuilding=[]
11
+    sortedList=[]
12
+    listOfPackagesNextToBuild=[]
13
+    listOfFailedPackages=[]
14
+    logger=None
15
+    event=None
16
+    stopScheduling=False
17
+    
18
+    @staticmethod
19
+    def setEvent(event):
20
+        Scheduler.event=event
21
+    
22
+    @staticmethod
23
+    def setLog(logName,logPath):
24
+        Scheduler.logger = Logger.getLogger(logName, logPath)    
25
+        
26
+    @staticmethod
27
+    def setParams(sortedList,listOfAlreadyBuiltPackages):
28
+        Scheduler.sortedList=sortedList
29
+        Scheduler.listOfAlreadyBuiltPackages=listOfAlreadyBuiltPackages
30
+        for x in Scheduler.sortedList:
31
+            if x not in Scheduler.listOfAlreadyBuiltPackages:
32
+                Scheduler.listOfPackagesToBuild.append(x)
33
+        Scheduler.listOfPackagesCurrentlyBuilding=[]
34
+        Scheduler.listOfPackagesNextToBuild=[]
35
+        
36
+    @staticmethod
37
+    def getRequiredPackages(package):
38
+        listRequiredRPMPackages=[]
39
+        listRequiredRPMPackages.extend(constants.specData.getBuildRequiresForPackage(package))
40
+        listRequiredRPMPackages.extend(constants.specData.getRequiresAllForPackage(package))
41
+        
42
+        listRequiredPackages=[]
43
+        for pkg in listRequiredRPMPackages:
44
+            basePkg=constants.specData.getSpecName(pkg)
45
+            if basePkg not in listRequiredPackages:
46
+                listRequiredPackages.append(basePkg)
47
+        
48
+        return listRequiredPackages
49
+    
50
+    @staticmethod
51
+    def __getListNextPackagesReadyToBuild():
52
+        listOfPackagesNextToBuild=[]
53
+        Scheduler.logger.info("Checking for next possible packages to build")
54
+        for pkg in Scheduler.listOfPackagesToBuild:
55
+            if pkg in Scheduler.listOfPackagesCurrentlyBuilding:
56
+                continue
57
+            listRequiredPackages=Scheduler.getRequiredPackages(pkg)
58
+            canBuild=True
59
+            Scheduler.logger.info("Required packages for "+ pkg + " are:")
60
+            Scheduler.logger.info(listRequiredPackages)
61
+            for reqPkg in listRequiredPackages:
62
+                if reqPkg not in Scheduler.listOfAlreadyBuiltPackages:
63
+                    canBuild=False
64
+                    Scheduler.logger.info(reqPkg+" is not available. So we cannot build "+ pkg +" at this moment.")
65
+                    break
66
+            if canBuild:
67
+                listOfPackagesNextToBuild.append(pkg)
68
+        return listOfPackagesNextToBuild
69
+    
70
+    @staticmethod
71
+    def getNextPackageToBuild():
72
+        Scheduler.lock.acquire()
73
+        
74
+        if Scheduler.stopScheduling:
75
+            Scheduler.lock.release()
76
+            return None
77
+        
78
+        if len(Scheduler.listOfPackagesToBuild) == 0:
79
+            if Scheduler.event is not None:
80
+                Scheduler.event.set()
81
+            
82
+        if len(Scheduler.listOfPackagesNextToBuild) == 0:
83
+            listOfPackagesNextToBuild=Scheduler.__getListNextPackagesReadyToBuild()
84
+            Scheduler.listOfPackagesNextToBuild=listOfPackagesNextToBuild
85
+            
86
+        if len(Scheduler.listOfPackagesNextToBuild) == 0:
87
+            Scheduler.lock.release()
88
+            return None
89
+        
90
+        package=Scheduler.listOfPackagesNextToBuild.pop(0)
91
+        
92
+        if len(Scheduler.listOfPackagesNextToBuild) > 0:
93
+            ThreadPool.activateWorkerThreads(len(Scheduler.listOfPackagesNextToBuild))
94
+        Scheduler.lock.release()
95
+        Scheduler.listOfPackagesCurrentlyBuilding.append(package)
96
+        Scheduler.listOfPackagesToBuild.remove(package)
97
+        return package
98
+    
99
+    #can be synchronized TODO
100
+    @staticmethod
101
+    def notifyPackageBuildCompleted(package):
102
+        if package in Scheduler.listOfPackagesCurrentlyBuilding:
103
+            Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
104
+            Scheduler.listOfAlreadyBuiltPackages.append(package)
105
+    
106
+        
107
+    #can be synchronized TODO
108
+    @staticmethod
109
+    def notifyPackageBuildFailed(package):
110
+        if package in Scheduler.listOfPackagesCurrentlyBuilding:
111
+            Scheduler.listOfPackagesCurrentlyBuilding.remove(package)
112
+            Scheduler.listOfFailedPackages.append(package)
113
+                
114
+    @staticmethod
115
+    def isAllPackagesBuilt():
116
+        if len(Scheduler.listOfPackagesToBuild) == 0 :
117
+            return True
118
+        return False
119
+    
120
+    @staticmethod
121
+    def isAnyPackagesFailedToBuild():
122
+        if len(Scheduler.listOfFailedPackages) != 0:
123
+            return True
124
+        return False
125
+        
126
+        
0 127
\ No newline at end of file
1 128
new file mode 100644
... ...
@@ -0,0 +1,55 @@
0
+
1
+class ThreadPool(object):
2
+    
3
+    mapWorkerThreads={}
4
+    activeWorkerThreads=[]
5
+    inactiveWorkerThreads=[]
6
+    
7
+    @staticmethod
8
+    def clear():
9
+        ThreadPool.mapWorkerThreads.clear()
10
+        ThreadPool.activeWorkerThreads=[]
11
+        ThreadPool.inactiveWorkerThreads=[]
12
+    
13
+    @staticmethod
14
+    def getAllWorkerObjects():
15
+        listWorkerObjs=[]
16
+        listWorkerKeys = ThreadPool.mapWorkerThreads.keys()
17
+        for x in listWorkerKeys:
18
+            xobj=ThreadPool.mapWorkerThreads[x]
19
+            listWorkerObjs.append(xobj)
20
+        return listWorkerObjs
21
+        
22
+    @staticmethod
23
+    def addWorkerThread(workerThreadName,workerThread):
24
+        ThreadPool.mapWorkerThreads[workerThreadName]=workerThread
25
+   
26
+    @staticmethod
27
+    def makeWorkerThreadActive(threadName):
28
+        if threadName in ThreadPool.inactiveWorkerThreads:
29
+            ThreadPool.inactiveWorkerThreads.remove(threadName)
30
+        ThreadPool.activeWorkerThreads.append(threadName)
31
+        
32
+    @staticmethod
33
+    def makeWorkerThreadInActive(threadName):
34
+        if threadName in ThreadPool.activeWorkerThreads:
35
+            ThreadPool.activeWorkerThreads.remove(threadName)
36
+        ThreadPool.inactiveWorkerThreads.append(threadName)
37
+    
38
+    @staticmethod
39
+    def startWorkerThread(threadName):
40
+        ThreadPool.mapWorkerThreads[threadName].start()
41
+    
42
+    @staticmethod
43
+    def getListInactiveWorkerThreads():
44
+        return ThreadPool.inactiveWorkerThreads
45
+    
46
+    @staticmethod
47
+    def activateWorkerThreads(numOfThreadsToActivate):
48
+        while len(ThreadPool.inactiveWorkerThreads) > 0 and numOfThreadsToActivate > 0:
49
+            threadName=ThreadPool.inactiveWorkerThreads.pop()
50
+            ThreadPool.startWorkerThread(threadName)
51
+            ThreadPool.makeWorkerThreadActive(threadName)
52
+            numOfThreadsToActivate = numOfThreadsToActivate -1
53
+
54
+            
0 55
new file mode 100644
... ...
@@ -0,0 +1,56 @@
0
+from PackageBuilder import PackageBuilder
1
+import threading
2
+from Scheduler import Scheduler
3
+from constants import constants
4
+from ThreadPool import ThreadPool
5
+ 
6
+class WorkerThread(threading.Thread):
7
+    
8
+    def __init__(self,event,name,mapPackageToCycle,listAvailableCyclicPackages,logger):
9
+        threading.Thread.__init__(self)
10
+        self.statusEvent=event
11
+        self.name=name
12
+        self.mapPackageToCycle=mapPackageToCycle
13
+        self.listAvailableCyclicPackages=listAvailableCyclicPackages
14
+        self.logger=logger
15
+    
16
+    
17
+    def run(self):
18
+        buildThreadFailed=False
19
+        ThreadPool.makeWorkerThreadActive(self.name)
20
+        self.logger.info("Thread "+self.name +" is starting now")
21
+        while True:
22
+            outputMap={}
23
+            pkg = Scheduler.getNextPackageToBuild()
24
+            if pkg is None:
25
+                break
26
+            self.logger.info("Thread "+self.name+" is building package:"+ pkg)
27
+            pkgBuilder = PackageBuilder(self.mapPackageToCycle,self.listAvailableCyclicPackages,"build-"+pkg,constants.logPath)
28
+            t = threading.Thread(target=pkgBuilder.buildPackageThreadAPI,args=(pkg,outputMap,pkg))
29
+            t.start()
30
+            t.join()
31
+            if outputMap.has_key(pkg):
32
+                if outputMap[pkg] == False:
33
+                    buildThreadFailed = True
34
+                    Scheduler.notifyPackageBuildFailed(pkg)
35
+                    self.logger.info("Thread "+self.name +" stopped building the "+pkg +" package")
36
+                    break
37
+            else:
38
+                buildThreadFailed = True
39
+                Scheduler.notifyPackageBuildFailed(pkg)
40
+                self.logger.info("Thread "+self.name +" stopped building the "+pkg +" package")
41
+                break
42
+            
43
+            Scheduler.notifyPackageBuildCompleted(pkg)
44
+        
45
+        if buildThreadFailed:
46
+            self.statusEvent.set()
47
+        
48
+        ThreadPool.makeWorkerThreadInActive(self.name)
49
+        self.logger.info("Thread "+self.name +" is going to rest")
50
+        
51
+
52
+
53
+                    
54
+                
55
+        
0 56
\ No newline at end of file