| ... | ... |
@@ -14,7 +14,7 @@ import ( |
| 14 | 14 |
"github.com/golang/glog" |
| 15 | 15 |
|
| 16 | 16 |
starterrors "github.com/openshift/origin/pkg/bootstrap/docker/errors" |
| 17 |
- "github.com/openshift/origin/pkg/cmd/util/pullprogress" |
|
| 17 |
+ "github.com/openshift/origin/pkg/util/docker/dockerfile/builder/imageprogress" |
|
| 18 | 18 |
) |
| 19 | 19 |
|
| 20 | 20 |
const openShiftInsecureCIDR = "172.30.0.0/16" |
| ... | ... |
@@ -118,31 +118,11 @@ func (h *Helper) CheckAndPull(image string, out io.Writer) error {
|
| 118 | 118 |
} |
| 119 | 119 |
glog.V(5).Infof("Image %q not found. Pulling", image)
|
| 120 | 120 |
fmt.Fprintf(out, "Pulling image %s\n", image) |
| 121 |
- extracting := false |
|
| 122 |
- var outputStream io.Writer |
|
| 123 |
- writeProgress := func(r *pullprogress.ProgressReport) {
|
|
| 124 |
- if extracting {
|
|
| 125 |
- return |
|
| 126 |
- } |
|
| 127 |
- if r.Downloading == 0 && r.Waiting == 0 && r.Extracting > 0 {
|
|
| 128 |
- fmt.Fprintf(out, "Extracting\n") |
|
| 129 |
- extracting = true |
|
| 130 |
- return |
|
| 131 |
- } |
|
| 132 |
- plural := "s" |
|
| 133 |
- if r.Downloading == 1 {
|
|
| 134 |
- plural = " " |
|
| 135 |
- } |
|
| 136 |
- fmt.Fprintf(out, "Downloading %d layer%s (%3.0f%%)", r.Downloading, plural, r.DownloadPct) |
|
| 137 |
- if r.Waiting > 0 {
|
|
| 138 |
- fmt.Fprintf(out, ", %d waiting\n", r.Waiting) |
|
| 139 |
- } else {
|
|
| 140 |
- fmt.Fprintf(out, "\n") |
|
| 141 |
- } |
|
| 121 |
+ logProgress := func(s string) {
|
|
| 122 |
+ fmt.Fprintf(out, "%s\n", s) |
|
| 142 | 123 |
} |
| 143 |
- if !glog.V(5) {
|
|
| 144 |
- outputStream = pullprogress.NewPullProgressWriter(writeProgress) |
|
| 145 |
- } else {
|
|
| 124 |
+ outputStream := imageprogress.NewPullWriter(logProgress) |
|
| 125 |
+ if glog.V(5) {
|
|
| 146 | 126 |
outputStream = out |
| 147 | 127 |
} |
| 148 | 128 |
err = h.client.PullImage(docker.PullImageOptions{
|
| ... | ... |
@@ -153,7 +133,7 @@ func (h *Helper) CheckAndPull(image string, out io.Writer) error {
|
| 153 | 153 |
if err != nil {
|
| 154 | 154 |
return starterrors.NewError("error pulling Docker image %s", image).WithCause(err)
|
| 155 | 155 |
} |
| 156 |
- fmt.Fprintf(out, "Image pull comlete\n") |
|
| 156 |
+ fmt.Fprintf(out, "Image pull complete\n") |
|
| 157 | 157 |
return nil |
| 158 | 158 |
} |
| 159 | 159 |
|
| ... | ... |
@@ -7,13 +7,15 @@ import ( |
| 7 | 7 |
"strings" |
| 8 | 8 |
"time" |
| 9 | 9 |
|
| 10 |
- s2iapi "github.com/openshift/source-to-image/pkg/api" |
|
| 10 |
+ "github.com/docker/docker/pkg/parsers" |
|
| 11 |
+ docker "github.com/fsouza/go-dockerclient" |
|
| 11 | 12 |
"k8s.io/kubernetes/pkg/util/interrupt" |
| 12 | 13 |
utilruntime "k8s.io/kubernetes/pkg/util/runtime" |
| 13 | 14 |
|
| 14 |
- "github.com/docker/docker/pkg/parsers" |
|
| 15 |
- docker "github.com/fsouza/go-dockerclient" |
|
| 15 |
+ s2iapi "github.com/openshift/source-to-image/pkg/api" |
|
| 16 | 16 |
"github.com/openshift/source-to-image/pkg/tar" |
| 17 |
+ |
|
| 18 |
+ "github.com/openshift/origin/pkg/util/docker/dockerfile/builder/imageprogress" |
|
| 17 | 19 |
) |
| 18 | 20 |
|
| 19 | 21 |
var ( |
| ... | ... |
@@ -55,12 +57,18 @@ type DockerClient interface {
|
| 55 | 55 |
// If any other scenario the push will fail, without retries. |
| 56 | 56 |
func pushImage(client DockerClient, name string, authConfig docker.AuthConfiguration) error {
|
| 57 | 57 |
repository, tag := docker.ParseRepositoryTag(name) |
| 58 |
+ logProgress := func(s string) {
|
|
| 59 |
+ glog.V(1).Infof("%s", s)
|
|
| 60 |
+ } |
|
| 58 | 61 |
opts := docker.PushImageOptions{
|
| 59 |
- Name: repository, |
|
| 60 |
- Tag: tag, |
|
| 62 |
+ Name: repository, |
|
| 63 |
+ Tag: tag, |
|
| 64 |
+ OutputStream: imageprogress.NewPushWriter(logProgress), |
|
| 65 |
+ RawJSONStream: true, |
|
| 61 | 66 |
} |
| 62 | 67 |
if glog.Is(5) {
|
| 63 | 68 |
opts.OutputStream = os.Stderr |
| 69 |
+ opts.RawJSONStream = false |
|
| 64 | 70 |
} |
| 65 | 71 |
var err error |
| 66 | 72 |
var retriableError = false |
| ... | ... |
@@ -265,7 +265,7 @@ func (s *S2IBuilder) Build() error {
|
| 265 | 265 |
} else {
|
| 266 | 266 |
glog.V(2).Infof("No push secret provided")
|
| 267 | 267 |
} |
| 268 |
- glog.V(1).Infof("Pushing %s image ...", pushTag)
|
|
| 268 |
+ glog.V(1).Infof("Pushing image %s ...", pushTag)
|
|
| 269 | 269 |
if err := pushImage(s.dockerClient, pushTag, pushAuthConfig); err != nil {
|
| 270 | 270 |
// write extended error message to assist in problem resolution |
| 271 | 271 |
msg := fmt.Sprintf("Failed to push image. Response from registry is: %v", err)
|
| ... | ... |
@@ -281,7 +281,7 @@ func (s *S2IBuilder) Build() error {
|
| 281 | 281 |
} |
| 282 | 282 |
return errors.New(msg) |
| 283 | 283 |
} |
| 284 |
- glog.V(1).Infof("Successfully pushed %s", pushTag)
|
|
| 284 |
+ glog.V(1).Infof("Push successful")
|
|
| 285 | 285 |
} |
| 286 | 286 |
return nil |
| 287 | 287 |
} |
| 288 | 288 |
deleted file mode 100644 |
| ... | ... |
@@ -1,223 +0,0 @@ |
| 1 |
-package pullprogress |
|
| 2 |
- |
|
| 3 |
-import ( |
|
| 4 |
- "encoding/json" |
|
| 5 |
- "io" |
|
| 6 |
- "strings" |
|
| 7 |
- "time" |
|
| 8 |
-) |
|
| 9 |
- |
|
| 10 |
-const ( |
|
| 11 |
- defaultCountTimeThreshhold = 10 * time.Second |
|
| 12 |
- defaultProgressTimeThreshhold = 45 * time.Second |
|
| 13 |
- defaultStableThreshhold = 3 |
|
| 14 |
-) |
|
| 15 |
- |
|
| 16 |
-// NewPullProgressWriter creates a writer that periodically reports |
|
| 17 |
-// on pull progress of an image. It only reports when the state of the |
|
| 18 |
-// different layers has changed and uses time thresholds to limit the |
|
| 19 |
-// rate of the reports. |
|
| 20 |
-func NewPullProgressWriter(reportFn func(*ProgressReport)) io.Writer {
|
|
| 21 |
- pipeIn, pipeOut := io.Pipe() |
|
| 22 |
- progressWriter := &pullProgressWriter{
|
|
| 23 |
- Writer: pipeOut, |
|
| 24 |
- decoder: json.NewDecoder(pipeIn), |
|
| 25 |
- layerStatus: map[string]Status{},
|
|
| 26 |
- reportFn: reportFn, |
|
| 27 |
- countTimeThreshhold: defaultCountTimeThreshhold, |
|
| 28 |
- progressTimeThreshhold: defaultProgressTimeThreshhold, |
|
| 29 |
- stableThreshhold: defaultStableThreshhold, |
|
| 30 |
- } |
|
| 31 |
- go func() {
|
|
| 32 |
- err := progressWriter.readProgress() |
|
| 33 |
- if err != nil {
|
|
| 34 |
- pipeIn.CloseWithError(err) |
|
| 35 |
- } |
|
| 36 |
- }() |
|
| 37 |
- return progressWriter |
|
| 38 |
-} |
|
| 39 |
- |
|
| 40 |
-// Status is a structure representation of a Docker pull progress line |
|
| 41 |
-type Status struct {
|
|
| 42 |
- ID string `json:"id"` |
|
| 43 |
- Status string `json:"status"` |
|
| 44 |
- ProgressDetail Detail `json:"progressDetail"` |
|
| 45 |
- Progress string `json:"progress"` |
|
| 46 |
-} |
|
| 47 |
- |
|
| 48 |
-// Detail is the progressDetail structure in a Docker pull progress line |
|
| 49 |
-type Detail struct {
|
|
| 50 |
- Current int64 `json:"current"` |
|
| 51 |
- Total int64 `json:"total"` |
|
| 52 |
-} |
|
| 53 |
- |
|
| 54 |
-// ProgressReport is a report of the progress of an image pull. It provides counts |
|
| 55 |
-// of layers in a given state. It also provides a percentage of downloaded data |
|
| 56 |
-// of those layers that are currently getting downloaded |
|
| 57 |
-type ProgressReport struct {
|
|
| 58 |
- Waiting int |
|
| 59 |
- Downloading int |
|
| 60 |
- Extracting int |
|
| 61 |
- Complete int |
|
| 62 |
- |
|
| 63 |
- DownloadPct float32 |
|
| 64 |
-} |
|
| 65 |
- |
|
| 66 |
-type statusType int |
|
| 67 |
- |
|
| 68 |
-const ( |
|
| 69 |
- statusWaiting statusType = iota |
|
| 70 |
- statusDownloading |
|
| 71 |
- statusExtracting |
|
| 72 |
- statusComplete |
|
| 73 |
-) |
|
| 74 |
- |
|
| 75 |
-type pullProgressWriter struct {
|
|
| 76 |
- io.Writer |
|
| 77 |
- decoder *json.Decoder |
|
| 78 |
- layerStatus map[string]Status |
|
| 79 |
- lastLayerCount int |
|
| 80 |
- stableLines int |
|
| 81 |
- stableThreshhold int |
|
| 82 |
- countTimeThreshhold time.Duration |
|
| 83 |
- progressTimeThreshhold time.Duration |
|
| 84 |
- lastReport *ProgressReport |
|
| 85 |
- lastReportTime time.Time |
|
| 86 |
- reportFn func(*ProgressReport) |
|
| 87 |
-} |
|
| 88 |
- |
|
| 89 |
-func (w *pullProgressWriter) readProgress() error {
|
|
| 90 |
- for {
|
|
| 91 |
- status := &Status{}
|
|
| 92 |
- err := w.decoder.Decode(status) |
|
| 93 |
- if err == io.EOF {
|
|
| 94 |
- break |
|
| 95 |
- } |
|
| 96 |
- if err != nil {
|
|
| 97 |
- return err |
|
| 98 |
- } |
|
| 99 |
- err = w.processStatus(status) |
|
| 100 |
- if err != nil {
|
|
| 101 |
- return err |
|
| 102 |
- } |
|
| 103 |
- } |
|
| 104 |
- return nil |
|
| 105 |
-} |
|
| 106 |
- |
|
| 107 |
-func (w *pullProgressWriter) isStableLayerCount() bool {
|
|
| 108 |
- // If the number of layers has changed since last status, we're not stable |
|
| 109 |
- if w.lastLayerCount != len(w.layerStatus) {
|
|
| 110 |
- w.lastLayerCount = len(w.layerStatus) |
|
| 111 |
- w.stableLines = 0 |
|
| 112 |
- return false |
|
| 113 |
- } |
|
| 114 |
- // Only proceed after we've received status for the same number |
|
| 115 |
- // of layers at least 3 times. If not, they're still increasing |
|
| 116 |
- w.stableLines++ |
|
| 117 |
- if w.stableLines < w.stableThreshhold {
|
|
| 118 |
- // We're not stable enough yet |
|
| 119 |
- return false |
|
| 120 |
- } |
|
| 121 |
- |
|
| 122 |
- return true |
|
| 123 |
-} |
|
| 124 |
- |
|
| 125 |
-func (w *pullProgressWriter) processStatus(status *Status) error {
|
|
| 126 |
- // determine if it's a status we want to process |
|
| 127 |
- if !isLayerStatus(status) {
|
|
| 128 |
- return nil |
|
| 129 |
- } |
|
| 130 |
- |
|
| 131 |
- w.layerStatus[status.ID] = *status |
|
| 132 |
- |
|
| 133 |
- // if the number of layers has not stabilized yet, return and wait for more |
|
| 134 |
- // progress |
|
| 135 |
- if !w.isStableLayerCount() {
|
|
| 136 |
- return nil |
|
| 137 |
- } |
|
| 138 |
- |
|
| 139 |
- report := createProgressReport(w.layerStatus) |
|
| 140 |
- |
|
| 141 |
- // check if the count of layers in each state has changed |
|
| 142 |
- if countsChanged(report, w.lastReport) {
|
|
| 143 |
- // only report on changed counts if the change occurs after |
|
| 144 |
- // a predefined set of seconds (10 sec by default). This prevents |
|
| 145 |
- // multiple reports in rapid succession |
|
| 146 |
- if time.Since(w.lastReportTime) > w.countTimeThreshhold {
|
|
| 147 |
- w.lastReport = report |
|
| 148 |
- w.lastReportTime = time.Now() |
|
| 149 |
- w.reportFn(report) |
|
| 150 |
- } |
|
| 151 |
- } else {
|
|
| 152 |
- // If counts haven't changed, but enough time has passed (45 sec by default), |
|
| 153 |
- // at least report on download progress |
|
| 154 |
- if time.Since(w.lastReportTime) > w.progressTimeThreshhold {
|
|
| 155 |
- w.lastReport = report |
|
| 156 |
- w.lastReportTime = time.Now() |
|
| 157 |
- w.reportFn(report) |
|
| 158 |
- } |
|
| 159 |
- } |
|
| 160 |
- return nil |
|
| 161 |
-} |
|
| 162 |
- |
|
| 163 |
-func countsChanged(new, old *ProgressReport) bool {
|
|
| 164 |
- if old == nil {
|
|
| 165 |
- return true |
|
| 166 |
- } |
|
| 167 |
- return new.Waiting != old.Waiting || |
|
| 168 |
- new.Downloading != old.Downloading || |
|
| 169 |
- new.Extracting != old.Extracting || |
|
| 170 |
- new.Complete != old.Complete |
|
| 171 |
-} |
|
| 172 |
- |
|
| 173 |
-func layerStatusToPullStatus(str string) statusType {
|
|
| 174 |
- switch str {
|
|
| 175 |
- case "Downloading": |
|
| 176 |
- return statusDownloading |
|
| 177 |
- case "Extracting", "Verifying Checksum", "Download complete": |
|
| 178 |
- return statusExtracting |
|
| 179 |
- case "Pull complete", "Already exists": |
|
| 180 |
- return statusComplete |
|
| 181 |
- default: // "Pull fs layer" or "Waiting" |
|
| 182 |
- return statusWaiting |
|
| 183 |
- } |
|
| 184 |
-} |
|
| 185 |
- |
|
| 186 |
-func isLayerStatus(status *Status) bool {
|
|
| 187 |
- // ignore status lines with no layer id |
|
| 188 |
- if len(status.ID) == 0 {
|
|
| 189 |
- return false |
|
| 190 |
- } |
|
| 191 |
- // ignore status lines with the initial named layer |
|
| 192 |
- if strings.HasPrefix(status.Status, "Pulling from") {
|
|
| 193 |
- return false |
|
| 194 |
- } |
|
| 195 |
- |
|
| 196 |
- return true |
|
| 197 |
-} |
|
| 198 |
- |
|
| 199 |
-func createProgressReport(layerStatus map[string]Status) *ProgressReport {
|
|
| 200 |
- report := &ProgressReport{}
|
|
| 201 |
- var totalDownload, totalCurrent int64 |
|
| 202 |
- for _, status := range layerStatus {
|
|
| 203 |
- pullStatus := layerStatusToPullStatus(status.Status) |
|
| 204 |
- switch pullStatus {
|
|
| 205 |
- case statusWaiting: |
|
| 206 |
- report.Waiting++ |
|
| 207 |
- case statusDownloading: |
|
| 208 |
- report.Downloading++ |
|
| 209 |
- totalDownload += status.ProgressDetail.Total |
|
| 210 |
- totalCurrent += status.ProgressDetail.Current |
|
| 211 |
- case statusExtracting: |
|
| 212 |
- report.Extracting++ |
|
| 213 |
- case statusComplete: |
|
| 214 |
- report.Complete++ |
|
| 215 |
- } |
|
| 216 |
- } |
|
| 217 |
- if totalDownload == 0 {
|
|
| 218 |
- report.DownloadPct = 0 |
|
| 219 |
- } else {
|
|
| 220 |
- report.DownloadPct = float32(totalCurrent) / float32(totalDownload) * 100.0 |
|
| 221 |
- } |
|
| 222 |
- return report |
|
| 223 |
-} |
| ... | ... |
@@ -17,6 +17,8 @@ import ( |
| 17 | 17 |
"github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/archive" |
| 18 | 18 |
"github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/fileutils" |
| 19 | 19 |
"github.com/golang/glog" |
| 20 |
+ |
|
| 21 |
+ "github.com/openshift/origin/pkg/util/docker/dockerfile/builder/imageprogress" |
|
| 20 | 22 |
) |
| 21 | 23 |
|
| 22 | 24 |
// ClientExecutor can run Docker builds from a Docker client. |
| ... | ... |
@@ -298,10 +300,22 @@ func (e *ClientExecutor) LoadImage(from string) (*docker.Image, error) {
|
| 298 | 298 |
} |
| 299 | 299 |
|
| 300 | 300 |
var lastErr error |
| 301 |
+ outputProgress := func(s string) {
|
|
| 302 |
+ e.LogFn("%s", s)
|
|
| 303 |
+ } |
|
| 301 | 304 |
for _, config := range auth {
|
| 302 | 305 |
// TODO: handle IDs? |
| 303 |
- // TODO: use RawJSONStream:true and handle the output nicely |
|
| 304 |
- if err = e.Client.PullImage(docker.PullImageOptions{Repository: from, OutputStream: e.Out, Tag: tag}, config); err == nil {
|
|
| 306 |
+ pullImageOptions := docker.PullImageOptions{
|
|
| 307 |
+ Repository: from, |
|
| 308 |
+ Tag: tag, |
|
| 309 |
+ OutputStream: imageprogress.NewPullWriter(outputProgress), |
|
| 310 |
+ RawJSONStream: true, |
|
| 311 |
+ } |
|
| 312 |
+ if glog.V(5) {
|
|
| 313 |
+ pullImageOptions.OutputStream = os.Stderr |
|
| 314 |
+ pullImageOptions.RawJSONStream = false |
|
| 315 |
+ } |
|
| 316 |
+ if err = e.Client.PullImage(pullImageOptions, config); err == nil {
|
|
| 305 | 317 |
break |
| 306 | 318 |
} |
| 307 | 319 |
lastErr = err |
| 308 | 320 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,234 @@ |
| 0 |
+package imageprogress |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "encoding/json" |
|
| 4 |
+ "io" |
|
| 5 |
+ "regexp" |
|
| 6 |
+ "time" |
|
| 7 |
+) |
|
| 8 |
+ |
|
| 9 |
+const ( |
|
| 10 |
+ defaultProgressTimeThreshhold = 30 * time.Second |
|
| 11 |
+ defaultStableThreshhold = 10 |
|
| 12 |
+) |
|
| 13 |
+ |
|
| 14 |
+// progressLine is a structure representation of a Docker pull progress line |
|
| 15 |
+type progressLine struct {
|
|
| 16 |
+ ID string `json:"id"` |
|
| 17 |
+ Status string `json:"status"` |
|
| 18 |
+ Detail *progressDetail `json:"progressDetail"` |
|
| 19 |
+} |
|
| 20 |
+ |
|
| 21 |
+// progressDetail is the progressDetail structure in a Docker pull progress line |
|
| 22 |
+type progressDetail struct {
|
|
| 23 |
+ Current int64 `json:"current"` |
|
| 24 |
+ Total int64 `json:"total"` |
|
| 25 |
+} |
|
| 26 |
+ |
|
| 27 |
+// layerDetail is layer information associated with a specific layerStatus |
|
| 28 |
+type layerDetail struct {
|
|
| 29 |
+ Count int |
|
| 30 |
+ Current int64 |
|
| 31 |
+ Total int64 |
|
| 32 |
+} |
|
| 33 |
+ |
|
| 34 |
+// layerStatus is one of different possible status for layers detected by |
|
| 35 |
+// the ProgressWriter |
|
| 36 |
+type layerStatus int |
|
| 37 |
+ |
|
| 38 |
+const ( |
|
| 39 |
+ statusPending layerStatus = iota |
|
| 40 |
+ statusDownloading |
|
| 41 |
+ statusExtracting |
|
| 42 |
+ statusComplete |
|
| 43 |
+ statusPushing |
|
| 44 |
+) |
|
| 45 |
+ |
|
| 46 |
+// layerStatusFromDockerString translates a string in a Docker status |
|
| 47 |
+// line to a layerStatus |
|
| 48 |
+func layerStatusFromDockerString(dockerStatus string) layerStatus {
|
|
| 49 |
+ switch dockerStatus {
|
|
| 50 |
+ case "Pushing": |
|
| 51 |
+ return statusPushing |
|
| 52 |
+ case "Downloading": |
|
| 53 |
+ return statusDownloading |
|
| 54 |
+ case "Extracting", "Verifying Checksum", "Download complete": |
|
| 55 |
+ return statusExtracting |
|
| 56 |
+ case "Pull complete", "Already exists", "Pushed": |
|
| 57 |
+ return statusComplete |
|
| 58 |
+ default: |
|
| 59 |
+ return statusPending |
|
| 60 |
+ } |
|
| 61 |
+} |
|
| 62 |
+ |
|
| 63 |
+type report map[layerStatus]*layerDetail |
|
| 64 |
+ |
|
| 65 |
+func (r report) count(status layerStatus) int {
|
|
| 66 |
+ detail, ok := r[status] |
|
| 67 |
+ if !ok {
|
|
| 68 |
+ return 0 |
|
| 69 |
+ } |
|
| 70 |
+ return detail.Count |
|
| 71 |
+} |
|
| 72 |
+ |
|
| 73 |
+func (r report) percentProgress(status layerStatus) float32 {
|
|
| 74 |
+ detail, ok := r[status] |
|
| 75 |
+ if !ok {
|
|
| 76 |
+ return 0 |
|
| 77 |
+ } |
|
| 78 |
+ if detail.Total == 0 {
|
|
| 79 |
+ return 0 |
|
| 80 |
+ } |
|
| 81 |
+ pct := float32(detail.Current) / float32(detail.Total) * 100.0 |
|
| 82 |
+ if pct > 100.0 {
|
|
| 83 |
+ pct = 100.0 |
|
| 84 |
+ } |
|
| 85 |
+ return pct |
|
| 86 |
+} |
|
| 87 |
+ |
|
| 88 |
+func (r report) totalCount() int {
|
|
| 89 |
+ cnt := 0 |
|
| 90 |
+ for _, detail := range r {
|
|
| 91 |
+ cnt += detail.Count |
|
| 92 |
+ } |
|
| 93 |
+ return cnt |
|
| 94 |
+} |
|
| 95 |
+ |
|
| 96 |
+// newWriter creates a writer that periodically reports |
|
| 97 |
+// on pull/push progress of a Docker image. It only reports when the state of the |
|
| 98 |
+// different layers has changed and uses time thresholds to limit the |
|
| 99 |
+// rate of the reports. |
|
| 100 |
+func newWriter(reportFn func(report), layersChangedFn func(report, report) bool) io.Writer {
|
|
| 101 |
+ pipeIn, pipeOut := io.Pipe() |
|
| 102 |
+ writer := &imageProgressWriter{
|
|
| 103 |
+ Writer: pipeOut, |
|
| 104 |
+ decoder: json.NewDecoder(pipeIn), |
|
| 105 |
+ layerStatus: map[string]progressLine{},
|
|
| 106 |
+ reportFn: reportFn, |
|
| 107 |
+ layersChangedFn: layersChangedFn, |
|
| 108 |
+ progressTimeThreshhold: defaultProgressTimeThreshhold, |
|
| 109 |
+ stableThreshhold: defaultStableThreshhold, |
|
| 110 |
+ } |
|
| 111 |
+ go func() {
|
|
| 112 |
+ err := writer.readProgress() |
|
| 113 |
+ if err != nil {
|
|
| 114 |
+ pipeIn.CloseWithError(err) |
|
| 115 |
+ } |
|
| 116 |
+ }() |
|
| 117 |
+ return writer |
|
| 118 |
+} |
|
| 119 |
+ |
|
| 120 |
+type imageProgressWriter struct {
|
|
| 121 |
+ io.Writer |
|
| 122 |
+ decoder *json.Decoder |
|
| 123 |
+ layerStatus map[string]progressLine |
|
| 124 |
+ lastLayerCount int |
|
| 125 |
+ stableLines int |
|
| 126 |
+ stableThreshhold int |
|
| 127 |
+ progressTimeThreshhold time.Duration |
|
| 128 |
+ lastReport report |
|
| 129 |
+ lastReportTime time.Time |
|
| 130 |
+ reportFn func(report) |
|
| 131 |
+ layersChangedFn func(report, report) bool |
|
| 132 |
+} |
|
| 133 |
+ |
|
| 134 |
+func (w *imageProgressWriter) readProgress() error {
|
|
| 135 |
+ for {
|
|
| 136 |
+ line := &progressLine{}
|
|
| 137 |
+ err := w.decoder.Decode(line) |
|
| 138 |
+ if err == io.EOF {
|
|
| 139 |
+ break |
|
| 140 |
+ } |
|
| 141 |
+ if err != nil {
|
|
| 142 |
+ return err |
|
| 143 |
+ } |
|
| 144 |
+ err = w.processLine(line) |
|
| 145 |
+ if err != nil {
|
|
| 146 |
+ return err |
|
| 147 |
+ } |
|
| 148 |
+ } |
|
| 149 |
+ return nil |
|
| 150 |
+} |
|
| 151 |
+ |
|
| 152 |
+func (w *imageProgressWriter) processLine(line *progressLine) error {
|
|
| 153 |
+ // determine if it's a line we want to process |
|
| 154 |
+ if !islayerStatus(line) {
|
|
| 155 |
+ return nil |
|
| 156 |
+ } |
|
| 157 |
+ |
|
| 158 |
+ w.layerStatus[line.ID] = *line |
|
| 159 |
+ |
|
| 160 |
+ // if the number of layers has not stabilized yet, return and wait for more |
|
| 161 |
+ // progress |
|
| 162 |
+ if !w.isStableLayerCount() {
|
|
| 163 |
+ return nil |
|
| 164 |
+ } |
|
| 165 |
+ |
|
| 166 |
+ r := createReport(w.layerStatus) |
|
| 167 |
+ |
|
| 168 |
+ // check if the count of layers in each state has changed |
|
| 169 |
+ if w.layersChangedFn(w.lastReport, r) {
|
|
| 170 |
+ w.lastReport = r |
|
| 171 |
+ w.lastReportTime = time.Now() |
|
| 172 |
+ w.reportFn(r) |
|
| 173 |
+ return nil |
|
| 174 |
+ } |
|
| 175 |
+ // If layer counts haven't changed, but enough time has passed (30 sec by default), |
|
| 176 |
+ // at least report on download/push progress |
|
| 177 |
+ if time.Since(w.lastReportTime) > w.progressTimeThreshhold {
|
|
| 178 |
+ w.lastReport = r |
|
| 179 |
+ w.lastReportTime = time.Now() |
|
| 180 |
+ w.reportFn(r) |
|
| 181 |
+ } |
|
| 182 |
+ return nil |
|
| 183 |
+} |
|
| 184 |
+ |
|
| 185 |
+func (w *imageProgressWriter) isStableLayerCount() bool {
|
|
| 186 |
+ // If the number of layers has changed since last status, we're not stable |
|
| 187 |
+ if w.lastLayerCount != len(w.layerStatus) {
|
|
| 188 |
+ w.lastLayerCount = len(w.layerStatus) |
|
| 189 |
+ w.stableLines = 0 |
|
| 190 |
+ return false |
|
| 191 |
+ } |
|
| 192 |
+ // Only proceed after we've received status for the same number |
|
| 193 |
+ // of layers at least stableThreshhold times. If not, they're still increasing |
|
| 194 |
+ w.stableLines++ |
|
| 195 |
+ if w.stableLines < w.stableThreshhold {
|
|
| 196 |
+ // We're not stable enough yet |
|
| 197 |
+ return false |
|
| 198 |
+ } |
|
| 199 |
+ |
|
| 200 |
+ return true |
|
| 201 |
+} |
|
| 202 |
+ |
|
| 203 |
+var layerIDRegexp = regexp.MustCompile("^[a-f,0-9]*$")
|
|
| 204 |
+ |
|
| 205 |
+func islayerStatus(line *progressLine) bool {
|
|
| 206 |
+ // ignore status lines with no layer id |
|
| 207 |
+ if len(line.ID) == 0 {
|
|
| 208 |
+ return false |
|
| 209 |
+ } |
|
| 210 |
+ // ignore layer ids that are not hex string |
|
| 211 |
+ if !layerIDRegexp.MatchString(line.ID) {
|
|
| 212 |
+ return false |
|
| 213 |
+ } |
|
| 214 |
+ return true |
|
| 215 |
+} |
|
| 216 |
+ |
|
| 217 |
+func createReport(dockerProgress map[string]progressLine) report {
|
|
| 218 |
+ r := report{}
|
|
| 219 |
+ for _, line := range dockerProgress {
|
|
| 220 |
+ layerStatus := layerStatusFromDockerString(line.Status) |
|
| 221 |
+ detail, exists := r[layerStatus] |
|
| 222 |
+ if !exists {
|
|
| 223 |
+ detail = &layerDetail{}
|
|
| 224 |
+ r[layerStatus] = detail |
|
| 225 |
+ } |
|
| 226 |
+ detail.Count++ |
|
| 227 |
+ if line.Detail != nil {
|
|
| 228 |
+ detail.Current += line.Detail.Current |
|
| 229 |
+ detail.Total += line.Detail.Total |
|
| 230 |
+ } |
|
| 231 |
+ } |
|
| 232 |
+ return r |
|
| 233 |
+} |
| 0 | 234 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,45 @@ |
| 0 |
+package imageprogress |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "fmt" |
|
| 4 |
+ "io" |
|
| 5 |
+) |
|
| 6 |
+ |
|
| 7 |
+// NewPullWriter creates a writer that periodically reports |
|
| 8 |
+// on pull progress of a Docker image. It only reports when the state of the |
|
| 9 |
+// different layers has changed and uses time thresholds to limit the |
|
| 10 |
+// rate of the reports. |
|
| 11 |
+func NewPullWriter(printFn func(string)) io.Writer {
|
|
| 12 |
+ return newWriter(pullReporter(printFn), pullLayersChanged) |
|
| 13 |
+} |
|
| 14 |
+ |
|
| 15 |
+func pullReporter(printFn func(string)) func(report) {
|
|
| 16 |
+ extracting := false |
|
| 17 |
+ return func(r report) {
|
|
| 18 |
+ if extracting {
|
|
| 19 |
+ return |
|
| 20 |
+ } |
|
| 21 |
+ if r.count(statusDownloading) == 0 && |
|
| 22 |
+ r.count(statusPending) == 0 && |
|
| 23 |
+ r.count(statusExtracting) > 0 {
|
|
| 24 |
+ |
|
| 25 |
+ printFn(fmt.Sprintf("Pulled %[1]d/%[1]d layers, 100%% complete", r.totalCount()))
|
|
| 26 |
+ printFn("Extracting")
|
|
| 27 |
+ extracting = true |
|
| 28 |
+ return |
|
| 29 |
+ } |
|
| 30 |
+ |
|
| 31 |
+ completeCount := r.count(statusComplete) + r.count(statusExtracting) |
|
| 32 |
+ var pctComplete float32 = 0.0 |
|
| 33 |
+ pctComplete += float32(completeCount) / float32(r.totalCount()) |
|
| 34 |
+ pctComplete += float32(r.count(statusDownloading)) / float32(r.totalCount()) * r.percentProgress(statusDownloading) / 100.0 |
|
| 35 |
+ pctComplete *= 100.0 |
|
| 36 |
+ printFn(fmt.Sprintf("Pulled %d/%d layers, %.0f%% complete", completeCount, r.totalCount(), pctComplete))
|
|
| 37 |
+ } |
|
| 38 |
+} |
|
| 39 |
+ |
|
| 40 |
+func pullLayersChanged(older, newer report) bool {
|
|
| 41 |
+ olderCompleteCount := older.count(statusComplete) + older.count(statusExtracting) |
|
| 42 |
+ newerCompleteCount := newer.count(statusComplete) + newer.count(statusExtracting) |
|
| 43 |
+ return olderCompleteCount != newerCompleteCount |
|
| 44 |
+} |
| 0 | 45 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,29 @@ |
| 0 |
+package imageprogress |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "fmt" |
|
| 4 |
+ "io" |
|
| 5 |
+) |
|
| 6 |
+ |
|
| 7 |
+// NewPushWriter creates a writer that periodically reports |
|
| 8 |
+// on push progress of a Docker image. It only reports when the state of the |
|
| 9 |
+// different layers has changed and uses time thresholds to limit the |
|
| 10 |
+// rate of the reports. |
|
| 11 |
+func NewPushWriter(printFn func(string)) io.Writer {
|
|
| 12 |
+ return newWriter(pushReporter(printFn), pushLayersChanged) |
|
| 13 |
+} |
|
| 14 |
+ |
|
| 15 |
+func pushReporter(printFn func(string)) func(report) {
|
|
| 16 |
+ return func(r report) {
|
|
| 17 |
+ var pctComplete float32 = 0.0 |
|
| 18 |
+ pctComplete += float32(r.count(statusComplete)) / float32(r.totalCount()) |
|
| 19 |
+ pctComplete += float32(r.count(statusPushing)) / float32(r.totalCount()) * r.percentProgress(statusPushing) / 100.0 |
|
| 20 |
+ pctComplete *= 100.0 |
|
| 21 |
+ |
|
| 22 |
+ printFn(fmt.Sprintf("Pushed %d/%d layers, %.0f%% complete", r.count(statusComplete), r.totalCount(), pctComplete))
|
|
| 23 |
+ } |
|
| 24 |
+} |
|
| 25 |
+ |
|
| 26 |
+func pushLayersChanged(older, newer report) bool {
|
|
| 27 |
+ return older.count(statusComplete) != newer.count(statusComplete) |
|
| 28 |
+} |