Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
defaultPort = 0
platform = "platform"
function = "function"
extension = "extension"
Expand Down
59 changes: 38 additions & 21 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"context"
crand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -75,30 +76,52 @@ type telemetryAPIReceiver struct {
logReport bool
}

func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
address := listenOnAddress(r.port)
r.logger.Info("Listening for requests", zap.String("address", address))
func (r *telemetryAPIReceiver) Start(ctx context.Context, _ component.Host) error {
if len(r.types) == 0 {
return fmt.Errorf("no telemetry event types provided")
}
listener, address, err := r.bindListener(ctx)
if err != nil {
return fmt.Errorf("failed to bind listener: %w", err)
}
r.logger.Info("Starting telemetry API listener", zap.String("address", address))

mux := http.NewServeMux()
mux.HandleFunc("/", r.httpHandler)
r.httpServer = &http.Server{Addr: address, Handler: mux}
go func() {
_ = r.httpServer.ListenAndServe()
err := r.httpServer.Serve(listener)
if !errors.Is(err, http.ErrServerClosed) {
r.logger.Error("Telemetry API server stopped unexpectedly", zap.Error(err))
} else {
r.logger.Info("Telemetry API server stopped", zap.String("address", address))
}
}()

telemetryClient := telemetryapi.NewClient(r.logger)
if len(r.types) > 0 {
_, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address))
if err != nil {
r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID))
return err
}
if _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)); err != nil {
r.logger.Error("Failed to subscribe to telemetry", zap.Error(err))
_ = r.Shutdown(ctx)
return err
}
r.logger.Info("Successfully subscribed to telemetry", zap.String("address", address))
return nil
}

func (r *telemetryAPIReceiver) bindListener(ctx context.Context) (net.Listener, string, error) {
listenerAddr := listenOnAddress()
var lc net.ListenConfig
l, err := lc.Listen(ctx, "tcp", fmt.Sprintf("%s:%d", listenerAddr, r.port))
if err != nil {
return nil, "", err
}
addr := fmt.Sprintf("%s:%d", l.Addr().Network(), l.Addr().(*net.TCPAddr).Port)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like l.Addr().Network() returns the network type string "tcp". So wouldn't this produce an address like tcp:53421 instead of sandbox.localdomain:53421?

That would mean the subscribe URI passed to the Telemetry API ends up as http://tcp:53421/, which Lambda wouldn't be able to resolve to deliver events to I think?

For reference, the internal listener at internal/telemetryapi/listener.go uses the listenerAddr variable directly for this and I think a lot of the logic we already have there is directly applicable here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, this must have been a typo, or I made this change very late at night. I can update this, and add a test, since I'm surprised this managed to slip through.

return l, addr, nil
}

func (r *telemetryAPIReceiver) Shutdown(ctx context.Context) error {
return nil
err := r.httpServer.Shutdown(ctx)
return err
}
Comment on lines 122 to 125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again looking at some "prior art" in internal/telemetryapi/listener.go it looks like just to be safe we might want to also nil check r.httpServer here before calling the Shutdown method on it?


func newSpanID() pcommon.SpanID {
Expand Down Expand Up @@ -192,9 +215,6 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
}
}
}

r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len()))
slice = nil
}

func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string {
Expand Down Expand Up @@ -534,14 +554,11 @@ func newTelemetryAPIReceiver(
}, nil
}

func listenOnAddress(port int) string {
func listenOnAddress() string {
envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
var addr string
if ok && envAwsLocal == "true" {
addr = ":" + strconv.Itoa(port)
return ""
} else {
addr = "sandbox.localdomain:" + strconv.Itoa(port)
return "sandbox.localdomain"
}

return addr
}
8 changes: 4 additions & 4 deletions collector/receiver/telemetryapireceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func TestListenOnAddress(t *testing.T) {
{
desc: "listen on address without AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
addr := listenOnAddress(4325)
require.EqualValues(t, "sandbox.localdomain:4325", addr)
addr := listenOnAddress()
require.EqualValues(t, "sandbox.localdomain", addr)
},
},
{
desc: "listen on address with AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
t.Setenv("AWS_SAM_LOCAL", "true")
addr := listenOnAddress(4325)
require.EqualValues(t, ":4325", addr)
addr := listenOnAddress()
require.EqualValues(t, "", addr)
},
},
}
Expand Down
Loading