Proposal: unix-sockets support in Fluentd logging driver
Sebastiaan van Stijn authored on 2016/11/09 20:49:06... | ... |
@@ -6,6 +6,7 @@ import ( |
6 | 6 |
"fmt" |
7 | 7 |
"math" |
8 | 8 |
"net" |
9 |
+ "net/url" |
|
9 | 10 |
"strconv" |
10 | 11 |
"strings" |
11 | 12 |
"time" |
... | ... |
@@ -13,8 +14,10 @@ import ( |
13 | 13 |
"github.com/Sirupsen/logrus" |
14 | 14 |
"github.com/docker/docker/daemon/logger" |
15 | 15 |
"github.com/docker/docker/daemon/logger/loggerutils" |
16 |
+ "github.com/docker/docker/pkg/urlutil" |
|
16 | 17 |
"github.com/docker/go-units" |
17 | 18 |
"github.com/fluent/fluent-logger-golang/fluent" |
19 |
+ "github.com/pkg/errors" |
|
18 | 20 |
) |
19 | 21 |
|
20 | 22 |
type fluentd struct { |
... | ... |
@@ -25,9 +28,17 @@ type fluentd struct { |
25 | 25 |
extra map[string]string |
26 | 26 |
} |
27 | 27 |
|
28 |
+type location struct { |
|
29 |
+ protocol string |
|
30 |
+ host string |
|
31 |
+ port int |
|
32 |
+ path string |
|
33 |
+} |
|
34 |
+ |
|
28 | 35 |
const ( |
29 | 36 |
name = "fluentd" |
30 | 37 |
|
38 |
+ defaultProtocol = "tcp" |
|
31 | 39 |
defaultHost = "127.0.0.1" |
32 | 40 |
defaultPort = 24224 |
33 | 41 |
defaultBufferLimit = 1024 * 1024 |
... | ... |
@@ -57,7 +68,7 @@ func init() { |
57 | 57 |
// the context. The supported context configuration variable is |
58 | 58 |
// fluentd-address. |
59 | 59 |
func New(ctx logger.Context) (logger.Logger, error) { |
60 |
- host, port, err := parseAddress(ctx.Config[addressKey]) |
|
60 |
+ loc, err := parseAddress(ctx.Config[addressKey]) |
|
61 | 61 |
if err != nil { |
62 | 62 |
return nil, err |
63 | 63 |
} |
... | ... |
@@ -104,12 +115,14 @@ func New(ctx logger.Context) (logger.Logger, error) { |
104 | 104 |
} |
105 | 105 |
|
106 | 106 |
fluentConfig := fluent.Config{ |
107 |
- FluentPort: port, |
|
108 |
- FluentHost: host, |
|
109 |
- BufferLimit: bufferLimit, |
|
110 |
- RetryWait: retryWait, |
|
111 |
- MaxRetry: maxRetries, |
|
112 |
- AsyncConnect: asyncConnect, |
|
107 |
+ FluentPort: loc.port, |
|
108 |
+ FluentHost: loc.host, |
|
109 |
+ FluentNetwork: loc.protocol, |
|
110 |
+ FluentSocketPath: loc.path, |
|
111 |
+ BufferLimit: bufferLimit, |
|
112 |
+ RetryWait: retryWait, |
|
113 |
+ MaxRetry: maxRetries, |
|
114 |
+ AsyncConnect: asyncConnect, |
|
113 | 115 |
} |
114 | 116 |
|
115 | 117 |
logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig). |
... | ... |
@@ -169,29 +182,65 @@ func ValidateLogOpt(cfg map[string]string) error { |
169 | 169 |
} |
170 | 170 |
} |
171 | 171 |
|
172 |
- if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil { |
|
172 |
+ if _, err := parseAddress(cfg["fluentd-address"]); err != nil { |
|
173 | 173 |
return err |
174 | 174 |
} |
175 | 175 |
|
176 | 176 |
return nil |
177 | 177 |
} |
178 | 178 |
|
179 |
-func parseAddress(address string) (string, int, error) { |
|
179 |
+func parseAddress(address string) (*location, error) { |
|
180 | 180 |
if address == "" { |
181 |
- return defaultHost, defaultPort, nil |
|
181 |
+ return &location{ |
|
182 |
+ protocol: defaultProtocol, |
|
183 |
+ host: defaultHost, |
|
184 |
+ port: defaultPort, |
|
185 |
+ path: "", |
|
186 |
+ }, nil |
|
187 |
+ } |
|
188 |
+ |
|
189 |
+ protocol := defaultProtocol |
|
190 |
+ givenAddress := address |
|
191 |
+ if urlutil.IsTransportURL(address) { |
|
192 |
+ url, err := url.Parse(address) |
|
193 |
+ if err != nil { |
|
194 |
+ return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress) |
|
195 |
+ } |
|
196 |
+ // unix and unixgram socket |
|
197 |
+ if url.Scheme == "unix" || url.Scheme == "unixgram" { |
|
198 |
+ return &location{ |
|
199 |
+ protocol: url.Scheme, |
|
200 |
+ host: "", |
|
201 |
+ port: 0, |
|
202 |
+ path: url.Path, |
|
203 |
+ }, nil |
|
204 |
+ } |
|
205 |
+ // tcp|udp |
|
206 |
+ protocol = url.Scheme |
|
207 |
+ address = url.Host |
|
182 | 208 |
} |
183 | 209 |
|
184 | 210 |
host, port, err := net.SplitHostPort(address) |
185 | 211 |
if err != nil { |
186 | 212 |
if !strings.Contains(err.Error(), "missing port in address") { |
187 |
- return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err) |
|
213 |
+ return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress) |
|
188 | 214 |
} |
189 |
- return host, defaultPort, nil |
|
215 |
+ return &location{ |
|
216 |
+ protocol: protocol, |
|
217 |
+ host: host, |
|
218 |
+ port: defaultPort, |
|
219 |
+ path: "", |
|
220 |
+ }, nil |
|
190 | 221 |
} |
191 | 222 |
|
192 | 223 |
portnum, err := strconv.Atoi(port) |
193 | 224 |
if err != nil { |
194 |
- return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err) |
|
225 |
+ return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress) |
|
195 | 226 |
} |
196 |
- return host, portnum, nil |
|
227 |
+ return &location{ |
|
228 |
+ protocol: protocol, |
|
229 |
+ host: host, |
|
230 |
+ port: portnum, |
|
231 |
+ path: "", |
|
232 |
+ }, nil |
|
197 | 233 |
} |