Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions internal/impl/io/input_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
hsiFieldResponseStatus = "status"
hsiFieldResponseHeaders = "headers"
hsiFieldResponseExtractMetadata = "metadata_headers"
hsiFieldTLS = "tls"
)

type hsiConfig struct {
Expand All @@ -72,6 +73,7 @@ type hsiConfig struct {
KeyFile string
CORS httpserver.CORSConfig
Response hsiResponseConfig
TLSConfig *tls.Config
}

type hsiResponseConfig struct {
Expand Down Expand Up @@ -128,6 +130,13 @@ func hsiConfigFromParsed(pConf *service.ParsedConfig) (conf hsiConfig, err error
if conf.Response, err = hsiResponseConfigFromParsed(pConf.Namespace(hsiFieldResponse)); err != nil {
return
}
tlsConf, enabled, err := pConf.FieldTLSToggled(hsiFieldTLS)
if err != nil {
return
}
if enabled {
conf.TLSConfig = tlsConf
}
return
}

Expand Down Expand Up @@ -244,14 +253,17 @@ You can access these metadata fields using xref:configuration:interpolation.adoc
service.NewStringField(hsiFieldRateLimit).
Description("An optional xref:components:rate_limits/about.adoc[rate limit] to throttle requests by.").
Default(""),
service.NewServerTLSToggledField(hsiFieldTLS),
service.NewStringField(hsiFieldCertFile).
Description("Enable TLS by specifying a certificate and key file. Only valid with a custom `address`.").
Advanced().
Default(""),
Default("").
Deprecated(),
service.NewStringField(hsiFieldKeyFile).
Description("Enable TLS by specifying a certificate and key file. Only valid with a custom `address`.").
Advanced().
Default(""),
Default("").
Deprecated(),
service.NewInternalField(corsSpec),
service.NewObjectField(hsiFieldResponse,
service.NewInterpolatedStringField(hsiFieldResponseStatus).
Expand Down Expand Up @@ -375,7 +387,10 @@ func newHTTPServerInput(conf hsiConfig, mgr bundle.NewManagement) (input.Streame
var err error
if conf.Address != "" {
gMux = mux.NewRouter()
server = &http.Server{Addr: conf.Address}
server = &http.Server{
Addr: conf.Address,
TLSConfig: conf.TLSConfig,
}
if server.Handler, err = conf.CORS.WrapHandler(gMux); err != nil {
return nil, fmt.Errorf("bad CORS configuration: %w", err)
}
Expand Down Expand Up @@ -877,11 +892,13 @@ func (h *httpServerInput) loop() {

if h.server != nil {
go func() {
if h.conf.KeyFile != "" || h.conf.CertFile != "" {
if h.conf.TLSConfig != nil || h.conf.KeyFile != "" || h.conf.CertFile != "" {
h.log.Info(
"Receiving HTTPS messages at: https://%s\n",
h.conf.Address+h.conf.Path,
)

// if TLSConfig.ClientCertificates are set and CertFile or KeyFile are not empty, the server will use the CertFile and KeyFile instead of the ClientCertificates.
if err := h.server.ListenAndServeTLS(
h.conf.CertFile, h.conf.KeyFile,
); err != http.ErrServerClosed {
Expand Down
131 changes: 130 additions & 1 deletion internal/impl/io/input_http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package io_test
import (
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"io"
"math/big"
"mime"
"mime/multipart"
"net"
"net/http"
"net/http/httptest"
"net/textproto"
"net/url"
"os"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1200,7 +1208,6 @@ http_server:
Content-Type: application/json
foo: '${!json("field1")}'
`)

h, err := mgr.NewInput(conf)
require.NoError(t, err)

Expand Down Expand Up @@ -1336,3 +1343,125 @@ http_server:
assert.Equal(t, "200 OK", resp.Status)
assert.Equal(t, "foo", resp.Header.Get("Access-Control-Allow-Origin"))
}

func TestHTTPServerInputTLSParameters(t *testing.T) {
tCtx, done := context.WithTimeout(context.Background(), time.Minute)
defer done()

freePort := getFreePort(t)
certFile, keyFile, caCert, err := createCertFiles()
require.NoError(t, err)
t.Cleanup(func() {
os.Remove(certFile.Name())
os.Remove(keyFile.Name())
})

conf := parseYAMLInputConf(t, `
http_server:
address: 0.0.0.0:%v
path: /test/tls
allowed_verbs: [ POST ]
tls:
enabled: true
server_certs:
- cert_file: %s
key_file: %s
`, freePort, certFile.Name(), keyFile.Name())
server, err := mock.NewManager().NewInput(conf)
require.NoError(t, err)

defer func() {
server.TriggerStopConsuming()
assert.NoError(t, server.WaitForClose(tCtx))
}()

rootCA := x509.NewCertPool()
rootCA.AddCert(caCert)
httpClient := http.DefaultClient
httpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: rootCA,
},
}
var resp *http.Response
inputData := "a bunch of jolly leprechauns await"
go func() {
require.Eventually(t, func() (succeeded bool) {
req, cerr := http.NewRequest(http.MethodPost, fmt.Sprintf("https://localhost:%v/test/tls", freePort), bytes.NewBufferString(inputData))
require.NoError(t, cerr)
req.Header.Set("Content-Type", "text/plain")
if resp, cerr = httpClient.Do(req); cerr == nil {
succeeded = true
assert.Equal(t, "200 OK", resp.Status)
resp.Body.Close()
}
return
}, time.Second, 50*time.Millisecond)
}()

readNextMsg := func() (message.Batch, error) {
var tran message.Transaction
select {
case tran = <-server.TransactionChan():
require.NoError(t, tran.Ack(tCtx, nil))
case <-time.After(time.Second):
return nil, errors.New("timed out")
}
return tran.Payload, nil
}

msg, err := readNextMsg()
require.NoError(t, err)
assert.Equal(t, inputData, string(message.GetAllBytes(msg)[0]))
}

// createCACertificate generates a CA certificate.
func createCertFiles() (certFile, keyFile *os.File, caCert *x509.Certificate, err error) {
caKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, nil, nil, err
}

caTemplate := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "localhost"},
NotBefore: time.Now(),
NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign,
IsCA: true,
BasicConstraintsValid: true,
DNSNames: []string{"localhost"},
}

caCertBytes, err := x509.CreateCertificate(rand.Reader, caTemplate, caTemplate, &caKey.PublicKey, caKey)
if err != nil {
return nil, nil, nil, err
}

caCertPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caCertBytes})
caKeyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(caKey)})

caCert, err = x509.ParseCertificate(caCertBytes)
if err != nil {
return nil, nil, nil, err
}

certFile, err = os.CreateTemp("", "ca.pem")
if err != nil {
return nil, nil, nil, err
}
_, err = certFile.Write(caCertPEM)
if err != nil {
return nil, nil, nil, err
}
keyFile, err = os.CreateTemp("", "key.pem")
if err != nil {
return nil, nil, nil, err
}
_, err = keyFile.Write(caKeyPEM)
if err != nil {
return nil, nil, nil, err
}

return certFile, keyFile, caCert, err
}
24 changes: 20 additions & 4 deletions internal/impl/io/output_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -54,6 +55,7 @@ type hsoConfig struct {
CertFile string
KeyFile string
CORS httpserver.CORSConfig
TLSConfig *tls.Config
}

func hsoConfigFromParsed(pConf *service.ParsedConfig) (conf hsoConfig, err error) {
Expand Down Expand Up @@ -95,6 +97,14 @@ func hsoConfigFromParsed(pConf *service.ParsedConfig) (conf hsoConfig, err error
if conf.CORS, err = corsConfigFromParsed(pConf.Namespace(hsoFieldCORS)); err != nil {
return
}

tlsConf, enabled, err := pConf.FieldTLSToggled(hsiFieldTLS)
if err != nil {
return
}
if enabled {
conf.TLSConfig = tlsConf
}
return
}

Expand Down Expand Up @@ -136,14 +146,17 @@ Please note, messages are considered delivered as soon as the data is written to
Description("The maximum time to wait before a blocking, inactive connection is dropped (only applies to the `path` endpoint).").
Default("5s").
Advanced(),
service.NewServerTLSToggledField(hsiFieldTLS),
service.NewStringField(hsoFieldCertFile).
Description("Enable TLS by specifying a certificate and key file. Only valid with a custom `address`.").
Advanced().
Default(""),
Default("").
Deprecated(),
service.NewStringField(hsoFieldKeyFile).
Description("Enable TLS by specifying a certificate and key file. Only valid with a custom `address`.").
Advanced().
Default(""),
Default("").
Deprecated(),
service.NewInternalField(corsSpec),
)
}
Expand Down Expand Up @@ -209,7 +222,10 @@ func newHTTPServerOutput(conf hsoConfig, mgr bundle.NewManagement) (output.Strea
var err error
if conf.Address != "" {
gMux = mux.NewRouter()
server = &http.Server{Addr: conf.Address}
server = &http.Server{
Addr: conf.Address,
TLSConfig: conf.TLSConfig,
}
if server.Handler, err = conf.CORS.WrapHandler(gMux); err != nil {
return nil, fmt.Errorf("bad CORS configuration: %w", err)
}
Expand Down Expand Up @@ -448,7 +464,7 @@ func (h *httpServerOutput) Consume(ts <-chan message.Transaction) error {

if h.server != nil {
go func() {
if h.conf.KeyFile != "" || h.conf.CertFile != "" {
if h.conf.TLSConfig != nil || h.conf.KeyFile != "" || h.conf.CertFile != "" {
h.log.Info(
"Serving messages through HTTPS GET request at: https://%s\n",
h.conf.Address+h.conf.Path,
Expand Down
Loading