Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ Memory limit - Increasing memory limit allows for more storage, but may cause ou

Larger CPUs - Increasing the CPU budget per instance should allow higher throughput

Sometimes, depending on the scaling of Log Cache, the number of sources (CF applications and platform components), and the log load, ingress drops may occur when Log Cache nodes send items between each other. The Log Cache `ingress_dropped` metric should be monitored to make sure that there are no drops. In such cases, the following three parameters can be adjusted until log loss no longer occurs.

- Ingress Buffer Size - The ingress buffer (diode) size in number of items used when LogCache nodes send items between each other. The default size is 10000. Can be increased when ingress drops occur.
- Ingress Buffer Read Batch Size - The ingress buffer read batch size in number of items. The size of the ingress buffer read batch used when LogCache nodes send items between each other. The default size is 100. Can be increased when ingress drops occur.
- Ingress Buffer Read Batch Interval - The ingress buffer read interval in milliseconds. The default value is 250. Can be increased when ingress drops occur.


Log Cache is known to exceed memory limits under high throughput/stress. If you see your log-cache reaching higher memory
than you have set, you might want to scale your log-cache up. Either solely in terms of CPU per instance, or more instances.

Expand Down
12 changes: 12 additions & 0 deletions jobs/log-cache/spec
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ properties:
description: "The maximum number of items stored in LogCache per source."
default: 100000

ingress_buffer_size:
description: "The ingress buffer (diode) size in number of items. The size of the ingress buffer used when LogCache nodes send items between each other. In some cases, the buffer size must be raised in order to avoid ingress drops. Must be an integer."
default: 10000

ingress_buffer_read_batch_size:
description: "The ingress buffer read batch size in number of items. The size of the ingress buffer read batch used when LogCache nodes send envelopes between each other. In some cases, the batch size must be raised in order to make room in the buffer faster and avoid ingress drops. Must be an integer."
default: 100

ingress_buffer_read_batch_interval:
description: "The ingress buffer read batch interval in milliseconds. The interval at which data is read out of the ingress buffer used when LogCache nodes send envelopes between each other. In some cases, the read interval must be adjusted in order to free space in the buffer faster and avoid ingress drops. Must be an integer."
default: 250

truncation_interval:
description: "The amount of time between log-cache checking if it needs to prune"
default: "1s"
Expand Down
3 changes: 3 additions & 0 deletions jobs/log-cache/templates/bpm.yml.erb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ processes:
ADDR: "<%= ":#{p('port')}" %>"
MEMORY_LIMIT_PERCENT: "<%= p('memory_limit_percent') %>"
MAX_PER_SOURCE: "<%= p('max_per_source') %>"
INGRESS_BUFFER_SIZE: "<%= p('ingress_buffer_size') %>"
INGRESS_BUFFER_READ_BATCH_SIZE: "<%= p('ingress_buffer_read_batch_size') %>"
INGRESS_BUFFER_READ_BATCH_INTERVAL: "<%= p('ingress_buffer_read_batch_interval') %>"
QUERY_TIMEOUT: "<%= p('promql.query_timeout') %>"
TRUNCATION_INTERVAL: "<%= p('truncation_interval') %>"
PRUNES_PER_GC: "<%= p('prunes_per_gc') %>"
Expand Down
33 changes: 27 additions & 6 deletions src/cmd/log-cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ type Config struct {
// assumed that the current node is the only one.
NodeAddrs []string `env:"NODE_ADDRS, report"`

// IngressBufferSize sets the size of the ingress buffer(diode) in number of items,
// used when LogCache nodes send items between each other. Depending on the Log
// Envelope Load in some cases it might be useful to raise the size in order to avoid
// ingress drops. Default is 10000.
IngressBufferSize int `env:"INGRESS_BUFFER_SIZE, report"`

// IngressBufferReadBatchSize sets the size of the ingress buffer(diode) read batch in number of items,
// used when LogCache nodes send items between each other. Depending on the Log
// Envelope Load in some cases it might be useful to raise the size in order to avoid
// ingress drops. Default is 100.
IngressBufferReadBatchSize int `env:"INGRESS_BUFFER_READ_BATCH_SIZE, report"`

// IngressBufferReadBatchInterval sets the interval in which the items will be read out from the ingress buffer(diode)
// in milliseconds, used when LogCache nodes send items between each other. Depending on the Log
// Envelope Load in some cases it might be useful to adjust the interval in order to avoid
// ingress drops. Default is 250.
IngressBufferReadBatchInterval int `env:"INGRESS_BUFFER_READ_BATCH_INTERVAL, report"`

TLS tls.TLS
MetricsServer config.MetricsServer
UseRFC339 bool `env:"USE_RFC339"`
Expand All @@ -65,12 +83,15 @@ type Config struct {
// LoadConfig creates Config object from environment variables
func LoadConfig() (*Config, error) {
c := Config{
Addr: ":8080",
QueryTimeout: 10 * time.Second,
MemoryLimitPercent: 50,
MaxPerSource: 100000,
TruncationInterval: 1 * time.Second,
PrunesPerGC: int64(3),
Addr: ":8080",
QueryTimeout: 10 * time.Second,
MemoryLimitPercent: 50,
MaxPerSource: 100000,
TruncationInterval: 1 * time.Second,
PrunesPerGC: int64(3),
IngressBufferSize: 10000,
IngressBufferReadBatchSize: 100,
IngressBufferReadBatchInterval: 250,
MetricsServer: config.MetricsServer{
Port: 6060,
},
Expand Down
3 changes: 3 additions & 0 deletions src/cmd/log-cache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func main() {
WithQueryTimeout(cfg.QueryTimeout),
WithTruncationInterval(cfg.TruncationInterval),
WithPrunesPerGC(cfg.PrunesPerGC),
WithIngressBufferSize(cfg.IngressBufferSize),
WithIngressBufferReadBatchSize(cfg.IngressBufferReadBatchSize),
WithIngressBufferReadBatchInterval(cfg.IngressBufferReadBatchInterval),
}
var transport grpc.DialOption
if cfg.TLS.HasAnyCredential() {
Expand Down
64 changes: 49 additions & 15 deletions src/internal/cache/log_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ type LogCache struct {
metrics Metrics
closing int64

maxPerSource int
memoryLimitPercent float64
memoryLimit uint64
queryTimeout time.Duration
truncationInterval time.Duration
prunesPerGC int64
maxPerSource int
memoryLimitPercent float64
memoryLimit uint64
queryTimeout time.Duration
truncationInterval time.Duration
prunesPerGC int64
ingressBufferSize int
ingressBufferReadBatchSize int
ingressBufferReadBatchInterval int

// Cluster Properties
addr string
Expand All @@ -60,13 +63,16 @@ type LogCache struct {
// NewLogCache creates a new LogCache.
func New(m Metrics, logger *log.Logger, opts ...LogCacheOption) *LogCache {
cache := &LogCache{
log: logger,
metrics: m,
maxPerSource: 100000,
memoryLimitPercent: 50,
queryTimeout: 10 * time.Second,
truncationInterval: 1 * time.Second,
prunesPerGC: int64(3),
log: logger,
metrics: m,
maxPerSource: 100000,
ingressBufferSize: 10000,
ingressBufferReadBatchSize: 100,
ingressBufferReadBatchInterval: 250,
memoryLimitPercent: 50,
queryTimeout: 10 * time.Second,
truncationInterval: 1 * time.Second,
prunesPerGC: int64(3),

addr: ":8080",
dialOpts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())},
Expand Down Expand Up @@ -95,6 +101,33 @@ func WithMaxPerSource(size int) LogCacheOption {
}
}

// WithIngressBufferSize returns a LogCacheOption that configures the size
// of the ingress buffer (diode) in number of envelopes, used when LogCache
// nodes send envelopes between each other. Defaults to 10000 envelopes.
func WithIngressBufferSize(size int) LogCacheOption {
	return func(c *LogCache) {
		c.ingressBufferSize = size
	}
}

// WithIngressBufferReadBatchSize returns a LogCacheOption that configures the
// size of the ingress buffer (diode) read batch in number of envelopes, used
// when LogCache nodes send envelopes between each other. Defaults to 100 envelopes.
func WithIngressBufferReadBatchSize(size int) LogCacheOption {
	return func(c *LogCache) {
		c.ingressBufferReadBatchSize = size
	}
}

// WithIngressBufferReadBatchInterval returns a LogCacheOption that configures
// the read interval of the ingress buffer (diode) in milliseconds.
// Defaults to 250 milliseconds.
func WithIngressBufferReadBatchInterval(interval int) LogCacheOption {
	return func(c *LogCache) {
		c.ingressBufferReadBatchInterval = interval
	}
}

// WithTruncationInterval returns a LogCacheOption that configures the
// interval in ms on the store's truncation loop. Defaults to 1s.
func WithTruncationInterval(interval time.Duration) LogCacheOption {
Expand Down Expand Up @@ -225,8 +258,9 @@ func (c *LogCache) setupRouting(s *store.Store) {
}

bw := routing.NewBatchedIngressClient(
100,
250*time.Millisecond,
c.ingressBufferReadBatchSize,
c.ingressBufferSize,
time.Duration(c.ingressBufferReadBatchInterval)*time.Millisecond,
logcache_v1.NewIngressClient(conn),
c.metrics.NewCounter(
"ingress_dropped",
Expand Down
11 changes: 6 additions & 5 deletions src/internal/routing/batched_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type BatchedIngressClient struct {
c rpc.IngressClient

buffer *diodes.OneToOne
size int
batchSize int
interval time.Duration
log *log.Logger
sendFailureMetric metrics.Counter
Expand All @@ -36,7 +36,8 @@ func WithLocalOnlyDisabled(b *BatchedIngressClient) {

// NewBatchedIngressClient returns a new BatchedIngressClient.
func NewBatchedIngressClient(
size int,
batchSize int,
bufferSize int,
interval time.Duration,
c rpc.IngressClient,
droppedMetric metrics.Counter,
Expand All @@ -46,13 +47,13 @@ func NewBatchedIngressClient(
) *BatchedIngressClient {
b := &BatchedIngressClient{
c: c,
size: size,
batchSize: batchSize,
interval: interval,
log: log,
sendFailureMetric: sendFailureMetric,
localOnly: true,

buffer: diodes.NewOneToOne(10000, diodes.AlertFunc(func(dropped int) {
buffer: diodes.NewOneToOne(bufferSize, diodes.AlertFunc(func(dropped int) {
log.Printf("dropped %d envelopes", dropped)
droppedMetric.Add(float64(dropped))
})),
Expand All @@ -77,7 +78,7 @@ func (b *BatchedIngressClient) Send(ctx context.Context, in *rpc.SendRequest, op
}

func (b *BatchedIngressClient) start() {
batcher := batching.NewBatcher(b.size, b.interval, batching.WriterFunc(b.write))
batcher := batching.NewBatcher(b.batchSize, b.interval, batching.WriterFunc(b.write))
for {
e, ok := b.buffer.TryNext()
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions src/internal/routing/batched_ingress_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ = Describe("BatchedIngressClient", func() {
m = testhelpers.NewMetricsRegistry()
spyDropped = m.NewCounter("nodeX_dropped", "some help text")
ingressClient = newSpyIngressClient()
c = routing.NewBatchedIngressClient(5, time.Hour, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
c = routing.NewBatchedIngressClient(5, 10, time.Hour, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
})

It("sends envelopes by batches because of size", func() {
Expand All @@ -49,7 +49,7 @@ var _ = Describe("BatchedIngressClient", func() {
})

It("sends envelopes by batches because of interval", func() {
c = routing.NewBatchedIngressClient(5, time.Microsecond, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
c = routing.NewBatchedIngressClient(5, 10, time.Microsecond, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
_, err := c.Send(context.Background(), &rpc.SendRequest{
Envelopes: &loggregator_v2.EnvelopeBatch{
Batch: []*loggregator_v2.Envelope{
Expand Down Expand Up @@ -117,6 +117,7 @@ var _ = Describe("BatchedIngressClient", func() {
It("sends envelopes with LocalOnly false with option", func() {
c = routing.NewBatchedIngressClient(
5,
10,
time.Hour,
ingressClient,
spyDropped,
Expand Down
2 changes: 1 addition & 1 deletion src/vendor/code.cloudfoundry.org/go-batching/batcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading