Skip to content

Commit daabca6

Browse files
committed
make compression level adaptive and configurable
1 parent 5f368fd commit daabca6

File tree

5 files changed

+396
-23
lines changed

5 files changed

+396
-23
lines changed

arbnode/batch_poster.go

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,12 @@ type BatchPosterConfig struct {
171171
// Batch post polling interval.
172172
PollInterval time.Duration `koanf:"poll-interval" reload:"hot"`
173173
// Batch posting error delay.
174-
ErrorDelay time.Duration `koanf:"error-delay" reload:"hot"`
175-
CompressionLevel int `koanf:"compression-level" reload:"hot"`
174+
ErrorDelay time.Duration `koanf:"error-delay" reload:"hot"`
175+
// Deprecated: use CompressionLevels instead. This sets a single compression level for all backlog levels.
176+
CompressionLevel int `koanf:"compression-level" reload:"hot"`
177+
// CompressionLevels defines adaptive compression based on backlog. Each entry specifies the
178+
// compression level and recompression level to use when backlog >= the entry's backlog threshold.
179+
CompressionLevels CompressionLevelStepList `koanf:"compression-levels" reload:"hot"`
176180
DASRetentionPeriod time.Duration `koanf:"das-retention-period" reload:"hot"`
177181
GasRefunderAddress string `koanf:"gas-refunder-address" reload:"hot"`
178182
DataPoster dataposter.DataPosterConfig `koanf:"data-poster" reload:"hot"`
@@ -229,6 +233,12 @@ func (c *BatchPosterConfig) Validate() error {
229233
} else {
230234
return fmt.Errorf("invalid L1 block bound tag \"%v\" (see --help for options)", c.L1BlockBound)
231235
}
236+
// Resolve compression levels from deprecated and new config fields
237+
resolved, err := ResolveCompressionLevels(c.CompressionLevel, c.CompressionLevels)
238+
if err != nil {
239+
return err
240+
}
241+
c.CompressionLevels = resolved
232242
return nil
233243
}
234244

@@ -250,7 +260,11 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
250260
f.Bool(prefix+".wait-for-max-delay", DefaultBatchPosterConfig.WaitForMaxDelay, "wait for the max batch delay, even if the batch is full")
251261
f.Duration(prefix+".poll-interval", DefaultBatchPosterConfig.PollInterval, "how long to wait after no batches are ready to be posted before checking again")
252262
f.Duration(prefix+".error-delay", DefaultBatchPosterConfig.ErrorDelay, "how long to delay after error posting batch")
253-
f.Int(prefix+".compression-level", DefaultBatchPosterConfig.CompressionLevel, "batch compression level")
263+
f.Int(prefix+".compression-level", DefaultBatchPosterConfig.CompressionLevel, "DEPRECATED: use compression-levels instead. batch compression level")
264+
f.Var(&parsedCompressionLevelsConf, prefix+".compression-levels",
265+
`JSON array of compression level steps. Format: [{"backlog":<int>,"level":<int>,"recompression-level":<int>},...]. `+
266+
`First entry must have backlog:0. Both Level and recomp-level must be 0-11, weakly descending. `+
267+
`Example: [{"backlog":0,"level":11,"recompression-level":11},{"backlog":21,"level":6,"recompression-level":11}]`)
254268
f.Duration(prefix+".das-retention-period", DefaultBatchPosterConfig.DASRetentionPeriod, "In AnyTrust mode, the period which DASes are requested to retain the stored batches.")
255269
f.String(prefix+".gas-refunder-address", DefaultBatchPosterConfig.GasRefunderAddress, "The gas refunder contract address (optional)")
256270
f.Uint64(prefix+".extra-batch-gas", DefaultBatchPosterConfig.ExtraBatchGas, "use this much more gas than estimation says is necessary to post batches")
@@ -288,7 +302,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
288302
ErrorDelay: time.Second * 10,
289303
MaxDelay: time.Hour,
290304
WaitForMaxDelay: false,
291-
CompressionLevel: brotli.BestCompression,
305+
CompressionLevel: 0,
292306
DASRetentionPeriod: daprovider.DefaultDASRetentionPeriod,
293307
GasRefunderAddress: "",
294308
ExtraBatchGas: 50_000,
@@ -328,6 +342,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
328342
MaxDelay: 0,
329343
WaitForMaxDelay: false,
330344
CompressionLevel: 2,
345+
CompressionLevels: CompressionLevelStepList{{Backlog: 0, Level: 2, RecompressionLevel: 2}},
331346
DASRetentionPeriod: daprovider.DefaultDASRetentionPeriod,
332347
GasRefunderAddress: "",
333348
ExtraBatchGas: 10_000,
@@ -962,25 +977,17 @@ func (b *BatchPoster) newBatchSegments(ctx context.Context, firstDelayed uint64,
962977
maxSize -= SequencerMessageHeaderSize
963978
}
964979
compressedBuffer := bytes.NewBuffer(make([]byte, 0, maxSize*2))
965-
compressionLevel := b.config().CompressionLevel
966-
recompressionLevel := b.config().CompressionLevel
967-
if b.GetBacklogEstimate() > 20 {
968-
compressionLevel = arbmath.MinInt(compressionLevel, brotli.DefaultCompression)
969-
}
970-
if b.GetBacklogEstimate() > 40 {
971-
recompressionLevel = arbmath.MinInt(recompressionLevel, brotli.DefaultCompression)
972-
}
973-
if b.GetBacklogEstimate() > 60 {
974-
compressionLevel = arbmath.MinInt(compressionLevel, 4)
975-
}
976-
if recompressionLevel < compressionLevel {
977-
// This should never be possible
978-
log.Warn(
979-
"somehow the recompression level was lower than the compression level",
980-
"recompressionLevel", recompressionLevel,
981-
"compressionLevel", compressionLevel,
982-
)
983-
recompressionLevel = compressionLevel
980+
// Determine compression levels based on backlog using configured steps
981+
compressionLevel := config.CompressionLevels[0].Level
982+
recompressionLevel := config.CompressionLevels[0].RecompressionLevel
983+
backlog := b.GetBacklogEstimate()
984+
for _, step := range config.CompressionLevels {
985+
if backlog >= step.Backlog {
986+
compressionLevel = step.Level
987+
recompressionLevel = step.RecompressionLevel
988+
} else {
989+
break
990+
}
984991
}
985992
return &batchSegments{
986993
compressedBuffer: compressedBuffer,

arbnode/compression_level.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright 2021-2022, Offchain Labs, Inc.
2+
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
3+
4+
package arbnode
5+
6+
import (
7+
"encoding/json"
8+
"errors"
9+
"fmt"
10+
11+
"github.com/andybalholm/brotli"
12+
"github.com/knadh/koanf"
13+
"github.com/knadh/koanf/providers/confmap"
14+
15+
"github.com/offchainlabs/nitro/util/arbmath"
16+
)
17+
18+
// CompressionLevelStep defines compression levels to use at a given backlog threshold.
19+
type CompressionLevelStep struct {
20+
Backlog uint64 `koanf:"backlog" json:"backlog"`
21+
Level int `koanf:"level" json:"level"`
22+
RecompressionLevel int `koanf:"recompression-level" json:"recompression-level"`
23+
}
24+
25+
// CompressionLevelStepList is a list of compression level steps for configuring
26+
// adaptive compression based on batch backlog.
27+
type CompressionLevelStepList []CompressionLevelStep
28+
29+
func (l *CompressionLevelStepList) Set(jsonStr string) error {
30+
return l.UnmarshalJSON([]byte(jsonStr))
31+
}
32+
33+
func (l *CompressionLevelStepList) String() string {
34+
b, _ := json.Marshal(l)
35+
return string(b)
36+
}
37+
38+
func (l *CompressionLevelStepList) UnmarshalJSON(data []byte) error {
39+
var tmp []CompressionLevelStep
40+
if err := json.Unmarshal(data, &tmp); err != nil {
41+
return err
42+
}
43+
*l = tmp
44+
return nil
45+
}
46+
47+
func (_ *CompressionLevelStepList) Type() string {
48+
return "CompressionLevelStepList"
49+
}
50+
51+
// Validate checks that the compression level steps are valid:
52+
// - Must have at least one entry
53+
// - First entry must have backlog: 0
54+
// - Backlog thresholds must be strictly ascending
55+
// - Level and RecompressionLevel must be weakly descending (non-increasing)
56+
// - RecompressionLevel must be >= Level within each entry
57+
// - All levels must be in valid range: 0-11
58+
func (l CompressionLevelStepList) Validate() error {
59+
if len(l) == 0 {
60+
return errors.New("compression-levels must have at least one entry")
61+
}
62+
if l[0].Backlog != 0 {
63+
return errors.New("first compression-levels entry must have backlog: 0")
64+
}
65+
for i, step := range l {
66+
if step.Level < 0 || step.Level > 11 {
67+
return fmt.Errorf("compression-levels[%d].level must be 0-11, got %d", i, step.Level)
68+
}
69+
if step.RecompressionLevel < 0 || step.RecompressionLevel > 11 {
70+
return fmt.Errorf("compression-levels[%d].recompression-level must be 0-11, got %d", i, step.RecompressionLevel)
71+
}
72+
if step.RecompressionLevel < step.Level {
73+
return fmt.Errorf("compression-levels[%d].recompression-level (%d) must be >= level (%d)", i, step.RecompressionLevel, step.Level)
74+
}
75+
if i > 0 {
76+
if step.Backlog <= l[i-1].Backlog {
77+
return fmt.Errorf("compression-levels[%d].backlog must be > compression-levels[%d].backlog", i, i-1)
78+
}
79+
if step.Level > l[i-1].Level {
80+
return fmt.Errorf("compression-levels[%d].level must be <= compression-levels[%d].level (weakly descending)", i, i-1)
81+
}
82+
if step.RecompressionLevel > l[i-1].RecompressionLevel {
83+
return fmt.Errorf("compression-levels[%d].recompression-level must be <= compression-levels[%d].recompression-level (weakly descending)", i, i-1)
84+
}
85+
}
86+
}
87+
return nil
88+
}
89+
90+
var parsedCompressionLevelsConf CompressionLevelStepList
91+
92+
// FixCompressionLevelsCLIParsing decode compression-levels json CLI ARG
93+
func FixCompressionLevelsCLIParsing(path string, k *koanf.Koanf) error {
94+
raw := k.Get(path)
95+
if jsonStr, ok := raw.(string); ok {
96+
if err := parsedCompressionLevelsConf.Set(jsonStr); err != nil {
97+
98+
return err
99+
}
100+
tempMap := map[string]interface{}{path: parsedCompressionLevelsConf}
101+
if err := k.Load(confmap.Provider(tempMap, "."), nil); err != nil {
102+
return err
103+
}
104+
}
105+
return fmt.Errorf("CompressionLevels config not found in %s", path)
106+
}
107+
108+
// DefaultCompressionLevels replicates the previous hardcoded adaptive compression behavior:
109+
var DefaultCompressionLevels = CompressionLevelStepList{
110+
{Backlog: 0, Level: brotli.BestCompression, RecompressionLevel: brotli.BestCompression},
111+
{Backlog: 21, Level: brotli.DefaultCompression, RecompressionLevel: brotli.BestCompression},
112+
{Backlog: 41, Level: brotli.DefaultCompression, RecompressionLevel: brotli.DefaultCompression},
113+
{Backlog: 61, Level: 4, RecompressionLevel: brotli.DefaultCompression},
114+
}
115+
116+
// ResolveCompressionLevels resolves the compression configuration from deprecated and new fields.
117+
// Returns error if both are set. Converts deprecated format to new format if needed.
118+
func ResolveCompressionLevels(compressionLevel int, compressionLevels CompressionLevelStepList) (CompressionLevelStepList, error) {
119+
// Check for conflict: both deprecated and new config set
120+
if compressionLevel > 0 && len(compressionLevels) > 0 {
121+
return nil, errors.New("cannot specify both compression-level (deprecated) and compression-levels; use only compression-levels")
122+
}
123+
124+
// Return DefaultCompressionLevels if both (compressionLevel and compressionLevels ) are not set
125+
if len(compressionLevels) == 0 && compressionLevel == 0 {
126+
return DefaultCompressionLevels, nil
127+
}
128+
129+
// If new config is set, validate and return it
130+
if len(compressionLevels) > 0 {
131+
if err := compressionLevels.Validate(); err != nil {
132+
return nil, fmt.Errorf("invalid compression-levels: %w", err)
133+
}
134+
return compressionLevels, nil
135+
}
136+
137+
// Convert deprecated `compressionLevel` config to new format of compressionLevels
138+
resolved := CompressionLevelStepList{
139+
{Backlog: 0, Level: compressionLevel, RecompressionLevel: compressionLevel},
140+
{Backlog: 21, Level: arbmath.MinInt(compressionLevel, brotli.DefaultCompression), RecompressionLevel: compressionLevel},
141+
{Backlog: 41, Level: arbmath.MinInt(compressionLevel, brotli.DefaultCompression), RecompressionLevel: arbmath.MinInt(compressionLevel, brotli.DefaultCompression)},
142+
{Backlog: 61, Level: arbmath.MinInt(compressionLevel, 4), RecompressionLevel: arbmath.MinInt(compressionLevel, brotli.DefaultCompression)},
143+
}
144+
145+
if err := resolved.Validate(); err != nil {
146+
return nil, fmt.Errorf("invalid compression-levels derived from compression-level: %w", err)
147+
}
148+
return resolved, nil
149+
}

0 commit comments

Comments
 (0)