When a logging plugin has crashed, the daemon gets a broken pipe
error when it tries to copy the container stdio to the plugin, and
any log entries that occur while the plugin is down are lost.
Fix this by opening the fifo read+write in the daemon so writes do
not fail and logs are not lost while the plugin is down.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
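
As background for the change below: on Linux, opening a fifo O_RDWR keeps a
read end open inside the opening process itself, so the open does not fail
with ENXIO when no reader is attached, and writes do not raise EPIPE when the
reader (the plugin) goes away. A minimal standalone sketch of that behavior,
not part of this patch; the path is hypothetical:

    package main

    import (
        "fmt"

        "golang.org/x/sys/unix"
    )

    func main() {
        const path = "/tmp/demo.fifo" // hypothetical path, for illustration only

        if err := unix.Mkfifo(path, 0700); err != nil && err != unix.EEXIST {
            panic(err)
        }

        // O_WRONLY|O_NONBLOCK would fail with ENXIO here (no reader yet), and
        // a later write after the last reader closed would fail with EPIPE.
        // With O_RDWR this process holds its own read end, so neither happens.
        fd, err := unix.Open(path, unix.O_RDWR|unix.O_NONBLOCK, 0700)
        if err != nil {
            panic(err)
        }
        defer unix.Close(fd)

        // Succeeds even though no external reader (the "plugin") is attached;
        // the data just sits in the pipe buffer until something reads it.
        if _, err := unix.Write(fd, []byte("hello\n")); err != nil {
            panic(err)
        }
        fmt.Println("write succeeded with no reader attached")
    }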
@@ -12,7 +12,10 @@ import (
 )
 
 func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) {
-	f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_WRONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700)
+	// Make sure to also open with read (in addition to write) to avoid broken pipe errors on plugin failure.
+	// It is up to the plugin to keep track of pipes that it should re-attach to, however.
+	// If the plugin doesn't open for reads, then the container will block once the pipe is full.
+	f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error creating i/o pipe for log plugin: %s", a.Name())
 	}
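
The cost named in the comment above: with the plugin gone, nothing drains the
fifo, so once the kernel pipe buffer fills (typically 64KiB on Linux), a
blocking writer, i.e. the daemon's stdio copy, stalls the container. A
throwaway sketch of that limit; the path and buffer size are assumptions
about a typical Linux box, not part of the patch:

    package main

    import (
        "fmt"

        "golang.org/x/sys/unix"
    )

    func main() {
        const path = "/tmp/demo.fifo" // hypothetical path, as in the sketch above

        if err := unix.Mkfifo(path, 0700); err != nil && err != unix.EEXIST {
            panic(err)
        }
        fd, err := unix.Open(path, unix.O_RDWR|unix.O_NONBLOCK, 0700)
        if err != nil {
            panic(err)
        }
        defer unix.Close(fd)

        // With no reader draining the fifo, non-blocking writes fail with
        // EAGAIN once the pipe buffer fills (after ~64KiB on Linux). A
        // blocking writer would stall here instead, which is what happens to
        // the container's stdio if the plugin never re-attaches.
        buf := make([]byte, 4096)
        total := 0
        for {
            n, err := unix.Write(fd, buf)
            if err == unix.EAGAIN {
                fmt.Printf("pipe full after %d bytes\n", total)
                return
            }
            if err != nil {
                panic(err)
            }
            total += n
        }
    }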
@@ -116,3 +116,21 @@ func WithIPv6(network, ip string) func(*TestContainerConfig) {
 		c.NetworkingConfig.EndpointsConfig[network].IPAMConfig.IPv6Address = ip
 	}
 }
+
+// WithLogDriver sets the log driver to use for the container
+func WithLogDriver(driver string) func(*TestContainerConfig) {
+	return func(c *TestContainerConfig) {
+		if c.HostConfig == nil {
+			c.HostConfig = &containertypes.HostConfig{}
+		}
+		c.HostConfig.LogConfig.Type = driver
+	}
+}
+
+// WithAutoRemove sets the container to be removed on exit
+func WithAutoRemove(c *TestContainerConfig) {
+	if c.HostConfig == nil {
+		c.HostConfig = &containertypes.HostConfig{}
+	}
+	c.HostConfig.AutoRemove = true
+}
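
For reference, the new helpers compose with the existing functional options;
a hypothetical call (the driver name and command are placeholders, mirroring
the test added below):

    id := container.Run(t, ctx, client,
        container.WithAutoRemove,
        container.WithLogDriver("json-file"), // any installed log driver name
        container.WithCmd("/bin/sh", "-c", "echo hello"),
    )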
new file mode 100644
@@ -0,0 +1,48 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"os"
+)
+
+type start struct {
+	File string
+}
+
+func main() {
+	l, err := net.Listen("unix", "/run/docker/plugins/plugin.sock")
+	if err != nil {
+		panic(err)
+	}
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/LogDriver.StartLogging", func(w http.ResponseWriter, req *http.Request) {
+		startReq := &start{}
+		if err := json.NewDecoder(req.Body).Decode(startReq); err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		f, err := os.OpenFile(startReq.File, os.O_RDONLY, 0600)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		// Close the file immediately; this allows us to test what happens in the daemon when the plugin has closed the
+		// file or, for example, the plugin has crashed.
+		f.Close()
+
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprintln(w, `{}`)
+	})
+	server := http.Server{
+		Addr:    l.Addr().String(),
+		Handler: mux,
+	}
+
+	server.Serve(l)
+}
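
For contrast with the fixture above, which closes the fifo on purpose, a
well-behaved plugin keeps the fifo open and drains it, re-attaching to
streams it knows about after a restart. A sketch of such a handler, reusing
the fixture's package, start type, and imports plus "io"; the sink path is a
hypothetical stand-in for real log shipping, and the raw copy skips decoding
the daemon's log-entry framing:

    // Sketch only: drain the fifo instead of closing it.
    func startLogging(w http.ResponseWriter, req *http.Request) {
        startReq := &start{}
        if err := json.NewDecoder(req.Body).Decode(startReq); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        f, err := os.OpenFile(startReq.File, os.O_RDONLY, 0600)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        go func() {
            defer f.Close()
            sink, err := os.Create("/tmp/plugin-demo.log") // hypothetical sink
            if err != nil {
                return
            }
            defer sink.Close()
            io.Copy(sink, f) // raw copy; real drivers decode the log entries
        }()

        w.WriteHeader(http.StatusOK)
        fmt.Fprintln(w, `{}`)
    }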
new file mode 100644
@@ -0,0 +1,82 @@
+package logging
+
+import (
+	"bufio"
+	"context"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/integration/internal/container"
+	"github.com/docker/docker/internal/test/daemon"
+	"github.com/gotestyourself/gotestyourself/assert"
+)
+
+func TestContinueAfterPluginCrash(t *testing.T) {
+	t.Parallel()
+
+	d := daemon.New(t)
+	d.StartWithBusybox(t, "--iptables=false", "--init")
+	defer d.Stop(t)
+
+	client := d.NewClientT(t)
+	createPlugin(t, client, "test", "close_on_start", asLogDriver)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	assert.Assert(t, client.PluginEnable(ctx, "test", types.PluginEnableOptions{Timeout: 30}))
+	cancel()
+	defer client.PluginRemove(context.Background(), "test", types.PluginRemoveOptions{Force: true})
+
+	v, err := client.VolumeCreate(context.Background(), volume.VolumesCreateBody{})
+	assert.Assert(t, err)
+	defer client.VolumeRemove(context.Background(), v.Name, true)
+
+	ctx, cancel = context.WithTimeout(context.Background(), 60*time.Second)
+
+	id := container.Run(t, ctx, client,
+		container.WithAutoRemove,
+		container.WithLogDriver("test"),
+		container.WithCmd(
+			"/bin/sh", "-c", "while true; do sleep 1; echo hello; done",
+		),
+	)
+	cancel()
+	defer client.ContainerRemove(context.Background(), id, types.ContainerRemoveOptions{Force: true})
+
+	// Attach to the container to make sure it has written to stdout a few times
+	attach, err := client.ContainerAttach(context.Background(), id, types.ContainerAttachOptions{Stream: true, Stdout: true})
+	assert.Assert(t, err)
+
+	chErr := make(chan error)
+	go func() {
+		defer close(chErr)
+		rdr := bufio.NewReader(attach.Reader)
+		for i := 0; i < 5; i++ {
+			_, _, err := rdr.ReadLine()
+			if err != nil {
+				chErr <- err
+				return
+			}
+		}
+	}()
+
+	select {
+	case err := <-chErr:
+		assert.Assert(t, err)
+	case <-time.After(60 * time.Second):
+		t.Fatal("timeout waiting for container i/o")
+	}
+
+	// check daemon logs for "broken pipe"
+	// TODO(@cpuguy83): This is horribly hacky but is the only way to really test this case right now.
+	// It would be nice if there was a way to know that a broken pipe has occurred without looking through the logs.
+	log, err := os.Open(d.LogFileName())
+	assert.Assert(t, err)
+	scanner := bufio.NewScanner(log)
+	for scanner.Scan() {
+		assert.Assert(t, !strings.Contains(scanner.Text(), "broken pipe"))
+	}
+}