#!/usr/bin/env python3
import json
import os
import signal
import sys
import time
import uuid
from argparse import ArgumentParser

import yaml
from kubernetes import client, config, watch
from kubernetes import stream

from CommandUtils import CommandUtils
from Logger import Logger
from constants import constants

class DistributedBuilder:
    """Drive one distributed Photon package build on a Kubernetes cluster.

    An instance creates NFS-backed persistent volumes and claims, an NFS
    helper pod, a master service and job, and a worker deployment, all in
    the "default" namespace. Every object name gets the suffix
    "-<buildGuid>" (a random id chosen per instance), which keeps the
    objects of one build distinct from another's and lets clean() find
    exactly this build's objects again.
    """

    # Base names of the PVs and PVCs removed by clean(); the objects are
    # named "<base>-<buildGuid>". Expected to match the metadata.name values
    # in yaml/persistentVolume.yaml and yaml/persistentVolumeClaim.yaml
    # (those templates are not visible here).
    _volumeNames = (
        "builder",
        "logs",
        "specs",
        "rpms",
        "publishrpms",
        "publishxrpms",
        "photon",
        "nfspod",
    )

    def __init__(self, distributedBuildConfig, logName=None, logPath=None):
        """Set up logging, a fresh build id and the Kubernetes API objects.

        Args:
            distributedBuildConfig: dict of build options. This class reads
                "nfs-server-ip", "nfs-server-path", "command" and "pods".
            logName: logger name; defaults to "DistributedBuild".
            logPath: log directory; defaults to constants.logPath.
        """
        if logName is None:
            logName = "DistributedBuild"
        if logPath is None:
            logPath = constants.logPath
        self.logName = logName
        self.logPath = logPath
        self.logger = Logger.getLogger(logName, logPath, constants.logLevel)
        self.distributedBuildConfig = distributedBuildConfig
        self.buildGuid = self.getBuildGuid()
        # load_kube_config() installs the kubeconfig as the client library's
        # default configuration and returns None, so the API objects below
        # are built on the default ApiClient.
        self.aApiClient = config.load_kube_config()
        self.coreV1ApiInstance = client.CoreV1Api(self.aApiClient)
        self.batchV1ApiInstance = client.BatchV1Api(self.aApiClient)
        self.AppsV1ApiInstance = client.AppsV1Api(self.aApiClient)
        self.cmdUtils = CommandUtils()

    def getBuildGuid(self):
        """Return a fresh 4-character lowercase hex id for this build.

        The id is the second dash-separated group of a random UUID4. It is
        logged at info level.
        """
        guid = str(uuid.uuid4()).split("-")[1]
        # Kubernetes object names must be lowercase. str(UUID) already is;
        # lower() just makes the requirement explicit.
        guid = guid.lower()
        self.logger.info(f"guid: {guid}")
        return guid

    @staticmethod
    def _yamlPath(fileName):
        """Return the path of a manifest template in the yaml/ directory
        next to this file."""
        return os.path.join(os.path.dirname(__file__), "yaml", fileName)

    @staticmethod
    def _localPhotonRoot():
        """Return the local checkout root used by the kubectl cp commands.

        This is this file's directory with "support/package-builder" cut
        out, so for <root>/support/package-builder/ it is "<root>/" (with a
        trailing slash).
        """
        # abspath: a relative __file__ would otherwise give "" here and an
        # empty source/destination path in the kubectl cp commands.
        scriptDir = os.path.dirname(os.path.abspath(__file__))
        return scriptDir.replace("support/package-builder", "")

    def _masterJobName(self):
        """Return the name of this build's master job."""
        return f"master-{self.buildGuid}"

    def _abort(self, apiCall, error):
        """Log a failed create call, remove the whole build and exit(1).

        Args:
            apiCall: "<Api>-><method>" label for the log message.
            error: the client.rest.ApiException that was raised.
        """
        self.logger.error(
            f"Exception when calling {apiCall}: {error.reason}\n"
        )
        self.clean()
        sys.exit(1)

    def _waitForPod(self, podName, phases, pollInterval=2):
        """Poll pod `podName` until its phase is in `phases`; return it.

        If the pod reaches a terminal phase ("Succeeded" or "Failed") that
        is not in `phases`, it can never get there. In that case the build
        is cleaned up and the process exits with status 1 instead of
        polling forever.

        Args:
            podName: pod name in the "default" namespace.
            phases: collection of acceptable phase strings.
            pollInterval: seconds to sleep between polls.
        """
        while True:
            resp = self.coreV1ApiInstance.read_namespaced_pod(
                name=podName, namespace="default"
            )
            phase = resp.status.phase
            if phase in phases:
                return phase
            if phase in ("Succeeded", "Failed"):
                self.logger.error(
                    f"pod {podName} reached phase {phase} while waiting "
                    f"for one of {sorted(phases)}"
                )
                self.clean()
                sys.exit(1)
            # Sleep between polls instead of busy-looping on the API server.
            time.sleep(pollInterval)

    def _findPodByLabel(self, labelSelector, timeout=600, pollInterval=2):
        """Return the name of the first pod matching `labelSelector`.

        A job's pod is created asynchronously by the job controller, so it
        may not exist yet right after the job is submitted. Poll for up to
        `timeout` seconds; after that, clean up and exit with status 1.
        """
        deadline = time.monotonic() + timeout
        while True:
            resp = self.coreV1ApiInstance.list_namespaced_pod(
                label_selector=labelSelector, namespace="default"
            )
            if resp.items:
                return resp.items[0].metadata.name
            if time.monotonic() >= deadline:
                self.logger.error(
                    f"no pod matching {labelSelector} appeared "
                    f"within {timeout}s"
                )
                self.clean()
                sys.exit(1)
            time.sleep(pollInterval)

    def createPersistentVolume(self):
        """Create every PV described in yaml/persistentVolume.yaml.

        Each PV's name and "storage-tier" label get the build suffix, and
        its NFS server is set to "nfs-server-ip". The "nfspod" PV exports
        "nfs-server-path" itself. Every other PV exports
        "<nfs-server-path>/build-<guid><template path>". Any API error
        aborts the build (cleanup, then exit status 1).
        """
        suffix = f"-{self.buildGuid}"
        with open(self._yamlPath("persistentVolume.yaml"), "r") as f:
            # safe_load_all is lazy, so the file must stay open while looping.
            for pvFile in yaml.safe_load_all(f):
                metadata = pvFile["metadata"]
                nfs = pvFile["spec"]["nfs"]
                metadata["name"] += suffix
                metadata["labels"]["storage-tier"] += suffix
                nfs["server"] = self.distributedBuildConfig["nfs-server-ip"]
                basePath = self.distributedBuildConfig["nfs-server-path"]
                if "nfspod" in metadata["name"]:
                    nfs["path"] = basePath
                else:
                    nfs["path"] = (
                        basePath + f"/build-{self.buildGuid}" + nfs["path"]
                    )
                try:
                    resp = self.coreV1ApiInstance.create_persistent_volume(
                        body=pvFile
                    )
                except client.rest.ApiException as e:
                    self._abort("CoreV1Api->create_persistent_volume", e)
                else:
                    self.logger.info(f"Created pv {resp.metadata.name}")

    def createPersistentVolumeClaim(self):
        """Create every PVC described in yaml/persistentVolumeClaim.yaml.

        Each PVC's name and its "storage-tier" match label get the build
        suffix, so it binds to this build's PVs. Any API error aborts the
        build (cleanup, then exit status 1).
        """
        suffix = f"-{self.buildGuid}"
        api = self.coreV1ApiInstance
        with open(self._yamlPath("persistentVolumeClaim.yaml"), "r") as f:
            for pvcFile in yaml.safe_load_all(f):
                pvcFile["metadata"]["name"] += suffix
                matchLabels = pvcFile["spec"]["selector"]["matchLabels"]
                matchLabels["storage-tier"] += suffix
                try:
                    resp = api.create_namespaced_persistent_volume_claim(
                        namespace="default", body=pvcFile
                    )
                except client.rest.ApiException as e:
                    self._abort(
                        "CoreV1Api->create_namespaced_persistent_volume_claim",
                        e,
                    )
                else:
                    self.logger.info(f"Created pvc {resp.metadata.name}")

    def createNfsPod(self):
        """Create the NFS helper pod from yaml/nfspod.yaml.

        The pod name and its first volume's PVC claim get the build suffix.
        The first container's working directory gets "/build-<guid>"
        appended. Any API error aborts the build.
        """
        suffix = f"-{self.buildGuid}"
        with open(self._yamlPath("nfspod.yaml")) as f:
            nfspodFile = yaml.safe_load(f)
        podSpec = nfspodFile["spec"]
        nfspodFile["metadata"]["name"] += suffix
        podSpec["containers"][0]["workingDir"] += f"/build-{self.buildGuid}"
        podSpec["volumes"][0]["persistentVolumeClaim"]["claimName"] += suffix
        try:
            resp = self.coreV1ApiInstance.create_namespaced_pod(
                namespace="default", body=nfspodFile
            )
        except client.rest.ApiException as e:
            self._abort("CoreV1Api->create_namespaced_pod", e)
        else:
            self.logger.info(f"Created nfspod {resp.metadata.name}")

    def createMasterService(self):
        """Create the master service from yaml/masterService.yaml.

        The service name and its "app" selector get the build suffix, so it
        selects this build's master pod. Any API error aborts the build.
        """
        suffix = f"-{self.buildGuid}"
        with open(self._yamlPath("masterService.yaml")) as f:
            masterServiceFile = yaml.safe_load(f)
        masterServiceFile["metadata"]["name"] += suffix
        masterServiceFile["spec"]["selector"]["app"] += suffix
        try:
            resp = self.coreV1ApiInstance.create_namespaced_service(
                namespace="default", body=masterServiceFile
            )
        except client.rest.ApiException as e:
            self._abort("CoreV1Api->create_namespaced_service", e)
        else:
            self.logger.info(f"Created service {resp.metadata.name}")

    def createMasterJob(self):
        """Create the master job from yaml/master.yaml.

        The job name, its pod template's "app" label and the first volume's
        PVC claim get the build suffix. The configured "command" is chained
        after the first container's second argument with " && ". Any API
        error aborts the build.
        """
        suffix = f"-{self.buildGuid}"
        with open(self._yamlPath("master.yaml")) as f:
            masterFile = yaml.safe_load(f)
        template = masterFile["spec"]["template"]
        masterFile["metadata"]["name"] += suffix
        template["metadata"]["labels"]["app"] += suffix
        volumes = template["spec"]["volumes"]
        volumes[0]["persistentVolumeClaim"]["claimName"] += suffix
        args = template["spec"]["containers"][0]["args"]
        args[1] = f"{args[1]} && " + self.distributedBuildConfig["command"]
        try:
            resp = self.batchV1ApiInstance.create_namespaced_job(
                namespace="default", body=masterFile
            )
        except client.rest.ApiException as e:
            self._abort("BatchV1Api->create_namespaced_job", e)
        else:
            self.logger.info(f"Created Job {resp.metadata.name}")

    def createDeployment(self):
        """Create the worker deployment from yaml/worker.yaml.

        The deployment name and the PVC claims of the first six pod volumes
        get the build suffix. The first env var of the first container is
        set to the upper-cased build id, and the replica count is taken
        from "pods". Any API error aborts the build.
        """
        suffix = f"-{self.buildGuid}"
        with open(self._yamlPath("worker.yaml")) as f:
            workerFile = yaml.safe_load(f)
        podSpec = workerFile["spec"]["template"]["spec"]
        workerFile["metadata"]["name"] += suffix
        podSpec["containers"][0]["env"][0]["value"] = self.buildGuid.upper()
        # The template is expected to start with six PVC-backed volumes;
        # indexing (rather than slicing) fails loudly if it has fewer.
        volumes = podSpec["volumes"]
        for index in range(6):
            volumes[index]["persistentVolumeClaim"]["claimName"] += suffix
        workerFile["spec"]["replicas"] = self.distributedBuildConfig["pods"]
        try:
            resp = self.AppsV1ApiInstance.create_namespaced_deployment(
                body=workerFile, namespace="default"
            )
        except client.rest.ApiException as e:
            self._abort("AppsV1Api->create_namespaced_deployment", e)
        else:
            self.logger.info(f"Created deployment {resp.metadata.name}")

    def deletePersistentVolume(self):
        """Delete this build's PVs; API errors are logged, not raised."""
        for name in self._volumeNames:
            try:
                self.coreV1ApiInstance.delete_persistent_volume(
                    f"{name}-{self.buildGuid}"
                )
            except client.rest.ApiException as e:
                self.logger.error(
                    f"Exception when calling CoreV1Api->delete_persistent_volume: "  # noqa: E501
                    f"{e.reason}"
                )
            else:
                self.logger.info(f"Deleted pv {name}")

    def deletePersistentVolumeClaim(self):
        """Delete this build's PVCs; API errors are logged, not raised."""
        api = self.coreV1ApiInstance
        for name in self._volumeNames:
            try:
                api.delete_namespaced_persistent_volume_claim(
                    f"{name}-{self.buildGuid}", namespace="default"
                )
            except client.rest.ApiException as e:
                self.logger.error(
                    f"Exception when calling "
                    f"CoreV1Api->delete_namespaced_persistent_volume_claim: "
                    f"{e.reason}\n"
                )
            else:
                self.logger.info(f"Deleted pvc {name}")

    def deleteMasterJob(self):
        """Delete the master job and its pods; API errors are logged."""
        try:
            self.batchV1ApiInstance.delete_namespaced_job(
                name=self._masterJobName(),
                namespace="default",
                # Foreground cascading deletion removes the job's pods
                # before the job object itself.
                propagation_policy="Foreground",
                grace_period_seconds=10,
            )
            self.logger.info("deleted job master")
        except client.rest.ApiException as e:
            self.logger.error(
                f"Exception when calling BatchV1Api->delete_namespaced_job: "
                f"{e.reason}"
            )

    def deleteBuild(self):
        """Remove /root/build-<guid> inside the NFS pod.

        Runs "rm -rf" through the pod exec API and waits up to 10 s on the
        exec stream. Only API errors are caught (and logged). The success
        message is logged without checking the command's exit status.
        """
        self.logger.info("Removing Build folder ...")
        pod = f"nfspod-{self.buildGuid}"
        cmd = ["/bin/bash", "-c", f"rm -rf /root/build-{self.buildGuid}"]
        try:
            resp = stream.stream(
                self.coreV1ApiInstance.connect_get_namespaced_pod_exec,
                pod,
                "default",
                command=cmd,
                stderr=True,
                stdin=False,
                stdout=True,
                tty=False,
                _preload_content=False,
            )
            resp.run_forever(timeout=10)
            self.logger.info("Deleted Build folder Successfully...")
        except client.rest.ApiException as e:
            self.logger.error(
                f"Exception when calling "
                f"CoreV1Api->connect_namespaced_pod_exec: "
                f"{e.reason}"
            )

    def deleteNfsPod(self):
        """Delete the NFS helper pod; API errors are logged."""
        try:
            pod = f"nfspod-{self.buildGuid}"
            self.coreV1ApiInstance.delete_namespaced_pod(
                name=pod, namespace="default"
            )
            self.logger.info("deleted nfs pod")
        except client.rest.ApiException as e:
            self.logger.error(
                f"Exception when calling "
                f"CoreV1Api->delete_namespaced_pod: "
                f"{e.reason}"
            )

    def deleteMasterService(self):
        """Delete the master service; API errors are logged."""
        try:
            service = f"master-service-{self.buildGuid}"
            self.coreV1ApiInstance.delete_namespaced_service(
                name=service, namespace="default"
            )
            self.logger.info("deleted master service")
        except client.rest.ApiException as e:
            self.logger.error(
                f"Exception when calling "
                f"CoreV1Api->delete_namespaced_service: "
                f"{e.reason}\n"
            )

    def deleteDeployment(self):
        """Delete the worker deployment; API errors are logged."""
        try:
            deploy = f"worker-{self.buildGuid}"
            self.AppsV1ApiInstance.delete_namespaced_deployment(
                name=deploy, namespace="default", grace_period_seconds=15
            )
            self.logger.info("deleted worker deployment")
        except client.rest.ApiException as e:
            self.logger.error(
                f"Exception when calling "
                f"AppsV1Api->delete_namespaced_deployment: "
                f"{e.reason}\n"
            )

    def copyToNfs(self):
        """Copy the local checkout into the NFS pod's build directory.

        Waits until the NFS pod is running, then runs
        "kubectl cp <checkout root> nfspod-<guid>:/root/build-<guid>/photon"
        through CommandUtils.runBashCmd.
        """
        podName = f"nfspod-{self.buildGuid}"
        self._waitForPod(podName, ("Running",))
        cmd = (
            "kubectl cp "
            + self._localPhotonRoot()
            + f" {podName}:/root/build-{self.buildGuid}/photon"
        )
        self.logger.info(cmd)
        self.cmdUtils.runBashCmd(cmd)

    def copyFromNfs(self):
        """Copy the build's stage directory back into the local checkout.

        Waits until the NFS pod is running, then copies
        /root/build-<guid>/photon/stage to "<checkout root>stage" with
        kubectl cp.
        """
        podName = f"nfspod-{self.buildGuid}"
        self._waitForPod(podName, ("Running",))
        cmd = (
            f"kubectl cp {podName}:/root/build-{self.buildGuid}/photon/stage "
            + self._localPhotonRoot()
            + "stage"
        )
        self.logger.info(cmd)
        self.cmdUtils.runBashCmd(cmd)

    def monitorJob(self):
        """Block until this build's master job reports success or failure.

        Watches jobs in the "default" namespace (watch timeout 21600 s) and
        returns on the first event for "master-<guid>" whose status has a
        non-zero succeeded or failed count, or when the watch ends.
        """
        jobName = self._masterJobName()
        w = watch.Watch()
        for event in w.stream(
            self.batchV1ApiInstance.list_namespaced_job,
            namespace="default",
            timeout_seconds=21600,
        ):
            job = event["object"]
            # Exact match: a substring test on "master" would also react to
            # the master jobs of other builds in the same namespace.
            if job.metadata.name != jobName:
                continue
            self.logger.info("Checking job status ...")
            self.logger.debug(job.status)
            if job.status.succeeded or job.status.failed:
                self.logger.debug("job status ...")
                self.logger.debug(job.status)
                break

    def getLogs(self):
        """Follow the master pod's log, writing each line to the logger.

        Waits for the master job's pod to exist and then to be running or
        finished. "Failed" is accepted too: a pod that fails before being
        seen running never reaches the other phases, and its log is still
        worth showing. Errors while streaming are logged, not raised.
        """
        podName = self._findPodByLabel(f"app=master-{self.buildGuid}")
        self._waitForPod(podName, ("Running", "Succeeded", "Failed"))
        w = watch.Watch()
        try:
            for line in w.stream(
                self.coreV1ApiInstance.read_namespaced_pod_log,
                name=podName,
                namespace="default",
            ):
                self.logger.info(line)
        except Exception as e:
            # Best effort: a broken log stream must not abort the build.
            self.logger.error(e)
        self.logger.info("pod terminated")

    def signal_handler(self, signal, frame):
        """SIGINT handler: tear the build down and exit with status 0.

        `signal` (the signal number; it shadows the module inside this
        method) and `frame` are supplied by signal.signal and are unused.
        """
        self.logger.info("SIGINT received")
        self.logger.info("Stopping Build ...")
        self.clean()
        sys.exit(0)

    def clean(self):
        """Delete every Kubernetes object and the NFS build folder of this
        build. Each step logs its own API errors and continues."""
        self.logger.info("-" * 45)
        self.logger.info("")
        self.logger.info("Cleaning up ...")
        # The build folder is removed through the NFS pod, so do it first.
        self.deleteBuild()
        self.deleteNfsPod()
        self.deleteMasterJob()
        self.deleteMasterService()
        self.deleteDeployment()
        # Claims before the volumes they are bound to.
        self.deletePersistentVolumeClaim()
        self.deletePersistentVolume()

    def create(self):
        """Create all objects of the build and seed the NFS share.

        The checkout is copied to NFS before the master job and workers are
        started.
        """
        self.logger.info("-" * 45)
        self.logger.info("")
        self.createPersistentVolume()
        self.createPersistentVolumeClaim()
        self.createNfsPod()
        self.copyToNfs()
        self.createMasterService()
        self.createMasterJob()
        self.createDeployment()
def main(distributedBuildConfig):
    """Run one distributed build end to end and tear it down afterwards.

    Creates the Kubernetes objects, streams the master pod's log, waits for
    the master job to finish, copies the stage directory back from NFS and
    finally deletes everything. SIGINT triggers the same cleanup through
    DistributedBuilder.signal_handler.

    Args:
        distributedBuildConfig: build options dict, passed to
            DistributedBuilder.
    """
    distributedBuilder = DistributedBuilder(distributedBuildConfig)
    signal.signal(signal.SIGINT, distributedBuilder.signal_handler)
    try:
        distributedBuilder.create()
        distributedBuilder.getLogs()
        distributedBuilder.monitorJob()
        distributedBuilder.copyFromNfs()
    except Exception:
        # Without this, an unexpected error would leave the volumes, pods,
        # job and worker deployment behind in the cluster. SystemExit (used
        # by the builder's own error paths after they have already cleaned
        # up) is not an Exception subclass, so it is not cleaned up twice.
        distributedBuilder.clean()
        raise
    distributedBuilder.clean()
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument(
        "-g",
        "--distributed-build-option-file",
        dest="distributedBuildOptionFile",
        default="../../common/data/distributed_build_options.json",
    )
    parser.add_argument(
        "-l", "--log-path", dest="logPath", default="../../stage/LOGS"
    )
    parser.add_argument("-y", "--log-level", dest="logLevel", default="info")
    options = parser.parse_args()
    constants.setLogPath(options.logPath)
    constants.setLogLevel(options.logLevel)
    # The option file path is read from its argparse dest
    # ("distributedBuildOptionFile"). A relative path is resolved against
    # this script's directory; os.path.join keeps an absolute path as-is.
    optionFilePath = os.path.join(
        os.path.dirname(__file__), options.distributedBuildOptionFile
    )
    with open(optionFilePath, "r") as configFile:
        distributedBuildConfig = json.load(configFile)
    main(distributedBuildConfig)