From 576c54fbf632c1164158cad371f626c62f2d2da4 Mon Sep 17 00:00:00 2001 From: Ryan Harvey Date: Mon, 25 Apr 2022 13:44:20 +0100 Subject: [PATCH 1/3] intial cleanup of error handling within plugins --- go.mod | 28 ++++++--------------------- go.sum | 4 ++-- pkg/plugins/kafka/kafka.go | 34 ++++++++++++++++++++------------- pkg/plugins/kafka/kafka_test.go | 8 ++++---- pkg/plugins/plugins.go | 7 ++++--- pkg/plugins/s3/s3.go | 33 +++++++++++++++++++++----------- pkg/plugins/types/types.go | 6 ++++++ 7 files changed, 65 insertions(+), 55 deletions(-) create mode 100644 pkg/plugins/types/types.go diff --git a/go.mod b/go.mod index 766c7ed..0c616d9 100644 --- a/go.mod +++ b/go.mod @@ -10,30 +10,14 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2 v1.16.2 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect - github.com/aws/aws-sdk-go-v2/config v1.15.3 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.11.2 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 // indirect - github.com/aws/smithy-go v1.11.2 // indirect - github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect - github.com/inhies/go-bytesize v0.0.0-20210819104631-275770b98743 // indirect + github.com/aws/aws-sdk-go-v2 v1.16.2 + github.com/aws/aws-sdk-go-v2/config v1.15.3 + github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 + github.com/confluentinc/confluent-kafka-go v1.8.2 + github.com/google/uuid v1.3.0 + github.com/inhies/go-bytesize v0.0.0-20210819104631-275770b98743 github.com/kr/pretty v0.1.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect - gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 // indirect ) diff --git a/go.sum b/go.sum index 7285727..9794e3d 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4 github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= @@ -80,12 +81,11 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I= -gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= diff --git a/pkg/plugins/kafka/kafka.go b/pkg/plugins/kafka/kafka.go index 3577670..3dcce59 100644 --- a/pkg/plugins/kafka/kafka.go +++ b/pkg/plugins/kafka/kafka.go @@ -6,8 +6,8 @@ import ( "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/deepfence/PacketStreamer/pkg/config" "github.com/deepfence/PacketStreamer/pkg/file" + pluginTypes "github.com/deepfence/PacketStreamer/pkg/plugins/types" "github.com/google/uuid" - "log" ) type KafkaProducer interface { @@ -63,18 +63,26 @@ func (p *Plugin) newFile(id string, messageSize int) { p.CurrentFile.Buffer = append(p.CurrentFile.Buffer, file.Header...) } -//Start produces Kafka messages containing data that is written to the returned channel -func (p *Plugin) Start(ctx context.Context) chan<- string { +//Start returns a struct which contains a write-only channel to which packet chunks should be written should they wish to be streamed to Kafka. +//Errors produced by this method will be sent on the contained Error channel. +//It is the responsibility of the caller to close the returned Input channel. +//This method will handle closure of the returned Error channel. +func (p *Plugin) Start(ctx context.Context) pluginTypes.RunningPlugin { inputChan := make(chan string) + errorChan := make(chan error) go func() { - defer p.Producer.Close() + defer func() { + p.Producer.Close() + close(errorChan) + }() + p.newFile(generateFileId(), p.MessageSize) for { select { case pkt, more := <-inputChan: if !more { - p.cleanup() + p.cleanup(errorChan) return } @@ -97,9 +105,7 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { err := p.flush() if err != nil { - //TODO: handle this better - log.Println(err) - return + errorChan <- err } if p.CurrentFile.Sent >= p.FileSize { @@ -110,21 +116,23 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { } } case <-ctx.Done(): - p.cleanup() + p.cleanup(errorChan) return } } }() - return inputChan + return pluginTypes.RunningPlugin{ + Input: inputChan, + Errors: errorChan, + } } -func (p *Plugin) cleanup() { +func (p *Plugin) cleanup(errorChan chan error) { // we only need to clean up if there's actually data to send if len(p.CurrentFile.Buffer) > len(file.Header) { err := p.flush() if err != nil { - //TODO: handle this better - log.Println(err) + errorChan <- err } } diff --git a/pkg/plugins/kafka/kafka_test.go b/pkg/plugins/kafka/kafka_test.go index 21faf2f..862c678 100644 --- a/pkg/plugins/kafka/kafka_test.go +++ b/pkg/plugins/kafka/kafka_test.go @@ -67,13 +67,13 @@ func TestPluginStart(t *testing.T) { CloseChan: make(chan bool), } - inputChan := plugin.Start(context.TODO()) + p := plugin.Start(context.TODO()) { for _, s := range tt.ToSend { - inputChan <- s + p.Input <- s } } - close(inputChan) + close(p.Input) <-plugin.CloseChan @@ -86,7 +86,7 @@ func TestPluginStart(t *testing.T) { func getFileSizeFromMessages(t *testing.T, sentMessages []string) uint64 { t.Helper() - var fileSize uint64 = uint64(len(file.Header)) + var fileSize = uint64(len(file.Header)) for _, m := range sentMessages { fileSize += uint64(len(m)) diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index d9ac253..0689214 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -6,6 +6,7 @@ import ( "github.com/deepfence/PacketStreamer/pkg/config" "github.com/deepfence/PacketStreamer/pkg/plugins/kafka" "github.com/deepfence/PacketStreamer/pkg/plugins/s3" + "github.com/deepfence/PacketStreamer/pkg/plugins/types" ) //Start uses the provided config to start the execution of any plugin outputs that have been defined. @@ -15,7 +16,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { return nil, nil } - var plugins []chan<- string + var plugins []types.RunningPlugin if config.Output.Plugins.S3 != nil { s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3) @@ -43,7 +44,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { go func() { defer func() { for _, p := range plugins { - close(p) + close(p.Input) } }() @@ -51,7 +52,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { select { case pkt := <-inputChan: for _, p := range plugins { - p <- pkt + p.Input <- pkt } case <-ctx.Done(): return diff --git a/pkg/plugins/s3/s3.go b/pkg/plugins/s3/s3.go index 6beb613..a46a24a 100644 --- a/pkg/plugins/s3/s3.go +++ b/pkg/plugins/s3/s3.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/deepfence/PacketStreamer/pkg/config" "github.com/deepfence/PacketStreamer/pkg/file" + pluginTypes "github.com/deepfence/PacketStreamer/pkg/plugins/types" "log" "time" ) @@ -73,11 +74,18 @@ func (mpu *MultipartUpload) appendToBuffer(data []byte) { mpu.Buffer = append(mpu.Buffer, data...) } -//Start returns a write-only channel to which packet chunks should be written should they wish to be streamed to S3. -//It is the responsibility of the caller to close the returned channel. -func (p *Plugin) Start(ctx context.Context) chan<- string { +//Start returns a struct which contains a write-only channel to which packet chunks should be written should they wish to be streamed to S3. +//Errors produced by this method will be sent on the contained Error channel. +//It is the responsibility of the caller to close the returned Input channel. +//This method will handle closure of the returned Error channel. +func (p *Plugin) Start(ctx context.Context) pluginTypes.RunningPlugin { inputChan := make(chan string) + errorChan := make(chan error) go func() { + defer func() { + close(errorChan) + }() + payloadMarker := []byte{0x0, 0x0, 0x0, 0x0} var mpu *MultipartUpload @@ -89,14 +97,14 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { mpu, err = p.createMultipartUpload(ctx) if err != nil { - log.Printf("error creating multipart upload, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err) return } mpu.appendToBuffer(file.Header) if err != nil { - log.Printf("error adding header to buffer, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error adding header to buffer, stopping... - %v\n", err) return } } @@ -114,14 +122,14 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { err := p.completeUpload(ctx, mpu) if err != nil { - log.Printf("error completing multipart upload, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error completing multipart upload, stopping... - %v\n", err) return } mpu, err = p.createMultipartUpload(ctx) if err != nil { - log.Printf("error creating multipart upload, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err) return } @@ -132,14 +140,14 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { err := p.completeUpload(ctx, mpu) if err != nil { - log.Printf("error completing multipart upload, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error completing multipart upload, stopping... - %v\n", err) return } mpu, err = p.createMultipartUpload(ctx) if err != nil { - log.Printf("error creating multipart upload, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err) return } } @@ -154,7 +162,7 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { mpu, err = p.createMultipartUpload(ctx) if err != nil { - log.Printf("error creating multipart upload, stopping... - %v\n", err) + errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err) return } case <-ctx.Done(): @@ -163,7 +171,10 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { } } }() - return inputChan + return pluginTypes.RunningPlugin{ + Input: inputChan, + Errors: errorChan, + } } func (p *Plugin) flushData(ctx context.Context, mpu *MultipartUpload) error { diff --git a/pkg/plugins/types/types.go b/pkg/plugins/types/types.go new file mode 100644 index 0000000..6acaef8 --- /dev/null +++ b/pkg/plugins/types/types.go @@ -0,0 +1,6 @@ +package types + +type RunningPlugin struct { + Input chan<- string + Errors <-chan error +} From e37cdf0e5a1bc5d922b4f70c8585327c3c6ed140 Mon Sep 17 00:00:00 2001 From: Ryan Harvey Date: Mon, 25 Apr 2022 23:49:21 +0100 Subject: [PATCH 2/3] go mod tidy --- go.mod | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/go.mod b/go.mod index 0c616d9..a7e3e50 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,24 @@ require ( gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 // indirect + github.com/aws/smithy-go v1.11.2 // indirect + github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect +) + require ( github.com/aws/aws-sdk-go-v2 v1.16.2 github.com/aws/aws-sdk-go-v2/config v1.15.3 From d62cc9d8814acf55d0b7ebf71c5dd24c913fcee8 Mon Sep 17 00:00:00 2001 From: Ryan Harvey Date: Tue, 26 Apr 2022 00:01:37 +0100 Subject: [PATCH 3/3] expose errors outside of plugin goroutines to parent plugin area --- pkg/plugins/kafka/kafka.go | 1 + pkg/plugins/plugins.go | 21 +++++++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/plugins/kafka/kafka.go b/pkg/plugins/kafka/kafka.go index 3dcce59..3b823ed 100644 --- a/pkg/plugins/kafka/kafka.go +++ b/pkg/plugins/kafka/kafka.go @@ -106,6 +106,7 @@ func (p *Plugin) Start(ctx context.Context) pluginTypes.RunningPlugin { if err != nil { errorChan <- err + return } if p.CurrentFile.Sent >= p.FileSize { diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index 0689214..5f0433b 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -7,6 +7,7 @@ import ( "github.com/deepfence/PacketStreamer/pkg/plugins/kafka" "github.com/deepfence/PacketStreamer/pkg/plugins/s3" "github.com/deepfence/PacketStreamer/pkg/plugins/types" + "log" ) //Start uses the provided config to start the execution of any plugin outputs that have been defined. @@ -25,8 +26,14 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { return nil, fmt.Errorf("error starting S3 plugin, %v", err) } - s3Chan := s3plugin.Start(ctx) - plugins = append(plugins, s3Chan) + startedPlugin := s3plugin.Start(ctx) + plugins = append(plugins, startedPlugin) + + go func() { + for e := range startedPlugin.Errors { + log.Println(e) + } + }() } if config.Output.Plugins.Kafka != nil { @@ -36,8 +43,14 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { return nil, fmt.Errorf("error starting Kafka plugin, %v", err) } - kafkaChan := kafkaPlugin.Start(ctx) - plugins = append(plugins, kafkaChan) + startedPlugin := kafkaPlugin.Start(ctx) + plugins = append(plugins, startedPlugin) + + go func() { + for e := range startedPlugin.Errors { + log.Println(e) + } + }() } inputChan := make(chan string)