diff --git a/README.md b/README.md index aaa30c835..f6fa5f810 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,13 @@ Memory limit - Increasing memory limit allows for more storage, but may cause ou Larger CPUs - Increasing the CPU budget per instance should allow higher throughput +Sometimes, depending on the scaling of Log Cache, the number of sources (CF applications and platform components) and the log load, ingress drops may occur when Log Cache nodes send items between each other. The Log Cache `ingress_dropped` metric should be monitored, to make sure that there are no drops. For such cases, the following three parameters can be adjusted until the log loss is gone. + +- Ingress Buffer Size - The ingress buffer (diode) size in number of items used when LogCache nodes send items between each other. The default size is 10000. Can be increased when ingress drops occur. +- Ingress Buffer Read Batch Size - The ingress buffer read batch size in number of items. The size of the ingress buffer read batch used when LogCache nodes send items between each other. The default size is 100. Can be increased when ingress drops occur. +- Ingress Buffer Read Batch Interval - The ingress buffer read interval in milliseconds. The default value is 250. Can be increased when ingress drops occur. + + Log Cache is known to exceed memory limits under high throughput/stress. If you see your log-cache reaching higher memory then you have set, you might want to scale your log-cache up. Either solely in terms of CPU per instance, or more instances. diff --git a/jobs/log-cache/spec b/jobs/log-cache/spec index b59677792..f7f58ef3c 100644 --- a/jobs/log-cache/spec +++ b/jobs/log-cache/spec @@ -47,6 +47,18 @@ properties: description: "The maximum number of items stored in LogCache per source." default: 100000 + ingress_buffer_size: + description: "The ingress buffer (diode) size in number of items. The size of the ingress buffer used when LogCache nodes send items between each other. 
In some cases, the buffer size must be raised in order to avoid ingress drops. Must be an integer." + default: 10000 + + ingress_buffer_read_batch_size: + description: "The ingress buffer read batch size in number of items. The size of the ingress buffer read batch used when LogCache nodes send envelopes between each other. In some cases, the batch size must be raised in order to make room in the buffer faster and avoid ingress drops. Must be an integer." + default: 100 + + ingress_buffer_read_batch_interval: + description: "The ingress buffer read batch interval in milliseconds. The interval in which the data will be read out from the ingress buffer used when LogCache nodes send envelopes between each other. In some cases, the batch read interval must be adjusted in order to make room in the buffer faster and avoid ingress drops. Must be an integer." + default: 250 + truncation_interval: description: "The amount of time between log-cache checking if it needs to prune" default: "1s" diff --git a/jobs/log-cache/templates/bpm.yml.erb b/jobs/log-cache/templates/bpm.yml.erb index 02250e96d..7e9c6b7e6 100644 --- a/jobs/log-cache/templates/bpm.yml.erb +++ b/jobs/log-cache/templates/bpm.yml.erb @@ -26,6 +26,9 @@ processes: ADDR: "<%= ":#{p('port')}" %>" MEMORY_LIMIT_PERCENT: "<%= p('memory_limit_percent') %>" MAX_PER_SOURCE: "<%= p('max_per_source') %>" + INGRESS_BUFFER_SIZE: "<%= p('ingress_buffer_size') %>" + INGRESS_BUFFER_READ_BATCH_SIZE: "<%= p('ingress_buffer_read_batch_size') %>" + INGRESS_BUFFER_READ_BATCH_INTERVAL: "<%= p('ingress_buffer_read_batch_interval') %>" QUERY_TIMEOUT: "<%= p('promql.query_timeout') %>" TRUNCATION_INTERVAL: "<%= p('truncation_interval') %>" PRUNES_PER_GC: "<%= p('prunes_per_gc') %>" diff --git a/src/cmd/log-cache/config.go b/src/cmd/log-cache/config.go index dbb83943a..ca88f7091 100644 --- a/src/cmd/log-cache/config.go +++ b/src/cmd/log-cache/config.go @@ -57,6 +57,24 @@ type Config struct { // assumed that the current node is the only 
one. NodeAddrs []string `env:"NODE_ADDRS, report"` + // IngressBufferSize sets the size of the ingress buffer(diode) in number of items, + // used when LogCache nodes send items between each other. Depending on the Log + // Envelope Load in some cases it might be useful to raise the size in order to avoid + // ingress drops. Default is 10000. + IngressBufferSize int `env:"INGRESS_BUFFER_SIZE, report"` + + // IngressBufferReadBatchSize sets the size of the ingress buffer(diode) read batch in number of items, + // used when LogCache nodes send items between each other. Depending on the Log + // Envelope Load in some cases it might be useful to raise the size in order to avoid + // ingress drops. Default is 100. + IngressBufferReadBatchSize int `env:"INGRESS_BUFFER_READ_BATCH_SIZE, report"` + + // IngressBufferReadBatchInterval sets the interval in which the items will be read out from the ingress buffer(diode) + // in milliseconds, used when LogCache nodes send items between each other. Depending on the Log + // Envelope Load in some cases it might be useful to adjust the interval in order to avoid + // ingress drops. Default is 250. 
+ IngressBufferReadBatchInterval int `env:"INGRESS_BUFFER_READ_BATCH_INTERVAL, report"` + TLS tls.TLS MetricsServer config.MetricsServer UseRFC339 bool `env:"USE_RFC339"` @@ -65,12 +83,15 @@ type Config struct { // LoadConfig creates Config object from environment variables func LoadConfig() (*Config, error) { c := Config{ - Addr: ":8080", - QueryTimeout: 10 * time.Second, - MemoryLimitPercent: 50, - MaxPerSource: 100000, - TruncationInterval: 1 * time.Second, - PrunesPerGC: int64(3), + Addr: ":8080", + QueryTimeout: 10 * time.Second, + MemoryLimitPercent: 50, + MaxPerSource: 100000, + TruncationInterval: 1 * time.Second, + PrunesPerGC: int64(3), + IngressBufferSize: 10000, + IngressBufferReadBatchSize: 100, + IngressBufferReadBatchInterval: 250, MetricsServer: config.MetricsServer{ Port: 6060, }, diff --git a/src/cmd/log-cache/main.go b/src/cmd/log-cache/main.go index 2b454a0c5..bdcf51687 100644 --- a/src/cmd/log-cache/main.go +++ b/src/cmd/log-cache/main.go @@ -97,6 +97,9 @@ func main() { WithQueryTimeout(cfg.QueryTimeout), WithTruncationInterval(cfg.TruncationInterval), WithPrunesPerGC(cfg.PrunesPerGC), + WithIngressBufferSize(cfg.IngressBufferSize), + WithIngressBufferReadBatchSize(cfg.IngressBufferReadBatchSize), + WithIngressBufferReadBatchInterval(cfg.IngressBufferReadBatchInterval), } var transport grpc.DialOption if cfg.TLS.HasAnyCredential() { diff --git a/src/internal/cache/log_cache.go b/src/internal/cache/log_cache.go index acaef4277..522327f1d 100644 --- a/src/internal/cache/log_cache.go +++ b/src/internal/cache/log_cache.go @@ -37,12 +37,15 @@ type LogCache struct { metrics Metrics closing int64 - maxPerSource int - memoryLimitPercent float64 - memoryLimit uint64 - queryTimeout time.Duration - truncationInterval time.Duration - prunesPerGC int64 + maxPerSource int + memoryLimitPercent float64 + memoryLimit uint64 + queryTimeout time.Duration + truncationInterval time.Duration + prunesPerGC int64 + ingressBufferSize int + ingressBufferReadBatchSize 
int + ingressBufferReadBatchInterval int // Cluster Properties addr string @@ -60,13 +63,16 @@ type LogCache struct { // NewLogCache creates a new LogCache. func New(m Metrics, logger *log.Logger, opts ...LogCacheOption) *LogCache { cache := &LogCache{ - log: logger, - metrics: m, - maxPerSource: 100000, - memoryLimitPercent: 50, - queryTimeout: 10 * time.Second, - truncationInterval: 1 * time.Second, - prunesPerGC: int64(3), + log: logger, + metrics: m, + maxPerSource: 100000, + ingressBufferSize: 10000, + ingressBufferReadBatchSize: 100, + ingressBufferReadBatchInterval: 250, + memoryLimitPercent: 50, + queryTimeout: 10 * time.Second, + truncationInterval: 1 * time.Second, + prunesPerGC: int64(3), addr: ":8080", dialOpts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, @@ -95,6 +101,33 @@ func WithMaxPerSource(size int) LogCacheOption { } } +// WithIngressBufferSize returns a LogCacheOption that configures the size +// of the batched input buffer as number of envelopes. Defaults to +// 10000 envelopes +func WithIngressBufferSize(size int) LogCacheOption { + return func(c *LogCache) { + c.ingressBufferSize = size + } +} + +// WithIngressBufferReadBatchSize returns a LogCacheOption that configures size +// of the ingress buffer(diode) read batch in number of envelopes. +// Defaults to 100 envelopes +func WithIngressBufferReadBatchSize(size int) LogCacheOption { + return func(c *LogCache) { + c.ingressBufferReadBatchSize = size + } +} + +// WithIngressBufferReadBatchInterval returns a LogCacheOption that configures read interval +// of the ingress buffer(diode) in milliseconds. +// Defaults to 250 milliseconds +func WithIngressBufferReadBatchInterval(interval int) LogCacheOption { + return func(c *LogCache) { + c.ingressBufferReadBatchInterval = interval + } +} + // WithTruncationInterval returns a LogCacheOption that configures the // interval in ms on the store's truncation loop. Defaults to 1s. 
func WithTruncationInterval(interval time.Duration) LogCacheOption { @@ -225,8 +258,9 @@ func (c *LogCache) setupRouting(s *store.Store) { } bw := routing.NewBatchedIngressClient( - 100, - 250*time.Millisecond, + c.ingressBufferReadBatchSize, + c.ingressBufferSize, + time.Duration(c.ingressBufferReadBatchInterval)*time.Millisecond, logcache_v1.NewIngressClient(conn), c.metrics.NewCounter( "ingress_dropped", diff --git a/src/internal/routing/batched_ingress_client.go b/src/internal/routing/batched_ingress_client.go index adabbf9fc..6ee212c63 100644 --- a/src/internal/routing/batched_ingress_client.go +++ b/src/internal/routing/batched_ingress_client.go @@ -20,7 +20,7 @@ type BatchedIngressClient struct { c rpc.IngressClient buffer *diodes.OneToOne - size int + batchSize int interval time.Duration log *log.Logger sendFailureMetric metrics.Counter @@ -36,7 +36,8 @@ func WithLocalOnlyDisabled(b *BatchedIngressClient) { // NewBatchedIngressClient returns a new BatchedIngressClient. func NewBatchedIngressClient( - size int, + batchSize int, + bufferSize int, interval time.Duration, c rpc.IngressClient, droppedMetric metrics.Counter, @@ -46,13 +47,13 @@ func NewBatchedIngressClient( ) *BatchedIngressClient { b := &BatchedIngressClient{ c: c, - size: size, + batchSize: batchSize, interval: interval, log: log, sendFailureMetric: sendFailureMetric, localOnly: true, - buffer: diodes.NewOneToOne(10000, diodes.AlertFunc(func(dropped int) { + buffer: diodes.NewOneToOne(bufferSize, diodes.AlertFunc(func(dropped int) { log.Printf("dropped %d envelopes", dropped) droppedMetric.Add(float64(dropped)) })), @@ -77,7 +78,7 @@ func (b *BatchedIngressClient) Send(ctx context.Context, in *rpc.SendRequest, op } func (b *BatchedIngressClient) start() { - batcher := batching.NewBatcher(b.size, b.interval, batching.WriterFunc(b.write)) + batcher := batching.NewBatcher(b.batchSize, b.interval, batching.WriterFunc(b.write)) for { e, ok := b.buffer.TryNext() if !ok { diff --git 
a/src/internal/routing/batched_ingress_client_test.go b/src/internal/routing/batched_ingress_client_test.go index 5beeba5fb..f9a1d8d7e 100644 --- a/src/internal/routing/batched_ingress_client_test.go +++ b/src/internal/routing/batched_ingress_client_test.go @@ -29,7 +29,7 @@ var _ = Describe("BatchedIngressClient", func() { m = testhelpers.NewMetricsRegistry() spyDropped = m.NewCounter("nodeX_dropped", "some help text") ingressClient = newSpyIngressClient() - c = routing.NewBatchedIngressClient(5, time.Hour, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0)) + c = routing.NewBatchedIngressClient(5, 10, time.Hour, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0)) }) It("sends envelopes by batches because of size", func() { @@ -49,7 +49,7 @@ var _ = Describe("BatchedIngressClient", func() { }) It("sends envelopes by batches because of interval", func() { - c = routing.NewBatchedIngressClient(5, time.Microsecond, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0)) + c = routing.NewBatchedIngressClient(5, 10, time.Microsecond, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0)) _, err := c.Send(context.Background(), &rpc.SendRequest{ Envelopes: &loggregator_v2.EnvelopeBatch{ Batch: []*loggregator_v2.Envelope{ @@ -117,6 +117,7 @@ var _ = Describe("BatchedIngressClient", func() { It("sends envelopes with LocalOnly false with option", func() { c = routing.NewBatchedIngressClient( 5, + 10, time.Hour, ingressClient, spyDropped, diff --git a/src/vendor/code.cloudfoundry.org/go-batching/batcher.go b/src/vendor/code.cloudfoundry.org/go-batching/batcher.go index ede6cd783..a1b3ca34c 100644 --- a/src/vendor/code.cloudfoundry.org/go-batching/batcher.go +++ b/src/vendor/code.cloudfoundry.org/go-batching/batcher.go @@ -28,7 +28,7 @@ func (f WriterFunc) Write(batch []interface{}) 
{ f(batch) } -// NewBatcher creates a new Batcher. It is recommenended to use a wrapper type +// NewBatcher creates a new Batcher. It is recommended to use a wrapper type // such as NewByteBatcher or NewV2EnvelopeBatcher vs using this directly. func NewBatcher(size int, interval time.Duration, writer Writer) *Batcher { return &Batcher{