Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.16.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect
github.com/aws/aws-sdk-go-v2/config v1.15.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9 // indirect
Expand All @@ -22,18 +20,22 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 // indirect
github.com/aws/smithy-go v1.11.2 // indirect
github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/inhies/go-bytesize v0.0.0-20210819104631-275770b98743 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
)

require (
github.com/aws/aws-sdk-go-v2 v1.16.2
github.com/aws/aws-sdk-go-v2/config v1.15.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3
github.com/confluentinc/confluent-kafka-go v1.8.2
github.com/google/uuid v1.3.0
github.com/inhies/go-bytesize v0.0.0-20210819104631-275770b98743
github.com/kr/pretty v0.1.0 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
Expand Down Expand Up @@ -80,12 +81,11 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
Expand Down
33 changes: 21 additions & 12 deletions pkg/plugins/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/deepfence/PacketStreamer/pkg/config"
"github.com/deepfence/PacketStreamer/pkg/file"
pluginTypes "github.com/deepfence/PacketStreamer/pkg/plugins/types"
"github.com/google/uuid"
"log"
)

type KafkaProducer interface {
Expand Down Expand Up @@ -63,18 +63,26 @@ func (p *Plugin) newFile(id string, messageSize int) {
p.CurrentFile.Buffer = append(p.CurrentFile.Buffer, file.Header...)
}

//Start produces Kafka messages containing data that is written to the returned channel
func (p *Plugin) Start(ctx context.Context) chan<- string {
//Start returns a struct which contains a write-only channel to which packet chunks should be written should they wish to be streamed to Kafka.
//Errors produced by this method will be sent on the contained Error channel.
//It is the responsibility of the caller to close the returned Input channel.
//This method will handle closure of the returned Error channel.
func (p *Plugin) Start(ctx context.Context) pluginTypes.RunningPlugin {
inputChan := make(chan string)
errorChan := make(chan error)
go func() {
defer p.Producer.Close()
defer func() {
p.Producer.Close()
close(errorChan)
}()

p.newFile(generateFileId(), p.MessageSize)

for {
select {
case pkt, more := <-inputChan:
if !more {
p.cleanup()
p.cleanup(errorChan)
return
}

Expand All @@ -97,8 +105,7 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
err := p.flush()

if err != nil {
//TODO: handle this better
log.Println(err)
errorChan <- err
return
}

Expand All @@ -110,21 +117,23 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
}
}
case <-ctx.Done():
p.cleanup()
p.cleanup(errorChan)
return
}
}
}()
return inputChan
return pluginTypes.RunningPlugin{
Input: inputChan,
Errors: errorChan,
}
}

func (p *Plugin) cleanup() {
func (p *Plugin) cleanup(errorChan chan error) {
// we only need to clean up if there's actually data to send
if len(p.CurrentFile.Buffer) > len(file.Header) {
err := p.flush()
if err != nil {
//TODO: handle this better
log.Println(err)
errorChan <- err
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/plugins/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func TestPluginStart(t *testing.T) {
CloseChan: make(chan bool),
}

inputChan := plugin.Start(context.TODO())
p := plugin.Start(context.TODO())
{
for _, s := range tt.ToSend {
inputChan <- s
p.Input <- s
}
}
close(inputChan)
close(p.Input)

<-plugin.CloseChan

Expand All @@ -86,7 +86,7 @@ func TestPluginStart(t *testing.T) {

func getFileSizeFromMessages(t *testing.T, sentMessages []string) uint64 {
t.Helper()
var fileSize uint64 = uint64(len(file.Header))
var fileSize = uint64(len(file.Header))

for _, m := range sentMessages {
fileSize += uint64(len(m))
Expand Down
28 changes: 21 additions & 7 deletions pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/deepfence/PacketStreamer/pkg/config"
"github.com/deepfence/PacketStreamer/pkg/plugins/kafka"
"github.com/deepfence/PacketStreamer/pkg/plugins/s3"
"github.com/deepfence/PacketStreamer/pkg/plugins/types"
"log"
)

//Start uses the provided config to start the execution of any plugin outputs that have been defined.
Expand All @@ -15,7 +17,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
return nil, nil
}

var plugins []chan<- string
var plugins []types.RunningPlugin

if config.Output.Plugins.S3 != nil {
s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3)
Expand All @@ -24,8 +26,14 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
return nil, fmt.Errorf("error starting S3 plugin, %v", err)
}

s3Chan := s3plugin.Start(ctx)
plugins = append(plugins, s3Chan)
startedPlugin := s3plugin.Start(ctx)
plugins = append(plugins, startedPlugin)

go func() {
for e := range startedPlugin.Errors {
log.Println(e)
}
}()
}

if config.Output.Plugins.Kafka != nil {
Expand All @@ -35,23 +43,29 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
return nil, fmt.Errorf("error starting Kafka plugin, %v", err)
}

kafkaChan := kafkaPlugin.Start(ctx)
plugins = append(plugins, kafkaChan)
startedPlugin := kafkaPlugin.Start(ctx)
plugins = append(plugins, startedPlugin)

go func() {
for e := range startedPlugin.Errors {
log.Println(e)
}
}()
}

inputChan := make(chan string)
go func() {
defer func() {
for _, p := range plugins {
close(p)
close(p.Input)
}
}()

for {
select {
case pkt := <-inputChan:
for _, p := range plugins {
p <- pkt
p.Input <- pkt
}
case <-ctx.Done():
return
Expand Down
33 changes: 22 additions & 11 deletions pkg/plugins/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/deepfence/PacketStreamer/pkg/config"
"github.com/deepfence/PacketStreamer/pkg/file"
pluginTypes "github.com/deepfence/PacketStreamer/pkg/plugins/types"
"log"
"time"
)
Expand Down Expand Up @@ -73,11 +74,18 @@ func (mpu *MultipartUpload) appendToBuffer(data []byte) {
mpu.Buffer = append(mpu.Buffer, data...)
}

//Start returns a write-only channel to which packet chunks should be written should they wish to be streamed to S3.
//It is the responsibility of the caller to close the returned channel.
func (p *Plugin) Start(ctx context.Context) chan<- string {
//Start returns a struct which contains a write-only channel to which packet chunks should be written should they wish to be streamed to S3.
//Errors produced by this method will be sent on the contained Error channel.
//It is the responsibility of the caller to close the returned Input channel.
//This method will handle closure of the returned Error channel.
func (p *Plugin) Start(ctx context.Context) pluginTypes.RunningPlugin {
inputChan := make(chan string)
errorChan := make(chan error)
go func() {
defer func() {
close(errorChan)
}()

payloadMarker := []byte{0x0, 0x0, 0x0, 0x0}
var mpu *MultipartUpload

Expand All @@ -89,14 +97,14 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
mpu, err = p.createMultipartUpload(ctx)

if err != nil {
log.Printf("error creating multipart upload, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err)
return
}

mpu.appendToBuffer(file.Header)

if err != nil {
log.Printf("error adding header to buffer, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error adding header to buffer, stopping... - %v\n", err)
return
}
}
Expand All @@ -114,14 +122,14 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
err := p.completeUpload(ctx, mpu)

if err != nil {
log.Printf("error completing multipart upload, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error completing multipart upload, stopping... - %v\n", err)
return
}

mpu, err = p.createMultipartUpload(ctx)

if err != nil {
log.Printf("error creating multipart upload, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err)
return
}

Expand All @@ -132,14 +140,14 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
err := p.completeUpload(ctx, mpu)

if err != nil {
log.Printf("error completing multipart upload, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error completing multipart upload, stopping... - %v\n", err)
return
}

mpu, err = p.createMultipartUpload(ctx)

if err != nil {
log.Printf("error creating multipart upload, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err)
return
}
}
Expand All @@ -154,7 +162,7 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
mpu, err = p.createMultipartUpload(ctx)

if err != nil {
log.Printf("error creating multipart upload, stopping... - %v\n", err)
errorChan <- fmt.Errorf("error creating multipart upload, stopping... - %v\n", err)
return
}
case <-ctx.Done():
Expand All @@ -163,7 +171,10 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
}
}
}()
return inputChan
return pluginTypes.RunningPlugin{
Input: inputChan,
Errors: errorChan,
}
}

func (p *Plugin) flushData(ctx context.Context, mpu *MultipartUpload) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/plugins/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package types

type RunningPlugin struct {
Input chan<- string
Errors <-chan error
}