From 9b10f8aa701f92584a639ce5d75a6f972ba6759c Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 24 Aug 2014 22:04:22 -0700 Subject: [PATCH 01/26] Implementation with race condition --- .gitignore | 1 + Makefile | 3 + listener.go | 75 +++++++++++++++++++---- listener_test.go | 20 +++++++ server.go | 74 ++++++++++++++++++----- server_test.go | 152 +++++++++++++++++++++++++++++++++++++++++++---- 6 files changed, 287 insertions(+), 38 deletions(-) create mode 100644 Makefile create mode 100644 listener_test.go diff --git a/.gitignore b/.gitignore index 88f7359..2261bbb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ tmp tags *~ +*.test diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..10975c5 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +cover: + go test -v . -coverprofile=/tmp/coverage.out + go tool cover -html=/tmp/coverage.out diff --git a/listener.go b/listener.go index 3f8b39a..9b950fd 100644 --- a/listener.go +++ b/listener.go @@ -1,9 +1,13 @@ package manners import ( + "fmt" "net" "net/http" - "sync/atomic" + "os" + "reflect" + "sync" + "syscall" ) // NewListener wraps an existing listener for use with @@ -13,7 +17,11 @@ import ( // GracefulServer will automatically wrap any non-graceful listeners // supplied to it. func NewListener(l net.Listener) *GracefulListener { - return &GracefulListener{l, 1} + return &GracefulListener{ + listener: l, + mutex: &sync.RWMutex{}, + open: true, + } } // A gracefulCon wraps a normal net.Conn and tracks the @@ -28,15 +36,26 @@ type gracefulConn struct { // listenerAlreadyClosed error. The GracefulServer will ignore this // error. type GracefulListener struct { - net.Listener - open int32 + listener net.Listener + open bool + mutex *sync.RWMutex +} + +func (l *GracefulListener) isClosed() bool { + l.mutex.RLock() + defer l.mutex.RUnlock() + return !l.open +} + +func (l *GracefulListener) Addr() net.Addr { + return l.listener.Addr() } // Accept implements the Accept method in the Listener interface. func (l *GracefulListener) Accept() (net.Conn, error) { - conn, err := l.Listener.Accept() + conn, err := l.listener.Accept() if err != nil { - if atomic.LoadInt32(&l.open) == 0 { + if l.isClosed() { err = listenerAlreadyClosed{err} } return nil, err @@ -48,11 +67,47 @@ func (l *GracefulListener) Accept() (net.Conn, error) { // Close tells the wrapped listener to stop listening. It is idempotent. func (l *GracefulListener) Close() error { - if atomic.CompareAndSwapInt32(&l.open, 1, 0) { - err := l.Listener.Close() - return err + l.mutex.Lock() + defer l.mutex.Unlock() + if !l.open { + return nil + } + l.open = false + return l.listener.Close() +} + +func (l *GracefulListener) GetFD() (uintptr, string) { + v := reflect.ValueOf(l.listener).Elem().FieldByName("fd").Elem() + fd := uintptr(v.FieldByName("sysfd").Int()) + addr := l.listener.Addr() + name := fmt.Sprintf("%s:%s->", addr.Network(), addr.String()) + return fd, name +} + +func (l *GracefulListener) Clone() (*GracefulListener, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.open { + return nil, fmt.Errorf("listener is already closed") + } + + fd, fdName := l.GetFD() + + fl, err := net.FileListener(os.NewFile(fd, fdName)) + if nil != err { + return nil, err + } + + switch fl.(type) { + case *net.TCPListener, *net.UnixListener: + default: + return nil, fmt.Errorf("file descriptor is %T not *net.TCPListener or *net.UnixListener", l) + } + if err := syscall.Close(int(fd)); nil != err { + return nil, err } - return nil + return NewListener(fl), nil } type listenerAlreadyClosed struct { diff --git a/listener_test.go b/listener_test.go new file mode 100644 index 0000000..62530b1 --- /dev/null +++ b/listener_test.go @@ -0,0 +1,20 @@ +package manners + +import ( + "net" + "testing" +) + +func TestListenerGetFD(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("Failed to create a listener", err) + } + g := NewListener(l) + fd, _ := g.GetFD() + if fd == 0 { + t.Fatal("Failed to get and FD", fd) + } + + g.Close() +} diff --git a/server.go b/server.go index af1755b..bba334c 100644 --- a/server.go +++ b/server.go @@ -43,6 +43,7 @@ package manners import ( "crypto/tls" + "fmt" "net" "net/http" "sync" @@ -84,12 +85,15 @@ func NewWithServer(s *http.Server) *GracefulServer { // // It must be initialized by calling NewServer or NewWithServer type GracefulServer struct { + name string *http.Server shutdown chan struct{} wg waitgroup // used by test code - up chan net.Listener + up chan net.Listener + down chan bool + listener *GracefulListener } // Close stops the server from accepting new requets and beings shutting down. @@ -99,20 +103,18 @@ func (s *GracefulServer) Close() { // ListenAndServe provides a graceful equivalent of net/http.Serve.ListenAndServe. func (s *GracefulServer) ListenAndServe() error { - oldListener, err := net.Listen("tcp", s.Addr) - if err != nil { - return err + if s.listener == nil { + oldListener, err := net.Listen("tcp", s.Addr) + if err != nil { + return err + } + s.listener = NewListener(oldListener.(*net.TCPListener)) } - - //listener := NewListener(tcpKeepAliveListener{oldListener.(*net.TCPListener)}) - listener := NewListener(oldListener.(*net.TCPListener)) - err = s.Serve(listener) - return err + return s.Serve(s.listener) } // ListenAndServeTLS provides a graceful equivalent of net/http.Serve.ListenAndServeTLS. func (s *GracefulServer) ListenAndServeTLS(certFile, keyFile string) error { - // direct lift from net/http/server.go addr := s.Addr if addr == "" { addr = ":https" @@ -132,14 +134,38 @@ func (s *GracefulServer) ListenAndServeTLS(certFile, keyFile string) error { return err } - ln, err := net.Listen("tcp", addr) - if err != nil { - return err + return s.ListenAndServeTLSWithConfig(config) +} + +// ListenAndServeTLS provides a graceful equivalent of net/http.Serve.ListenAndServeTLS. +func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { + addr := s.Addr + if addr == "" { + addr = ":https" } - tlsListener := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, config) - return s.Serve(NewListener(tlsListener)) + if s.listener == nil { + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + + tlsListener := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, config) + s.listener = NewListener(tlsListener) + + } + return s.Serve(s.listener) +} +func (gs *GracefulServer) HijackListener(s *http.Server) (*GracefulServer, error) { + listener, err := gs.listener.Clone() + if err != nil { + return nil, err + } + fmt.Println("Cloned") + other := NewWithServer(s) + other.listener = listener + return other, nil } // Serve provides a graceful equivalent net/http.Server.Serve. @@ -165,7 +191,6 @@ func (s *GracefulServer) Serve(listener net.Listener) error { orgConnState := s.Server.ConnState s.ConnState = func(conn net.Conn, newState http.ConnState) { gconn := conn.(*gracefulConn) - //fmt.Printf("%p %s -> %s\n", gconn, gconn.lastHTTPState, newState) switch newState { case http.StateNew: // new_conn -> StateNew @@ -185,12 +210,14 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // one more request before SetKeepAliveEnabled(false) takes effect. conn.Close() } + fmt.Printf("Connection idle\n") s.FinishRoutine() case http.StateClosed, http.StateHijacked: // (StateNew, StateActive, StateIdle) -> (StateClosed, StateHiJacked) if gconn.lastHTTPState != http.StateIdle { // if it was idle it's already been decremented + fmt.Printf("Connection closed\n") s.FinishRoutine() } } @@ -206,6 +233,15 @@ func (s *GracefulServer) Serve(listener net.Listener) error { s.up <- listener } err := s.Server.Serve(listener) + if s.down != nil { + defer func() { + close(s.down) + }() + } + + defer func() { + fmt.Printf("Server(%s) stopped. Error: %T, %s\n", s.name, err, err) + }() // This block is reached when the server has received a shut down command. if err == nil { @@ -223,11 +259,17 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // request. func (s *GracefulServer) StartRoutine() { s.wg.Add(1) + fmt.Printf("Server(%s) StartRoutine()\n", s.name) } // FinishRoutine decrements the server's WaitGroup. Used this to complement StartRoutine(). func (s *GracefulServer) FinishRoutine() { s.wg.Done() + fmt.Printf("Server(%s) FinishRoutine()\n", s.name) +} + +func (s *GracefulServer) GetFD() (uintptr, string) { + return s.listener.GetFD() } var ( diff --git a/server_test.go b/server_test.go index 3839273..19ddb64 100644 --- a/server_test.go +++ b/server_test.go @@ -23,37 +23,46 @@ type httpInterface interface { // an inefficient replica of a waitgroup that can be introspected type testWg struct { + name string sync.Mutex - count int - waitCalled chan int + count int + waitCalled chan int + countChanged chan int } func newTestWg() *testWg { return &testWg{ - waitCalled: make(chan int, 1), + waitCalled: make(chan int, 1), + countChanged: make(chan int, 100), } } func (wg *testWg) Add(delta int) { wg.Lock() wg.count++ + fmt.Printf("WG(%s)-Add: %d\n", wg.name, wg.count) + wg.countChanged <- wg.count wg.Unlock() } func (wg *testWg) Done() { wg.Lock() wg.count-- + wg.countChanged <- wg.count + fmt.Printf("WG(%s)-Done: %d\n", wg.name, wg.count) wg.Unlock() } func (wg *testWg) Wait() { wg.Lock() + fmt.Printf("WG(%s)-Wait: %d\n", wg.name, wg.count) wg.waitCalled <- wg.count wg.Unlock() } // a simple step-controllable http client type client struct { + name string tls bool addr net.Addr connected chan error @@ -66,8 +75,11 @@ type client struct { func (c *client) Run() { go func() { var err error + fmt.Printf("Client(%s) before Dial\n", c.name) conn, err := net.Dial(c.addr.Network(), c.addr.String()) + fmt.Printf("Client(%s) after dial\n", c.name) if err != nil { + fmt.Println("Client(%s) connected with error %s\n", conn, err) c.connected <- err return } @@ -75,6 +87,7 @@ func (c *client) Run() { conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true}) } c.connected <- nil + fmt.Println("Client(%s) connected successfully\n", c.name) for <-c.sendrequest { _, err = conn.Write([]byte("GET / HTTP/1.1\nHost: localhost:8000\n\n")) if err != nil { @@ -89,8 +102,10 @@ func (c *client) Run() { break } } + fmt.Printf("Client(%s) got response\n", c.name) c.idle <- scanner.Err() <-c.idlerelease + fmt.Printf("Client(%s) idle released\n", c.name) } conn.Close() ioutil.ReadAll(conn) @@ -166,9 +181,9 @@ func startTLSServer(t *testing.T, server *GracefulServer, certFile, keyFile stri return startGenericServer(t, server, statechanged, runner) } -// Test that the method signatures of the methods we override from net/http/Server -// match those of the original. +// Test that the method signatures of the methods we override from net/http/Server match those of the original. func TestInterface(t *testing.T) { + return var original, ours interface{} original = &http.Server{} ours = &GracefulServer{} @@ -180,9 +195,9 @@ func TestInterface(t *testing.T) { } } -// Tests that the server allows in-flight requests to complete -// before shutting down. +// Tests that the server allows in-flight requests to complete before shutting down. func TestGracefulness(t *testing.T) { + return server := NewServer() wg := newTestWg() server.wg = wg @@ -237,6 +252,7 @@ func fmtstates(states []http.ConnState) string { // Test the state machine in isolation without a network connection func TestStateTransitions(t *testing.T) { + return for _, test := range stateTests { fmt.Println("Starting test ", fmtstates(test.states)) server := NewServer() @@ -291,9 +307,9 @@ func (l *fakeListener) Accept() (net.Conn, error) { return nil, errors.New("connection closed") } -// Test that a connection is closed upon reaching an idle state iff the server -// is shutting down. +// Test that a connection is closed upon reaching an idle state iff the server is shutting down. func TestCloseOnIdle(t *testing.T) { + return server := NewServer() wg := newTestWg() server.wg = wg @@ -349,9 +365,9 @@ func waitForState(t *testing.T, waiter chan http.ConnState, state http.ConnState } } -// Test that a request moving from active->idle->active using an actual -// network connection still results in a corect shutdown +// Test that a request moving from active->idle->active using an actual network connection still results in a corect shutdown. func TestStateTransitionActiveIdleActive(t *testing.T) { + return server := NewServer() wg := newTestWg() statechanged := make(chan http.ConnState) @@ -390,6 +406,7 @@ func TestStateTransitionActiveIdleActive(t *testing.T) { // Test state transitions from new->active->-idle->closed using an actual // network connection and make sure the waitgroup count is correct at the end. func TestStateTransitionActiveIdleClosed(t *testing.T) { + return var ( listener net.Listener exitchan chan error @@ -454,6 +471,7 @@ func TestStateTransitionActiveIdleClosed(t *testing.T) { // Test that supplying a non GracefulListener to Serve works // correctly (ie. that the listener is wrapped to become graceful) func TestWrapConnection(t *testing.T) { + return l, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatal("Failed to create listener", err) @@ -498,7 +516,7 @@ func TestWrapConnection(t *testing.T) { // Tests that the server begins to shut down when told to and does not accept // new requests once shutdown has begun func TestShutdown(t *testing.T) { - + return server := NewServer() wg := newTestWg() server.wg = wg @@ -538,6 +556,7 @@ func TestShutdown(t *testing.T) { // Use the top level functions to instantiate servers and make sure // they all shutdown when Close() is called func TestGlobalShutdown(t *testing.T) { + return laserr := make(chan error) lastlserr := make(chan error) serveerr := make(chan error) @@ -597,6 +616,115 @@ func TestGlobalShutdown(t *testing.T) { } +// Hijack listener +func TestHijackListener(t *testing.T) { + server := NewServer() + server.name = "s1" + wg := newTestWg() + wg.name = "wg" + server.wg = wg + server.down = make(chan bool) + listener, exitchan := startServer(t, server, nil) + + client := newClient(listener.Addr(), false) + client.name = "c1" + client.Run() + + // wait for client to connect, but don't let it send the request yet + if err := <-client.connected; err != nil { + t.Fatal("Client failed to connect to server", err) + } + + // Make sure first server got the request + if count := <-wg.countChanged; count != 1 { + t.Fatal("Expected first server to accept the request") + } + + fmt.Println("Before Hijack") + + wg2 := newTestWg() + wg2.name = "wg2" + server2, err := server.HijackListener(new(http.Server)) + server2.name = "s2" + server2.wg = wg2 + if err != nil { + t.Fatal("Failed to hijack listener", err) + } + + fmt.Println("After Hijack") + + listener2, exitchan2 := startServer(t, server2, nil) + + // Close the first server + server.Close() + + fmt.Println("Close server") + // First server waits for the first request to finish + waiting := <-wg.waitCalled + if waiting < 1 { + t.Errorf("Expected the waitgroup to equal 1 at shutdown; actually %d", waiting) + } + fmt.Println("Server1 waits") + + // allow the client to finish sending the request and make sure the server exits after + // (client will be in connected but idle state at that point) + client.sendrequest <- true + close(client.sendrequest) + if err := <-exitchan; err != nil { + t.Error("Unexpected error during shutdown", err) + } + fmt.Println("Client 1 exited") + + // Make sure server1 has been closed + <-server.down + + fmt.Println("Server 1 exited") + + client2 := newClient(listener2.Addr(), false) + client2.name = "c2" + client2.Run() + + fmt.Println("Checkpoint 1") + + // wait for client to connect, but don't let it send the request yet + if err := <-client2.connected; err != nil { + t.Fatal("Client failed to connect to server", err) + } + + fmt.Println("Checkpoint 1.1") + + if count := <-wg2.countChanged; count != 1 { + t.Fatal("Expected first server to accept the request") + } + + fmt.Println("Checkpoint 1.2") + + fmt.Println("Checkpoint 2") + // Close the second server + server2.Close() + + waiting = <-wg2.waitCalled + if waiting < 1 { + t.Errorf("Expected the waitgroup to equal 1 at shutdown; actually %d", waiting) + } + + fmt.Println("Checkpoint 3") + + // allow the client to finish sending the request and make sure the server exits after + // (client will be in connected but idle state at that point) + client2.sendrequest <- true + // Make sure that request resulted in success + if err := <-client2.idle; err != nil { + t.Errorf("Client failed to write the request, error: %s", err) + } + fmt.Println("Checkpoint 4") + close(client2.sendrequest) + if err := <-exitchan2; err != nil { + t.Error("Unexpected error during shutdown", err) + } + fmt.Println("Checkpoint 5") +} + type tempFile struct { *os.File } From 42e5572877738b425451a651a30081b76a8bd5f1 Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Tue, 26 Aug 2014 18:21:56 -0700 Subject: [PATCH 02/26] Fix corrupted socket --- listener.go | 38 +++++++++++--------------- listener_test.go | 20 -------------- server.go | 25 ++--------------- server_test.go | 71 +++++++----------------------------------------- 4 files changed, 28 insertions(+), 126 deletions(-) delete mode 100644 listener_test.go diff --git a/listener.go b/listener.go index 9b950fd..05c71ba 100644 --- a/listener.go +++ b/listener.go @@ -5,9 +5,7 @@ import ( "net" "net/http" "os" - "reflect" "sync" - "syscall" ) // NewListener wraps an existing listener for use with @@ -24,8 +22,7 @@ func NewListener(l net.Listener) *GracefulListener { } } -// A gracefulCon wraps a normal net.Conn and tracks the -// last known http state. +// A gracefulCon wraps a normal net.Conn and tracks the last known http state. type gracefulConn struct { net.Conn lastHTTPState http.ConnState @@ -33,8 +30,7 @@ type gracefulConn struct { // A GracefulListener differs from a standard net.Listener in one way: if // Accept() is called after it is gracefully closed, it returns a -// listenerAlreadyClosed error. The GracefulServer will ignore this -// error. +// listenerAlreadyClosed error. The GracefulServer will ignore this error. type GracefulListener struct { listener net.Listener open bool @@ -76,12 +72,14 @@ func (l *GracefulListener) Close() error { return l.listener.Close() } -func (l *GracefulListener) GetFD() (uintptr, string) { - v := reflect.ValueOf(l.listener).Elem().FieldByName("fd").Elem() - fd := uintptr(v.FieldByName("sysfd").Int()) - addr := l.listener.Addr() - name := fmt.Sprintf("%s:%s->", addr.Network(), addr.String()) - return fd, name +func (l *GracefulListener) GetFile() (*os.File, error) { + switch t := l.listener.(type) { + case *net.TCPListener: + return t.File() + case *net.UnixListener: + return t.File() + } + return nil, fmt.Errorf("Unsupported listener") } func (l *GracefulListener) Clone() (*GracefulListener, error) { @@ -92,21 +90,17 @@ func (l *GracefulListener) Clone() (*GracefulListener, error) { return nil, fmt.Errorf("listener is already closed") } - fd, fdName := l.GetFD() - - fl, err := net.FileListener(os.NewFile(fd, fdName)) - if nil != err { + file, err := l.GetFile() + if err != nil { return nil, err } + defer file.Close() - switch fl.(type) { - case *net.TCPListener, *net.UnixListener: - default: - return nil, fmt.Errorf("file descriptor is %T not *net.TCPListener or *net.UnixListener", l) - } - if err := syscall.Close(int(fd)); nil != err { + fl, err := net.FileListener(file) + if nil != err { return nil, err } + return NewListener(fl), nil } diff --git a/listener_test.go b/listener_test.go deleted file mode 100644 index 62530b1..0000000 --- a/listener_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package manners - -import ( - "net" - "testing" -) - -func TestListenerGetFD(t *testing.T) { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal("Failed to create a listener", err) - } - g := NewListener(l) - fd, _ := g.GetFD() - if fd == 0 { - t.Fatal("Failed to get and FD", fd) - } - - g.Close() -} diff --git a/server.go b/server.go index bba334c..a84f136 100644 --- a/server.go +++ b/server.go @@ -43,7 +43,6 @@ package manners import ( "crypto/tls" - "fmt" "net" "net/http" "sync" @@ -85,15 +84,13 @@ func NewWithServer(s *http.Server) *GracefulServer { // // It must be initialized by calling NewServer or NewWithServer type GracefulServer struct { - name string *http.Server shutdown chan struct{} wg waitgroup + listener *GracefulListener // used by test code - up chan net.Listener - down chan bool - listener *GracefulListener + up chan net.Listener } // Close stops the server from accepting new requets and beings shutting down. @@ -162,7 +159,6 @@ func (gs *GracefulServer) HijackListener(s *http.Server) (*GracefulServer, error if err != nil { return nil, err } - fmt.Println("Cloned") other := NewWithServer(s) other.listener = listener return other, nil @@ -210,14 +206,12 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // one more request before SetKeepAliveEnabled(false) takes effect. conn.Close() } - fmt.Printf("Connection idle\n") s.FinishRoutine() case http.StateClosed, http.StateHijacked: // (StateNew, StateActive, StateIdle) -> (StateClosed, StateHiJacked) if gconn.lastHTTPState != http.StateIdle { // if it was idle it's already been decremented - fmt.Printf("Connection closed\n") s.FinishRoutine() } } @@ -233,15 +227,6 @@ func (s *GracefulServer) Serve(listener net.Listener) error { s.up <- listener } err := s.Server.Serve(listener) - if s.down != nil { - defer func() { - close(s.down) - }() - } - - defer func() { - fmt.Printf("Server(%s) stopped. Error: %T, %s\n", s.name, err, err) - }() // This block is reached when the server has received a shut down command. if err == nil { @@ -259,17 +244,11 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // request. func (s *GracefulServer) StartRoutine() { s.wg.Add(1) - fmt.Printf("Server(%s) StartRoutine()\n", s.name) } // FinishRoutine decrements the server's WaitGroup. Used this to complement StartRoutine(). func (s *GracefulServer) FinishRoutine() { s.wg.Done() - fmt.Printf("Server(%s) FinishRoutine()\n", s.name) -} - -func (s *GracefulServer) GetFD() (uintptr, string) { - return s.listener.GetFD() } var ( diff --git a/server_test.go b/server_test.go index 19ddb64..f401e6f 100644 --- a/server_test.go +++ b/server_test.go @@ -23,7 +23,6 @@ type httpInterface interface { // an inefficient replica of a waitgroup that can be introspected type testWg struct { - name string sync.Mutex count int waitCalled chan int @@ -33,14 +32,13 @@ type testWg struct { func newTestWg() *testWg { return &testWg{ waitCalled: make(chan int, 1), - countChanged: make(chan int, 100), + countChanged: make(chan int, 1024), } } func (wg *testWg) Add(delta int) { wg.Lock() wg.count++ - fmt.Printf("WG(%s)-Add: %d\n", wg.name, wg.count) wg.countChanged <- wg.count wg.Unlock() } @@ -49,20 +47,17 @@ func (wg *testWg) Done() { wg.Lock() wg.count-- wg.countChanged <- wg.count - fmt.Printf("WG(%s)-Done: %d\n", wg.name, wg.count) wg.Unlock() } func (wg *testWg) Wait() { wg.Lock() - fmt.Printf("WG(%s)-Wait: %d\n", wg.name, wg.count) wg.waitCalled <- wg.count wg.Unlock() } // a simple step-controllable http client type client struct { - name string tls bool addr net.Addr connected chan error @@ -75,11 +70,8 @@ type client struct { func (c *client) Run() { go func() { var err error - fmt.Printf("Client(%s) before Dial\n", c.name) conn, err := net.Dial(c.addr.Network(), c.addr.String()) - fmt.Printf("Client(%s) after dial\n", c.name) if err != nil { - fmt.Println("Client(%s) connected with error %s\n", conn, err) c.connected <- err return } @@ -87,7 +79,6 @@ func (c *client) Run() { conn = tls.Client(conn, &tls.Config{InsecureSkipVerify: true}) } c.connected <- nil - fmt.Println("Client(%s) connected successfully\n", c.name) for <-c.sendrequest { _, err = conn.Write([]byte("GET / HTTP/1.1\nHost: localhost:8000\n\n")) if err != nil { @@ -102,10 +93,8 @@ func (c *client) Run() { break } } - fmt.Printf("Client(%s) got response\n", c.name) c.idle <- scanner.Err() <-c.idlerelease - fmt.Printf("Client(%s) idle released\n", c.name) } conn.Close() ioutil.ReadAll(conn) @@ -183,7 +172,6 @@ func startTLSServer(t *testing.T, server *GracefulServer, certFile, keyFile stri // Test that the method signatures of the methods we override from net/http/Server match those of the original. func TestInterface(t *testing.T) { - return var original, ours interface{} original = &http.Server{} ours = &GracefulServer{} @@ -197,7 +185,6 @@ func TestInterface(t *testing.T) { // Tests that the server allows in-flight requests to complete before shutting down. func TestGracefulness(t *testing.T) { - return server := NewServer() wg := newTestWg() server.wg = wg @@ -252,7 +239,6 @@ func fmtstates(states []http.ConnState) string { // Test the state machine in isolation without a network connection func TestStateTransitions(t *testing.T) { - return for _, test := range stateTests { fmt.Println("Starting test ", fmtstates(test.states)) server := NewServer() @@ -309,7 +295,6 @@ func (l *fakeListener) Accept() (net.Conn, error) { // Test that a connection is closed upon reaching an idle state iff the server is shutting down. func TestCloseOnIdle(t *testing.T) { - return server := NewServer() wg := newTestWg() server.wg = wg @@ -367,7 +352,6 @@ func waitForState(t *testing.T, waiter chan http.ConnState, state http.ConnState // Test that a request moving from active->idle->active using an actual network connection still results in a corect shutdown. func TestStateTransitionActiveIdleActive(t *testing.T) { - return server := NewServer() wg := newTestWg() statechanged := make(chan http.ConnState) @@ -406,7 +390,6 @@ func TestStateTransitionActiveIdleActive(t *testing.T) { // Test state transitions from new->active->-idle->closed using an actual // network connection and make sure the waitgroup count is correct at the end. func TestStateTransitionActiveIdleClosed(t *testing.T) { - return var ( listener net.Listener exitchan chan error @@ -471,7 +454,6 @@ func TestStateTransitionActiveIdleClosed(t *testing.T) { // Test that supplying a non GracefulListener to Serve works // correctly (ie. that the listener is wrapped to become graceful) func TestWrapConnection(t *testing.T) { - return l, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatal("Failed to create listener", err) @@ -516,7 +498,6 @@ func TestWrapConnection(t *testing.T) { // Tests that the server begins to shut down when told to and does not accept // new requests once shutdown has begun func TestShutdown(t *testing.T) { - return server := NewServer() wg := newTestWg() server.wg = wg @@ -556,7 +537,6 @@ func TestShutdown(t *testing.T) { // Use the top level functions to instantiate servers and make sure // they all shutdown when Close() is called func TestGlobalShutdown(t *testing.T) { - return laserr := make(chan error) lastlserr := make(chan error) serveerr := make(chan error) @@ -619,15 +599,11 @@ func TestGlobalShutdown(t *testing.T) { // Hijack listener func TestHijackListener(t *testing.T) { server := NewServer() - server.name = "s1" wg := newTestWg() - wg.name = "wg" server.wg = wg - server.down = make(chan bool) listener, exitchan := startServer(t, server, nil) client := newClient(listener.Addr(), false) - client.name = "c1" client.Run() // wait for client to connect, but don't let it send the request yet @@ -635,36 +611,26 @@ func TestHijackListener(t *testing.T) { t.Fatal("Client failed to connect to server", err) } - // Make sure first server got the request - if count := <-wg.countChanged; count != 1 { - t.Fatal("Expected first server to accept the request") - } - - fmt.Println("Before Hijack") + // Make sure server1 got the request and added it to the waiting group + <-wg.countChanged wg2 := newTestWg() - wg2.name = "wg2" server2, err := server.HijackListener(new(http.Server)) - server2.name = "s2" server2.wg = wg2 if err != nil { t.Fatal("Failed to hijack listener", err) } - fmt.Println("After Hijack") - listener2, exitchan2 := startServer(t, server2, nil) // Close the first server server.Close() - fmt.Println("Close server") // First server waits for the first request to finish waiting := <-wg.waitCalled if waiting < 1 { t.Errorf("Expected the waitgroup to equal 1 at shutdown; actually %d", waiting) } - fmt.Println("Server1 waits") // allow the client to finish sending the request and make sure the server exits after // (client will be in connected but idle state at that point) @@ -673,33 +639,20 @@ func TestHijackListener(t *testing.T) { if err := <-exitchan; err != nil { t.Error("Unexpected error during shutdown", err) } - fmt.Println("Client 1 exited") - - // Make sure server1 has been closed - <-server.down - - fmt.Println("Server 1 exited") client2 := newClient(listener2.Addr(), false) - client2.name = "c2" client2.Run() - fmt.Println("Checkpoint 1") - // wait for client to connect, but don't let it send the request yet - if err := <-client2.connected; err != nil { - t.Fatal("Client failed to connect to server", err) - } - - fmt.Println("Checkpoint 1.1") - - if count := <-wg2.countChanged; count != 1 { - t.Fatal("Expected first server to accept the request") + select { + case err := <-client2.connected: + if err != nil { + t.Fatal("Client failed to connect to server", err) + } + case <-time.After(time.Second): + t.Fatal("Timeout connecting to the server", err) } - fmt.Println("Checkpoint 1.2") - - fmt.Println("Checkpoint 2") // Close the second server server2.Close() @@ -708,8 +661,6 @@ func TestHijackListener(t *testing.T) { t.Errorf("Expected the waitgroup to equal 1 at shutdown; actually %d", waiting) } - fmt.Println("Checkpoint 3") - // allow the client to finish sending the request and make sure the server exits after // (client will be in connected but idle state at that point) client2.sendrequest <- true @@ -717,12 +668,10 @@ func TestHijackListener(t *testing.T) { if err := <-client2.idle; err != nil { t.Errorf("Client failed to write the request, error: %s", err) } - fmt.Println("Checkpoint 4") close(client2.sendrequest) if err := <-exitchan2; err != nil { t.Error("Unexpected error during shutdown", err) } - fmt.Println("Checkpoint 5") } type tempFile struct { From 700effe0a4091755493d8509f5d5f375bd607c0d Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 31 Aug 2014 13:33:11 -0700 Subject: [PATCH 03/26] Expose keep alive listener --- server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index a84f136..1d23a1e 100644 --- a/server.go +++ b/server.go @@ -147,7 +147,7 @@ func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { return err } - tlsListener := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, config) + tlsListener := tls.NewListener(TCPKeepAliveListener{ln.(*net.TCPListener)}, config) s.listener = NewListener(tlsListener) } @@ -293,17 +293,17 @@ func Close() { m.Unlock() } -// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted +// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted // connections. It's used by ListenAndServe and ListenAndServeTLS so // dead TCP connections (e.g. closing laptop mid-download) eventually // go away. // // direct lift from net/http/server.go -type tcpKeepAliveListener struct { +type TCPKeepAliveListener struct { *net.TCPListener } -func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { +func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) { tc, err := ln.AcceptTCP() if err != nil { return From 1d2683230c0a078990c4d0ed0af8c57519049f7e Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 31 Aug 2014 13:57:44 -0700 Subject: [PATCH 04/26] Add support for TLS updates --- listener.go | 5 ++--- server.go | 9 +++++++-- server_test.go | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/listener.go b/listener.go index 05c71ba..ed8b22a 100644 --- a/listener.go +++ b/listener.go @@ -82,7 +82,7 @@ func (l *GracefulListener) GetFile() (*os.File, error) { return nil, fmt.Errorf("Unsupported listener") } -func (l *GracefulListener) Clone() (*GracefulListener, error) { +func (l *GracefulListener) Clone() (net.Listener, error) { l.mutex.Lock() defer l.mutex.Unlock() @@ -100,8 +100,7 @@ func (l *GracefulListener) Clone() (*GracefulListener, error) { if nil != err { return nil, err } - - return NewListener(fl), nil + return fl, nil } type listenerAlreadyClosed struct { diff --git a/server.go b/server.go index 1d23a1e..070470d 100644 --- a/server.go +++ b/server.go @@ -154,13 +154,18 @@ func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { return s.Serve(s.listener) } -func (gs *GracefulServer) HijackListener(s *http.Server) (*GracefulServer, error) { +func (gs *GracefulServer) HijackListener(s *http.Server, config *tls.Config) (*GracefulServer, error) { listener, err := gs.listener.Clone() if err != nil { return nil, err } + + if config != nil { + listener = tls.NewListener(TCPKeepAliveListener{listener.(*net.TCPListener)}, config) + } + other := NewWithServer(s) - other.listener = listener + other.listener = NewListener(listener) return other, nil } diff --git a/server_test.go b/server_test.go index f401e6f..0ee9fee 100644 --- a/server_test.go +++ b/server_test.go @@ -615,7 +615,7 @@ func TestHijackListener(t *testing.T) { <-wg.countChanged wg2 := newTestWg() - server2, err := server.HijackListener(new(http.Server)) + server2, err := server.HijackListener(new(http.Server), nil) server2.wg = wg2 if err != nil { t.Fatal("Failed to hijack listener", err) From 463f1fedbcdedb8d3df2c2bc17224c2cc2c007f4 Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Wed, 3 Sep 2014 22:18:32 -0700 Subject: [PATCH 05/26] Always preserve listener --- server.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index 070470d..192bc22 100644 --- a/server.go +++ b/server.go @@ -176,9 +176,12 @@ func (gs *GracefulServer) HijackListener(s *http.Server, config *tls.Config) (*G func (s *GracefulServer) Serve(listener net.Listener) error { // accept a net.Listener to preserve the interface compatibility with the standard // http.Server, but we except a GracefluListener - if _, ok := listener.(*GracefulListener); !ok { - listener = NewListener(listener) + gracefulListener, ok := listener.(*GracefulListener) + if !ok { + gracefulListener = NewListener(listener) + listener = gracefulListener } + s.listener = gracefulListener var closing int32 From 4213b9dc0f22d6bc07e67af77b205d40abf33f4a Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Thu, 4 Sep 2014 22:20:47 -0700 Subject: [PATCH 06/26] Support hijacking of TLS listeners --- listener.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++++----- server.go | 26 ++----------------- 2 files changed, 68 insertions(+), 31 deletions(-) diff --git a/listener.go b/listener.go index ed8b22a..e729fd5 100644 --- a/listener.go +++ b/listener.go @@ -1,11 +1,13 @@ package manners import ( + "crypto/tls" "fmt" "net" "net/http" "os" "sync" + "time" ) // NewListener wraps an existing listener for use with @@ -73,13 +75,7 @@ func (l *GracefulListener) Close() error { } func (l *GracefulListener) GetFile() (*os.File, error) { - switch t := l.listener.(type) { - case *net.TCPListener: - return t.File() - case *net.UnixListener: - return t.File() - } - return nil, fmt.Errorf("Unsupported listener") + return getListenerFile(l.listener) } func (l *GracefulListener) Clone() (net.Listener, error) { @@ -103,6 +99,69 @@ func (l *GracefulListener) Clone() (net.Listener, error) { return fl, nil } +// A listener implements a network listener (net.Listener) for TLS connections. +// direct lift from crypto/tls.go +type TLSListener struct { + net.Listener + config *tls.Config +} + +// Accept waits for and returns the next incoming TLS connection. +// The returned connection c is a *tls.Conn. +func (l *TLSListener) Accept() (c net.Conn, err error) { + c, err = l.Listener.Accept() + if err != nil { + return + } + c = tls.Server(c, l.config) + return +} + +// NewListener creates a Listener which accepts connections from an inner +// Listener and wraps each connection with Server. +// The configuration config must be non-nil and must have +// at least one certificate. +func NewTLSListener(inner net.Listener, config *tls.Config) net.Listener { + l := new(TLSListener) + l.Listener = inner + l.config = config + return l +} + type listenerAlreadyClosed struct { error } + +// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by ListenAndServe and ListenAndServeTLS so +// dead TCP connections (e.g. closing laptop mid-download) eventually +// go away. +// +// direct lift from net/http/server.go +type TCPKeepAliveListener struct { + *net.TCPListener +} + +func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(3 * time.Minute) + return tc, nil +} + +func getListenerFile(listener net.Listener) (*os.File, error) { + switch t := listener.(type) { + case *net.TCPListener: + return t.File() + case *net.UnixListener: + return t.File() + case TCPKeepAliveListener: + return t.TCPListener.File() + case *TLSListener: + return getListenerFile(t.Listener) + } + return nil, fmt.Errorf("Unsupported listener: %T", listener) +} diff --git a/server.go b/server.go index 192bc22..baa4fec 100644 --- a/server.go +++ b/server.go @@ -47,7 +47,6 @@ import ( "net/http" "sync" "sync/atomic" - "time" ) // interface describing a waitgroup, so unit @@ -147,9 +146,8 @@ func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { return err } - tlsListener := tls.NewListener(TCPKeepAliveListener{ln.(*net.TCPListener)}, config) + tlsListener := NewTLSListener(TCPKeepAliveListener{ln.(*net.TCPListener)}, config) s.listener = NewListener(tlsListener) - } return s.Serve(s.listener) } @@ -161,7 +159,7 @@ func (gs *GracefulServer) HijackListener(s *http.Server, config *tls.Config) (*G } if config != nil { - listener = tls.NewListener(TCPKeepAliveListener{listener.(*net.TCPListener)}, config) + listener = NewTLSListener(TCPKeepAliveListener{listener.(*net.TCPListener)}, config) } other := NewWithServer(s) @@ -300,23 +298,3 @@ func Close() { servers = nil m.Unlock() } - -// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted -// connections. It's used by ListenAndServe and ListenAndServeTLS so -// dead TCP connections (e.g. closing laptop mid-download) eventually -// go away. -// -// direct lift from net/http/server.go -type TCPKeepAliveListener struct { - *net.TCPListener -} - -func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) { - tc, err := ln.AcceptTCP() - if err != nil { - return - } - tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(3 * time.Minute) - return tc, nil -} From a592baf3a7f0a8f905ea0bf8387ccdba73950489 Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 7 Sep 2014 20:39:57 -0700 Subject: [PATCH 07/26] Add method for explicit listener --- server.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server.go b/server.go index baa4fec..33db26d 100644 --- a/server.go +++ b/server.go @@ -73,6 +73,19 @@ func NewWithServer(s *http.Server) *GracefulServer { } } +func NewWithListener(s *http.Server, l net.Listener) *GracefulServer { + gracefulListener, ok := l.(*GracefulListener) + if !ok { + gracefulListener = NewListener(l) + } + return &GracefulServer{ + Server: s, + shutdown: make(chan struct{}), + wg: new(sync.WaitGroup), + listener: gracefulListener, + } +} + // A GracefulServer maintains a WaitGroup that counts how many in-flight // requests the server is handling. When it receives a shutdown signal, // it stops accepting new requests but does not actually shut down until From bba629308165135ae7c3cffa76c85b97cc422262 Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sat, 20 Sep 2014 19:11:15 -0700 Subject: [PATCH 08/26] Add advanced state transitions handler --- server.go | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/server.go b/server.go index 33db26d..4072b21 100644 --- a/server.go +++ b/server.go @@ -57,6 +57,16 @@ type waitgroup interface { Wait() } +// StateHandler can be called by the server if the state of the connection changes. +// Notice that it passed previous state and the new state as parameters. +type StateHandler func(net.Conn, http.ConnState, http.ConnState) + +type Options struct { + Server *http.Server + StateHandler StateHandler + Listener net.Listener +} + // NewServer creates a new GracefulServer. The server will begin shutting down when // a value is passed to the Shutdown channel. func NewServer() *GracefulServer { @@ -73,16 +83,24 @@ func NewWithServer(s *http.Server) *GracefulServer { } } -func NewWithListener(s *http.Server, l net.Listener) *GracefulServer { - gracefulListener, ok := l.(*GracefulListener) - if !ok { - gracefulListener = NewListener(l) +func NewWithOptions(o Options) *GracefulServer { + // Set up listener + var listener *GracefulListener + if o.Listener != nil { + g, ok := o.Listener.(*GracefulListener) + if !ok { + listener = NewListener(o.Listener) + } else { + listener = g + } } + return &GracefulServer{ - Server: s, - shutdown: make(chan struct{}), - wg: new(sync.WaitGroup), - listener: gracefulListener, + listener: listener, + Server: o.Server, + stateHandler: o.StateHandler, + shutdown: make(chan struct{}), + wg: new(sync.WaitGroup), } } @@ -103,6 +121,8 @@ type GracefulServer struct { // used by test code up chan net.Listener + + stateHandler StateHandler } // Close stops the server from accepting new requets and beings shutting down. @@ -234,6 +254,9 @@ func (s *GracefulServer) Serve(listener net.Listener) error { s.FinishRoutine() } } + if s.stateHandler != nil { + s.stateHandler(conn, gconn.lastHTTPState, newState) + } gconn.lastHTTPState = newState if orgConnState != nil { orgConnState(conn, newState) From 69a4adcf8fc04e8aef0ed5f9e43982739220ed8f Mon Sep 17 00:00:00 2001 From: Sasha Klizhentas Date: Sun, 21 Sep 2014 16:34:22 -0700 Subject: [PATCH 09/26] Add GetFile method --- server.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server.go b/server.go index 4072b21..996544e 100644 --- a/server.go +++ b/server.go @@ -45,6 +45,7 @@ import ( "crypto/tls" "net" "net/http" + "os" "sync" "sync/atomic" ) @@ -185,6 +186,10 @@ func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { return s.Serve(s.listener) } +func (gs *GracefulServer) GetFile() (*os.File, error) { + return gs.listener.GetFile() +} + func (gs *GracefulServer) HijackListener(s *http.Server, config *tls.Config) (*GracefulServer, error) { listener, err := gs.listener.Clone() if err != nil { From 38e66a3e3e920713163484c7ab57ffc39c35e257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benedikt=20B=C3=B6hm?= Date: Wed, 3 Dec 2014 15:29:55 +0100 Subject: [PATCH 10/26] expose waitgroup.Wait so shutdown handlers can wait on the server --- server.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index 996544e..eb9297d 100644 --- a/server.go +++ b/server.go @@ -277,12 +277,11 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // This block is reached when the server has received a shut down command. if err == nil { - s.wg.Wait() return nil } else if _, ok := err.(listenerAlreadyClosed); ok { - s.wg.Wait() return nil } + return err } @@ -298,6 +297,10 @@ func (s *GracefulServer) FinishRoutine() { s.wg.Done() } +func (s *GracefulServer) Wait() { + s.wg.Wait() +} + var ( servers []*GracefulServer m sync.Mutex From 4c2f6a9b6e3a84f1aa9d8b418ccbb9396116fdab Mon Sep 17 00:00:00 2001 From: Alexander Klizhentas Date: Wed, 7 Jan 2015 13:59:40 -0800 Subject: [PATCH 11/26] Revert "expose waitgroup.Wait so shutdown handlers can wait on the server" --- server.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server.go b/server.go index eb9297d..996544e 100644 --- a/server.go +++ b/server.go @@ -277,11 +277,12 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // This block is reached when the server has received a shut down command. if err == nil { + s.wg.Wait() return nil } else if _, ok := err.(listenerAlreadyClosed); ok { + s.wg.Wait() return nil } - return err } @@ -297,10 +298,6 @@ func (s *GracefulServer) FinishRoutine() { s.wg.Done() } -func (s *GracefulServer) Wait() { - s.wg.Wait() -} - var ( servers []*GracefulServer m sync.Mutex From 1530288ffcbd8240284210454b9c6bc2aa230bf8 Mon Sep 17 00:00:00 2001 From: Alexander Klizhentas Date: Wed, 7 Jan 2015 15:14:21 -0800 Subject: [PATCH 12/26] Fix missing TLS state --- listener.go | 19 ++++++++++++++++--- server.go | 4 +++- server_test.go | 10 ++++++++-- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/listener.go b/listener.go index e729fd5..88f888d 100644 --- a/listener.go +++ b/listener.go @@ -30,6 +30,15 @@ type gracefulConn struct { lastHTTPState http.ConnState } +type gracefulAddr struct { + net.Addr + gconn *gracefulConn +} + +func (g *gracefulConn) LocalAddr() net.Addr { + return &gracefulAddr{g.Conn.LocalAddr(), g} +} + // A GracefulListener differs from a standard net.Listener in one way: if // Accept() is called after it is gracefully closed, it returns a // listenerAlreadyClosed error. The GracefulServer will ignore this error. @@ -59,8 +68,12 @@ func (l *GracefulListener) Accept() (net.Conn, error) { return nil, err } - gconn := &gracefulConn{conn, 0} - return gconn, nil + // don't wrap connection in case if it's tls because we won't break + // http server internal logic that relies on the type + if _, ok := conn.(*tls.Conn); ok { + return conn, nil + } + return &gracefulConn{conn, 0}, nil } // Close tells the wrapped listener to stop listening. It is idempotent. @@ -113,7 +126,7 @@ func (l *TLSListener) Accept() (c net.Conn, err error) { if err != nil { return } - c = tls.Server(c, l.config) + c = tls.Server(&gracefulConn{c, 0}, l.config) return } diff --git a/server.go b/server.go index 996544e..673e052 100644 --- a/server.go +++ b/server.go @@ -230,7 +230,9 @@ func (s *GracefulServer) Serve(listener net.Listener) error { orgConnState := s.Server.ConnState s.ConnState = func(conn net.Conn, newState http.ConnState) { - gconn := conn.(*gracefulConn) + // Ugly hack, but it works. We pass the information about the underlying state via the only available interface in net.Conn + // we do this not to override the tls.Conn, as the internal logic of http.Server depends on the type assertion (unfortunately) + gconn := conn.LocalAddr().(*gracefulAddr).gconn switch newState { case http.StateNew: // new_conn -> StateNew diff --git a/server_test.go b/server_test.go index 0ee9fee..808ccc1 100644 --- a/server_test.go +++ b/server_test.go @@ -124,7 +124,8 @@ func startGenericServer(t *testing.T, server *GracefulServer, statechanged chan // Wrap the ConnState handler with something that will notify // the statechanged channel when a state change happens server.ConnState = func(conn net.Conn, newState http.ConnState) { - s := conn.(*gracefulConn).lastHTTPState + gconn := conn.LocalAddr().(*gracefulAddr).gconn + s := gconn.lastHTTPState statechanged <- s } } @@ -246,7 +247,7 @@ func TestStateTransitions(t *testing.T) { server.wg = wg startServer(t, server, nil) - conn := &gracefulConn{nil, 0} + conn := &gracefulConn{&fakeConn{}, 0} for _, newState := range test.states { server.ConnState(conn, newState) } @@ -263,6 +264,11 @@ func TestStateTransitions(t *testing.T) { type fakeConn struct { net.Conn closeCalled bool + localAddr net.Addr +} + +func (f *fakeConn) LocalAddr() net.Addr { + return &net.IPAddr{} } func (c *fakeConn) Close() error { From 97d9cc9efd89bbbdab17f92386b65bfb377e10b3 Mon Sep 17 00:00:00 2001 From: Alexander Klizhentas Date: Thu, 8 Jan 2015 10:06:05 -0800 Subject: [PATCH 13/26] Code review comments --- listener.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/listener.go b/listener.go index 88f888d..0d4d57e 100644 --- a/listener.go +++ b/listener.go @@ -68,7 +68,7 @@ func (l *GracefulListener) Accept() (net.Conn, error) { return nil, err } - // don't wrap connection in case if it's tls because we won't break + // don't wrap connection if it's tls so we won't break // http server internal logic that relies on the type if _, ok := conn.(*tls.Conn); ok { return conn, nil From b254a48582ccbdcfedc2729d91f78e14ee2a9c74 Mon Sep 17 00:00:00 2001 From: Rick Date: Sun, 8 Feb 2015 22:29:38 +0000 Subject: [PATCH 14/26] Added support for net/http/fcgi listeners. --- README.md | 6 +++++- server.go | 41 ++++++++++++++++++++++++++++++++++++++--- server_test.go | 26 +++++++++++++++++++++----- 3 files changed, 64 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 78e0fc0..cba1657 100644 --- a/README.md +++ b/README.md @@ -23,10 +23,14 @@ Manners ensures that all requests are served by incrementing a WaitGroup when a If your request handler spawns Goroutines that are not guaranteed to finish with the request, you can ensure they are also completed with the `StartRoutine` and `FinishRoutine` functions on the server. +### FCGI + +Manners supports three protocols: HTTP, HTTPS and FCGI. FCGI only operates via local a Unix socket connected to a co-hosted proxy, such as Apache or Nginx. + ### Compatability Manners 0.3.0 and above uses standard library functionality introduced in Go 1.3. ### Installation -`go get github.com/braintree/manners` +`go get github.com/rickb777/manners` diff --git a/server.go b/server.go index 673e052..40849f6 100644 --- a/server.go +++ b/server.go @@ -45,9 +45,11 @@ import ( "crypto/tls" "net" "net/http" + "net/http/fcgi" "os" "sync" "sync/atomic" + "strings" ) // interface describing a waitgroup, so unit @@ -131,10 +133,23 @@ func (s *GracefulServer) Close() { close(s.shutdown) } -// ListenAndServe provides a graceful equivalent of net/http.Serve.ListenAndServe. +func isUnixNetwork(addr string) bool { + return strings.HasPrefix(addr, "/") +} + +func chooseNetwork(addr string) string { + if isUnixNetwork(addr) { + return "unix" + } + return "tcp" +} + +// ListenAndServe provides a graceful equivalent of net/http.Serve.ListenAndServe, and +// also supports the FastCGI equivalent. func (s *GracefulServer) ListenAndServe() error { if s.listener == nil { - oldListener, err := net.Listen("tcp", s.Addr) + netwk := chooseNetwork(s.Addr) + oldListener, err := net.Listen(netwk, s.Addr) if err != nil { return err } @@ -175,6 +190,7 @@ func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { } if s.listener == nil { + // only "tcp" is supported with TLS ln, err := net.Listen("tcp", addr) if err != nil { return err @@ -275,7 +291,14 @@ func (s *GracefulServer) Serve(listener net.Listener) error { // notify test that server is up; wait for signal to continue s.up <- listener } - err := s.Server.Serve(listener) + + var err error + if isUnixNetwork(s.Server.Addr) { + os.Chmod(s.Server.Addr, os.ModePerm) + err = fcgi.Serve(listener, s.Server.Handler) + } else { + err = s.Server.Serve(listener) + } // This block is reached when the server has received a shut down command. if err == nil { @@ -306,6 +329,7 @@ var ( ) // ListenAndServe provides a graceful version of function provided by the net/http package. +// This supports HTTP but not HTTPS or FCGI. func ListenAndServe(addr string, handler http.Handler) error { server := NewWithServer(&http.Server{Addr: addr, Handler: handler}) m.Lock() @@ -314,7 +338,18 @@ func ListenAndServe(addr string, handler http.Handler) error { return server.ListenAndServe() } +// ListenAndServe provides a graceful version of function provided by the net/http/fcgi package. +// This supports FCGI but not HTTP/HTTPS. +func ListenAndServeFCGI(addr string, handler http.Handler) error { + server := NewWithServer(&http.Server{Addr: addr, Handler: handler}) + m.Lock() + servers = append(servers, server) + m.Unlock() + return server.ListenAndServe() +} + // ListenAndServeTLS provides a graceful version of function provided by the net/http package. +// This supports HTTPS but not HTTP or FCGI. func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler) error { server := NewWithServer(&http.Server{Addr: addr, Handler: handler}) m.Lock() diff --git a/server_test.go b/server_test.go index 808ccc1..7b99951 100644 --- a/server_test.go +++ b/server_test.go @@ -279,13 +279,19 @@ func (c *fakeConn) Close() error { type fakeListener struct { acceptRelease chan bool closeCalled chan bool + unix bool } -func newFakeListener() *fakeListener { return &fakeListener{make(chan bool, 1), make(chan bool, 1)} } +func newFakeListener(unix bool) *fakeListener { return &fakeListener{make(chan bool, 1), make(chan bool, 1), unix} } func (l *fakeListener) Addr() net.Addr { - addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8080") - return addr + if l.unix { + addr, _ := net.ResolveUnixAddr("unix", "/tmp/manners-test") + return addr + } else { + addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8080") + return addr + } } func (l *fakeListener) Close() error { @@ -301,10 +307,20 @@ func (l *fakeListener) Accept() (net.Conn, error) { // Test that a connection is closed upon reaching an idle state iff the server is shutting down. func TestCloseOnIdle(t *testing.T) { + testClose(t, false) +} + +// Test that a connection is closed upon reaching an idle state iff the server is shutting down. +func TestCloseOnIdleForUnixSocket(t *testing.T) { + testClose(t, true) +} + +// Test that a connection is closed upon reaching an idle state iff the server is shutting down. +func testClose(t *testing.T, unix bool) { server := NewServer() wg := newTestWg() server.wg = wg - fl := newFakeListener() + fl := newFakeListener(unix) runner := func() error { return server.Serve(fl) } @@ -560,7 +576,7 @@ func TestGlobalShutdown(t *testing.T) { }() go func() { - l := newFakeListener() + l := newFakeListener(false) serveerr <- Serve(l, nullHandler) }() From 77b2c7343564cea35ba42bf2a79af651c1683b22 Mon Sep 17 00:00:00 2001 From: Rick Date: Sun, 8 Feb 2015 23:05:25 +0000 Subject: [PATCH 15/26] New CloseOnInterrupt function provides an easy API for registering OS shutdown. --- server.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/server.go b/server.go index 40849f6..a6c3d30 100644 --- a/server.go +++ b/server.go @@ -28,14 +28,7 @@ or for a customized server: The server will shutdown cleanly when the Close() method is called: - go func() { - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, os.Interrupt, os.Kill) - <-sigchan - log.Info("Shutting down...") - manners.Close() - }() - + manners.CloseOnInterrupt() http.Handle("/hello", myHandler) log.Fatal(manners.ListenAndServe(":8080", nil)) */ @@ -50,6 +43,8 @@ import ( "sync" "sync/atomic" "strings" + "os/signal" + "syscall" ) // interface describing a waitgroup, so unit @@ -376,3 +371,21 @@ func Close() { servers = nil m.Unlock() } + +// CloseOnInterrupt creates a go-routine that will call the Close() function when certain OS +// signals are received. If no signals are specified, +// the following are used: SIGINT, SIGTERM, SIGKILL, SIGQUIT, SIGHUP, SIGUSR1. +// This function must be called before ListenAndServe. +func CloseOnInterrupt(signals ...os.Signal) { + go func() { + sigchan := make(chan os.Signal, 1) + if len(signals) > 0 { + signal.Notify(sigchan, signals) + } else { + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, + syscall.SIGQUIT, syscall.SIGHUP, syscall.SIGUSR1) + } + <-sigchan + Close() + }() +} From 15039a467ff0ae40a5611a04088b611650b119ab Mon Sep 17 00:00:00 2001 From: Rick Date: Sun, 8 Feb 2015 23:10:38 +0000 Subject: [PATCH 16/26] Correction. --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index a6c3d30..2a7dd31 100644 --- a/server.go +++ b/server.go @@ -380,7 +380,7 @@ func CloseOnInterrupt(signals ...os.Signal) { go func() { sigchan := make(chan os.Signal, 1) if len(signals) > 0 { - signal.Notify(sigchan, signals) + signal.Notify(sigchan, signals...) } else { signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGHUP, syscall.SIGUSR1) From b61da14af4ffb2ed234222fe3013a7a27ec96cd6 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 9 Feb 2015 08:54:55 +0000 Subject: [PATCH 17/26] Added more description of FCGI usage. --- README.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cba1657..3bb757b 100644 --- a/README.md +++ b/README.md @@ -21,11 +21,18 @@ manners.Close() Manners ensures that all requests are served by incrementing a WaitGroup when a request comes in and decrementing it when the request finishes. -If your request handler spawns Goroutines that are not guaranteed to finish with the request, you can ensure they are also completed with the `StartRoutine` and `FinishRoutine` functions on the server. +If your request handler spawns other Goroutines that are not guaranteed to finish with the request, you can ensure they are also completed with the `StartRoutine` and `FinishRoutine` functions on the server. ### FCGI -Manners supports three protocols: HTTP, HTTPS and FCGI. FCGI only operates via local a Unix socket connected to a co-hosted proxy, such as Apache or Nginx. +Manners supports three protocols: HTTP, HTTPS and FCGI. In manners, FCGI only operates via local a Unix socket connected to a co-hosted proxy, such as Apache or Nginx. To use FCGI, the port string must specify the Unix socket and start with a slash, e.g. + +```go +func main() { + handler := MyHTTPHandler() + manners.ListenAndServe("/var/run/goserver", handler) +} +``` ### Compatability From ec92c6e05e65002fd3084a5361394b9593f32ac6 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 9 Feb 2015 09:23:25 +0000 Subject: [PATCH 18/26] Improved the readme. --- README.md | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 3bb757b..f3a9d36 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ A *polite* webserver for Go. -Manners allows you to shut your Go webserver down gracefully, without dropping any requests. It can act as a drop-in replacement for the standard library's http.ListenAndServe function: +Manners allows you to shut your Go webserver down gracefully, without dropping any requests. It can act as a drop-in replacement for the standard library's +[http.ListenAndServe](http://golang.org/pkg/net/http/#ListenAndServe) function: ```go func main() { @@ -17,15 +18,39 @@ Then, when you want to shut the server down: manners.Close() ``` -(Note that this does not block until all the requests are finished. Rather, the call to manners.ListenAndServe will stop blocking when all the requests are finished.) +(Note that this does not block until all the requests are finished. Rather, the call to `manners.ListenAndServe` will stop blocking when all the requests are finished.) + +### Other goroutines Manners ensures that all requests are served by incrementing a WaitGroup when a request comes in and decrementing it when the request finishes. -If your request handler spawns other Goroutines that are not guaranteed to finish with the request, you can ensure they are also completed with the `StartRoutine` and `FinishRoutine` functions on the server. +If your request handler spawns other goroutines that are not guaranteed to finish with the request, you can ensure they are also completed with the `StartRoutine` and `FinishRoutine` functions on the server. If you don't do this, your other goroutines may be terminated abruptly when the server exits. + +### Handling signals + +It's good to close down the server cleanly when OS signals are received. This is easy: just add + +```go +manners.CloseOnInterrupt() +``` +before the `ListenAndServe` call. This kicks off a separate goroutine to wait for an OS signal, upon which it simply calls `manners.Close()` for you. + +### HTTP, HTTPS and FCGI -### FCGI +Manners supports three protocols: HTTP, HTTPS and FCGI. HTTP is illustrated above. +For HTTPS, Manners can likewise act as a drop-in replacement for the standard library's +[http.ListenAndServeTLS](http://golang.org/pkg/net/http/#ListenAndServeTLS) function: -Manners supports three protocols: HTTP, HTTPS and FCGI. In manners, FCGI only operates via local a Unix socket connected to a co-hosted proxy, such as Apache or Nginx. To use FCGI, the port string must specify the Unix socket and start with a slash, e.g. +```go +func main() { + handler := MyHTTPHandler() + certFile := MyCertificate() + keyFile := MyKeyFile() + manners.ListenAndServeTLS(":https", certFile, keyFile, handler) +} +``` + +In Manners, FCGI only operates via local a Unix socket connected to a co-hosted proxy, such as Apache or Nginx. ```go func main() { @@ -34,6 +59,10 @@ func main() { } ``` +To use FCGI, the port string must specify the Unix socket and start with a slash, as in the example above. In this case, Manners will use [fcgi.Serve](http://golang.org/pkg/net/http/fcgi/#Serve). + +In each of the protocols, Manners drains down the connections cleanly when `manners.Close()` is called. + ### Compatability Manners 0.3.0 and above uses standard library functionality introduced in Go 1.3. From 7bb2fc694516bdd104f386a569c444cc56f6c502 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 9 Feb 2015 10:43:08 +0000 Subject: [PATCH 19/26] Improved the Unix/Fcgi behaviour; added logging on startup & shutdown --- README.md | 2 +- server.go | 65 +++++++++++++++++++++++++++++++++++--------------- server_test.go | 3 +++ 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index f3a9d36..25feebd 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ func main() { } ``` -To use FCGI, the port string must specify the Unix socket and start with a slash, as in the example above. In this case, Manners will use [fcgi.Serve](http://golang.org/pkg/net/http/fcgi/#Serve). +To use FCGI, the port string must specify the Unix socket and start with a slash or dot, as in the example above. In this case, Manners will use [fcgi.Serve](http://golang.org/pkg/net/http/fcgi/#Serve). In each of the protocols, Manners drains down the connections cleanly when `manners.Close()` is called. diff --git a/server.go b/server.go index 2a7dd31..637419b 100644 --- a/server.go +++ b/server.go @@ -36,6 +36,9 @@ package manners import ( "crypto/tls" + "fmt" + "io/ioutil" + "log" "net" "net/http" "net/http/fcgi" @@ -47,6 +50,17 @@ import ( "syscall" ) +var logger = log.New(ioutil.Discard, "", 0) + +// SetLogger changes the logger used for the startup and shutdown messages +// generated by Manners. By default, no log messages are emitted. +// To make Manners logging behave the same as per the standard +// log package, i.e. to stderr, use +// `SetLogger(log.New(os.Stderr, "", log.LstdFlags))` +func SetLogger(l *log.Logger) { + logger = l +} + // interface describing a waitgroup, so unit // tests can mock out an instrumentable version type waitgroup interface { @@ -123,28 +137,49 @@ type GracefulServer struct { stateHandler StateHandler } -// Close stops the server from accepting new requets and beings shutting down. +// Close stops the server from accepting new requests and begins shutting down. func (s *GracefulServer) Close() { + logger.Printf("Shutting down server on %s\n", s.Server.Addr) close(s.shutdown) } func isUnixNetwork(addr string) bool { - return strings.HasPrefix(addr, "/") + return strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, ".") } -func chooseNetwork(addr string) string { - if isUnixNetwork(addr) { - return "unix" +func listenToUnix(bind string) (listener net.Listener, err error) { + _, err = os.Stat(bind) + if err == nil { + // socket exists and is "already in use"; + // presume this is from earlier run and therefore delete it + err = os.Remove(bind) + if err != nil { + return + } + } else if !os.IsNotExist(err) { + return + } + listener, err = net.Listen("unix", bind) + return +} + +func listen(bind string) (listener net.Listener, err error) { + if isUnixNetwork(bind) { + logger.Printf("Listening on unix socket %s\n", bind) + return listenToUnix(bind) + } else if strings.Contains(bind, ":") { + logger.Printf("Listening on tcp socket %s\n", bind) + return net.Listen("tcp", bind) + } else { + return nil, fmt.Errorf("error while parsing bind arg %v", bind) } - return "tcp" } // ListenAndServe provides a graceful equivalent of net/http.Serve.ListenAndServe, and // also supports the FastCGI equivalent. func (s *GracefulServer) ListenAndServe() error { if s.listener == nil { - netwk := chooseNetwork(s.Addr) - oldListener, err := net.Listen(netwk, s.Addr) + oldListener, err := listen(s.Addr) if err != nil { return err } @@ -324,7 +359,9 @@ var ( ) // ListenAndServe provides a graceful version of function provided by the net/http package. -// This supports HTTP but not HTTPS or FCGI. +// This supports HTTP and FCGI but not HTTPS. For HTTP, the `addr` will contain a colon, +// e.g. ":8001". To use FCGI, a Unix socket name must be supplied for `addr` which +// must begin with '/' or '.'. func ListenAndServe(addr string, handler http.Handler) error { server := NewWithServer(&http.Server{Addr: addr, Handler: handler}) m.Lock() @@ -333,16 +370,6 @@ func ListenAndServe(addr string, handler http.Handler) error { return server.ListenAndServe() } -// ListenAndServe provides a graceful version of function provided by the net/http/fcgi package. -// This supports FCGI but not HTTP/HTTPS. -func ListenAndServeFCGI(addr string, handler http.Handler) error { - server := NewWithServer(&http.Server{Addr: addr, Handler: handler}) - m.Lock() - servers = append(servers, server) - m.Unlock() - return server.ListenAndServe() -} - // ListenAndServeTLS provides a graceful version of function provided by the net/http package. // This supports HTTPS but not HTTP or FCGI. func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler) error { diff --git a/server_test.go b/server_test.go index 7b99951..77f71c5 100644 --- a/server_test.go +++ b/server_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io/ioutil" + "log" "net" "net/http" "os" @@ -118,6 +119,8 @@ func newClient(addr net.Addr, tls bool) *client { var nullHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) func startGenericServer(t *testing.T, server *GracefulServer, statechanged chan http.ConnState, runner func() error) (l net.Listener, errc chan error) { + SetLogger(log.New(ioutil.Discard, "", 0)) + // SetLogger(log.New(os.Stderr, "", log.LstdFlags)) server.Addr = "localhost:0" server.Handler = nullHandler if statechanged != nil { From bd32029b2e3a97d636e4e9a5fb3dd6e80d170ec4 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 9 Feb 2015 11:11:40 +0000 Subject: [PATCH 20/26] Removed erroneous type assertion. --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 637419b..e12a247 100644 --- a/server.go +++ b/server.go @@ -183,7 +183,7 @@ func (s *GracefulServer) ListenAndServe() error { if err != nil { return err } - s.listener = NewListener(oldListener.(*net.TCPListener)) + s.listener = NewListener(oldListener) } return s.Serve(s.listener) } From 5600a06c88a1f6064df5652ba8b1c2c6c3776af3 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 9 Feb 2015 11:12:57 +0000 Subject: [PATCH 21/26] doc comments --- server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index e12a247..b975917 100644 --- a/server.go +++ b/server.go @@ -56,7 +56,8 @@ var logger = log.New(ioutil.Discard, "", 0) // generated by Manners. By default, no log messages are emitted. // To make Manners logging behave the same as per the standard // log package, i.e. to stderr, use -// `SetLogger(log.New(os.Stderr, "", log.LstdFlags))` +// `SetLogger(log.New(os.Stderr, "", log.LstdFlags))`. Or use +// any other logger of your own. func SetLogger(l *log.Logger) { logger = l } From f1eb01c2864f3d1f0abfd7c0457b9dbb131f8ce2 Mon Sep 17 00:00:00 2001 From: Rick Date: Thu, 11 Jun 2015 22:00:44 +0100 Subject: [PATCH 22/26] Manually merged forward all the FCGI code, along with the pluggable logger and the nice API for signal handling. --- README.md | 28 +++++++++++++++++++ logger.go | 18 ++++++++++++ server.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 118 insertions(+), 12 deletions(-) create mode 100644 logger.go diff --git a/README.md b/README.md index 78e0fc0..1cc65dc 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,34 @@ Manners ensures that all requests are served by incrementing a WaitGroup when a If your request handler spawns Goroutines that are not guaranteed to finish with the request, you can ensure they are also completed with the `StartRoutine` and `FinishRoutine` functions on the server. +### HTTP, HTTPS and FCGI + +Manners supports three protocols: HTTP, HTTPS and FCGI. HTTP is illustrated above. +For HTTPS, Manners can likewise act as a drop-in replacement for the standard library's +[http.ListenAndServeTLS](http://golang.org/pkg/net/http/#ListenAndServeTLS) function: + +```go +func main() { + handler := MyHTTPHandler() + certFile := MyCertificate() + keyFile := MyKeyFile() + manners.ListenAndServeTLS(":https", certFile, keyFile, handler) +} +``` + +In Manners, FCGI only operates via local a Unix socket connected to a co-hosted proxy, such as Apache or Nginx. + +```go +func main() { + handler := MyHTTPHandler() + manners.ListenAndServe("/var/run/goserver.sock", handler) +} +``` + +To use FCGI, the port string must specify the Unix socket and start with a slash or dot, as in the example above. In this case, Manners will use [fcgi.Serve](http://golang.org/pkg/net/http/fcgi/#Serve). + +In each of the protocols, Manners drains down the connections cleanly when `manners.Close()` is called. + ### Compatability Manners 0.3.0 and above uses standard library functionality introduced in Go 1.3. diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..59d0c9a --- /dev/null +++ b/logger.go @@ -0,0 +1,18 @@ +package manners + +import ( + "log" + "io/ioutil" +) + +var logger = log.New(ioutil.Discard, "", 0) + +// SetLogger changes the logger used for the startup and shutdown messages +// generated by Manners. By default, no log messages are emitted. +// To make Manners logging behave the same as per the standard +// log package, i.e. to stderr, use +// `SetLogger(log.New(os.Stderr, "", log.LstdFlags))` +func SetLogger(l *log.Logger) { + logger = l +} + diff --git a/server.go b/server.go index 9dd25f5..57ca8f5 100644 --- a/server.go +++ b/server.go @@ -27,14 +27,7 @@ or for a customized server: The server will shut down cleanly when the Close() method is called: - go func() { - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, os.Interrupt, os.Kill) - <-sigchan - log.Info("Shutting down...") - manners.Close() - }() - + manners.CloseOnInterrupt() http.Handle("/hello", myHandler) log.Fatal(manners.ListenAndServe(":8080", nil)) */ @@ -46,6 +39,12 @@ import ( "net/http" "sync" "sync/atomic" + "strings" + "os" + "fmt" + "net/http/fcgi" + "os/signal" + "syscall" ) // A GracefulServer maintains a WaitGroup that counts how many in-flight @@ -83,16 +82,52 @@ func NewWithServer(s *http.Server) *GracefulServer { // Close stops the server from accepting new requets and begins shutting down. // It returns true if it's the first time Close is called. func (s *GracefulServer) Close() bool { + logger.Printf("Shutting down server on %s\n", s.Server.Addr) return <-s.shutdown } -// ListenAndServe provides a graceful equivalent of net/http.Serve.ListenAndServe. +func isUnixNetwork(addr string) bool { + return strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, ".") +} + +func listenToUnix(bind string) (listener net.Listener, err error) { + _, err = os.Stat(bind) + if err == nil { + // socket exists and is "already in use"; + // presume this is from earlier run and therefore delete it + err = os.Remove(bind) + if err != nil { + return + } + } else if !os.IsNotExist(err) { + return + } + listener, err = net.Listen("unix", bind) + return +} + +func listen(bind string) (listener net.Listener, err error) { + if isUnixNetwork(bind) { + logger.Printf("Listening on unix socket %s\n", bind) + return listenToUnix(bind) + } else if strings.Contains(bind, ":") { + logger.Printf("Listening on tcp socket %s\n", bind) + return net.Listen("tcp", bind) + } else { + return nil, fmt.Errorf("error while parsing bind arg %v", bind) + } +} + +// ListenAndServe provides a graceful equivalent of net/http.Server.ListenAndServe. +// This supports HTTP and FCGI but not HTTPS. For HTTP, the `addr` will contain a colon, +// e.g. ":8001". To use FCGI, a Unix socket name must be supplied for `addr` which +// must begin with '/' or '.'. func (s *GracefulServer) ListenAndServe() error { addr := s.Addr if addr == "" { addr = ":http" } - listener, err := net.Listen("tcp", addr) + listener, err := listen(addr) if err != nil { return err } @@ -100,7 +135,8 @@ func (s *GracefulServer) ListenAndServe() error { return s.Serve(listener) } -// ListenAndServeTLS provides a graceful equivalent of net/http.Serve.ListenAndServeTLS. +// ListenAndServeTLS provides a graceful equivalent of net/http.Server.ListenAndServeTLS. +// This supports HTTPS only (not HTTP or FCGI). func (s *GracefulServer) ListenAndServeTLS(certFile, keyFile string) error { // direct lift from net/http/server.go addr := s.Addr @@ -202,7 +238,13 @@ func (s *GracefulServer) Serve(listener net.Listener) error { s.up <- listener } - err := s.Server.Serve(listener) + var err error + if isUnixNetwork(s.Server.Addr) { + os.Chmod(s.Server.Addr, os.ModePerm) + err = fcgi.Serve(listener, s.Server.Handler) + } else { + err = s.Server.Serve(listener) + } // This block is reached when the server has received a shut down command // or a real error happened. @@ -226,3 +268,21 @@ func (s *GracefulServer) StartRoutine() { func (s *GracefulServer) FinishRoutine() { s.wg.Done() } + +// CloseOnInterrupt creates a go-routine that will call the Close() function when certain OS +// signals are received. If no signals are specified, +// the following are used: SIGINT, SIGTERM, SIGKILL, SIGQUIT, SIGHUP, SIGUSR1. +// This function must be called before ListenAndServe. +func CloseOnInterrupt(signals ...os.Signal) { + go func() { + sigchan := make(chan os.Signal, 1) + if len(signals) > 0 { + signal.Notify(sigchan, signals...) + } else { + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, + syscall.SIGQUIT, syscall.SIGHUP, syscall.SIGUSR1) + } + <-sigchan + Close() + }() +} From cdce84f3418a94dab5fa0878a2bcccfb19fdac5a Mon Sep 17 00:00:00 2001 From: Rick Date: Fri, 12 Jun 2015 00:07:41 +0100 Subject: [PATCH 23/26] Refinement and bug-fixing on interrupt handling code. Extra checks have been added to prevent obviously-meaningless usage. --- README.md | 9 +++++++++ server.go | 35 +++++++++++++++++++++++++++++------ static.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 86 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 1cc65dc..741d7bb 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,15 @@ To use FCGI, the port string must specify the Unix socket and start with a slash In each of the protocols, Manners drains down the connections cleanly when `manners.Close()` is called. +### Handling signals + +It's good to close down the server cleanly when OS signals are received. This is easy: just add + +```go +manners.CloseOnInterrupt() +``` +before the `ListenAndServe` call. This kicks off a separate goroutine to wait for an OS signal, upon which it simply calls `manners.Close()` for you. Optionally, you can pass in a list of the particular signals you care about and you can find out which signal was received, if any, afterwards. + ### Compatability Manners 0.3.0 and above uses standard library functionality introduced in Go 1.3. diff --git a/server.go b/server.go index 57ca8f5..3b16f3b 100644 --- a/server.go +++ b/server.go @@ -66,6 +66,14 @@ type GracefulServer struct { lastConnState map[net.Conn]http.ConnState up chan net.Listener // Only used by test code. + + signal os.Signal +} + +// NewServer creates a new server that will shut down gracefully. +// Call Close() to stop the server. +func NewServer(addr string, handler http.Handler) *GracefulServer { + return NewWithServer(&http.Server{Addr: addr, Handler: handler}) } // NewWithServer wraps an existing http.Server object and returns a @@ -272,9 +280,12 @@ func (s *GracefulServer) FinishRoutine() { // CloseOnInterrupt creates a go-routine that will call the Close() function when certain OS // signals are received. If no signals are specified, // the following are used: SIGINT, SIGTERM, SIGKILL, SIGQUIT, SIGHUP, SIGUSR1. -// This function must be called before ListenAndServe. -func CloseOnInterrupt(signals ...os.Signal) { - go func() { +// This function must be called before ListenAndServe, ListenAndServeTLS, or Serve. +func (s *GracefulServer) CloseOnInterrupt(signals ...os.Signal) *GracefulServer { + if s == nil { + panic("Program error: the server must exist before this method is called.") + } + go func(rx *GracefulServer) { sigchan := make(chan os.Signal, 1) if len(signals) > 0 { signal.Notify(sigchan, signals...) @@ -282,7 +293,19 @@ func CloseOnInterrupt(signals ...os.Signal) { signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGQUIT, syscall.SIGHUP, syscall.SIGUSR1) } - <-sigchan - Close() - }() + rx.signal = <-sigchan + rx.Close() + }(s) + return s +} + +// SignalReceived gets the signal that caused the server to close, if any. If Close() was called +// some other way, this method will return nil. +// +// Note that, by convention, SIGUSR1 is often used to cause a server to close all its current +// connections cleanly, close its log files, and then restart. This facilitates log rotation. +// If you need this behaviour, you will need to provide a loop around both the CloseOnInterrupt and +// ListenAndServe calls. +func (s *GracefulServer) SignalReceived() os.Signal { + return s.signal } diff --git a/static.go b/static.go index 2a74b09..9b4f5f4 100644 --- a/static.go +++ b/static.go @@ -3,33 +3,77 @@ package manners import ( "net" "net/http" + "os") + +var ( + defaultServer *GracefulServer + defaultSignals []os.Signal + hasSignals = false ) -var defaultServer *GracefulServer +func preventReEntrance() { + if defaultServer != nil { + panic("Program error: the default server must be closed before re-use.") + } +} // ListenAndServe provides a graceful version of the function provided by the // net/http package. Call Close() to stop the server. func ListenAndServe(addr string, handler http.Handler) error { - defaultServer = NewWithServer(&http.Server{Addr: addr, Handler: handler}) + preventReEntrance() + defaultServer = NewServer(addr, handler) + if (hasSignals) { + defaultServer.CloseOnInterrupt(defaultSignals...) + } return defaultServer.ListenAndServe() } // ListenAndServeTLS provides a graceful version of the function provided by the // net/http package. Call Close() to stop the server. func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler) error { - defaultServer = NewWithServer(&http.Server{Addr: addr, Handler: handler}) + preventReEntrance() + defaultServer = NewServer(addr, handler) + if (hasSignals) { + defaultServer.CloseOnInterrupt(defaultSignals...) + } return defaultServer.ListenAndServeTLS(certFile, keyFile) } // Serve provides a graceful version of the function provided by the net/http // package. Call Close() to stop the server. func Serve(l net.Listener, handler http.Handler) error { + preventReEntrance() defaultServer = NewWithServer(&http.Server{Handler: handler}) + if (hasSignals) { + defaultServer.CloseOnInterrupt(defaultSignals...) + } return defaultServer.Serve(l) } // Shuts down the default server used by ListenAndServe, ListenAndServeTLS and // Serve. It returns true if it's the first time Close is called. func Close() bool { - return defaultServer.Close() + outcome := defaultServer.Close() + defaultServer = nil + return outcome +} + +// CloseOnInterrupt creates a go-routine that will call the Close() function when certain OS +// signals are received. If no signals are specified, +// the following are used: SIGINT, SIGTERM, SIGKILL, SIGQUIT, SIGHUP, SIGUSR1. +// This function must be called before ListenAndServe, ListenAndServeTLS, or Serve. +func CloseOnInterrupt(signals ...os.Signal) { + defaultSignals = signals + hasSignals = true +} + +// After a signal has cause the server to close, this method allows you to determine which +// signal had been received. If Close was called some other way, this method will return nil. +// +// Note that, by convention, SIGUSR1 is often used to cause a server to close all its current +// connections cleanly, close its log files, and then restart. This facilitates log rotation. +// If you need this behaviour, you will need to provide a loop around the CloseOnInterrupt and +// ListenAndServe calls. +func SignalReceived() os.Signal { + return defaultServer.SignalReceived() } From 27a08b6f639c4f35f30cf2b8b41dcd8337450681 Mon Sep 17 00:00:00 2001 From: Rick Date: Thu, 18 Aug 2016 16:23:59 +0100 Subject: [PATCH 24/26] Merged new FCGI and logging into the tip of mailgun/manners:master. --- Makefile | 3 - README.md | 4 + helpers_test.go | 29 ++-- listener.go | 190 ++++++++++++++++++++++++++ logger.go | 3 +- server.go | 261 ++++++++++++++++++++++++++---------- server_test.go | 268 +++++++++++++++++++++++++++++++------ static.go | 17 +-- test_helpers/conn.go | 7 +- test_helpers/listener.go | 12 +- test_helpers/wait_group.go | 10 +- transition_test.go | 6 +- 12 files changed, 652 insertions(+), 158 deletions(-) delete mode 100644 Makefile create mode 100644 listener.go diff --git a/Makefile b/Makefile deleted file mode 100644 index 10975c5..0000000 --- a/Makefile +++ /dev/null @@ -1,3 +0,0 @@ -cover: - go test -v . -coverprofile=/tmp/coverage.out - go tool cover -html=/tmp/coverage.out diff --git a/README.md b/README.md index 741d7bb..2a62d9f 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,10 @@ manners.CloseOnInterrupt() ``` before the `ListenAndServe` call. This kicks off a separate goroutine to wait for an OS signal, upon which it simply calls `manners.Close()` for you. Optionally, you can pass in a list of the particular signals you care about and you can find out which signal was received, if any, afterwards. +### Known Issues + +Manners does not correctly shut down long-lived keepalive connections when issued a shutdown command. Clients on an idle keepalive connection may see a connection reset error rather than a close. See https://github.com/braintree/manners/issues/13 for details. + ### Compatability Manners 0.3.0 and above uses standard library functionality introduced in Go 1.3. diff --git a/helpers_test.go b/helpers_test.go index 220c809..d176abe 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -9,21 +9,21 @@ import ( "testing" ) -func newServer() *GracefulServer { - return NewWithServer(new(http.Server)) -} - // a simple step-controllable http client type client struct { tls bool addr net.Addr connected chan error sendrequest chan bool - idle chan error - idlerelease chan bool + response chan *rawResponse closed chan bool } +type rawResponse struct { + body []string + err error +} + func (c *client) Run() { go func() { var err error @@ -39,19 +39,21 @@ func (c *client) Run() { for <-c.sendrequest { _, err = conn.Write([]byte("GET / HTTP/1.1\nHost: localhost:8000\n\n")) if err != nil { - c.idle <- err + c.response <- &rawResponse{err: err} } // Read response; no content scanner := bufio.NewScanner(conn) + var lines []string for scanner.Scan() { // our null handler doesn't send a body, so we know the request is // done when we reach the blank line after the headers - if scanner.Text() == "" { + line := scanner.Text() + if line == "" { break } + lines = append(lines, line) } - c.idle <- scanner.Err() - <-c.idlerelease + c.response <- &rawResponse{lines, scanner.Err()} } conn.Close() ioutil.ReadAll(conn) @@ -65,8 +67,7 @@ func newClient(addr net.Addr, tls bool) *client { tls: tls, connected: make(chan error), sendrequest: make(chan bool), - idle: make(chan error), - idlerelease: make(chan bool), + response: make(chan *rawResponse), closed: make(chan bool), } } @@ -81,7 +82,7 @@ func startGenericServer(t *testing.T, server *GracefulServer, statechanged chan // Wrap the ConnState handler with something that will notify // the statechanged channel when a state change happens server.ConnState = func(conn net.Conn, newState http.ConnState) { - statechanged <- newState + statechanged <- conn.LocalAddr().(*gracefulAddr).gconn.lastHTTPState } } @@ -105,7 +106,7 @@ func startGenericServer(t *testing.T, server *GracefulServer, statechanged chan } func startServer(t *testing.T, server *GracefulServer, statechanged chan http.ConnState) ( -l net.Listener, errc chan error) { + l net.Listener, errc chan error) { return startGenericServer(t, server, statechanged, server.ListenAndServe) } diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..ccb1c9f --- /dev/null +++ b/listener.go @@ -0,0 +1,190 @@ +package manners + +import ( + "crypto/tls" + "fmt" + "net" + "net/http" + "os" + "sync" + "time" +) + +// NewListener wraps an existing listener for use with +// GracefulServer. +// +// Note that you generally don't need to use this directly as +// GracefulServer will automatically wrap any non-graceful listeners +// supplied to it. +func NewListener(l net.Listener) *GracefulListener { + return &GracefulListener{ + listener: l, + mutex: &sync.RWMutex{}, + open: true, + } +} + +// A gracefulCon wraps a normal net.Conn and tracks the last known http state. +type gracefulConn struct { + net.Conn + lastHTTPState http.ConnState + // protected tells whether the connection is going to defer server shutdown + // until the current HTTP request is completed. + protected bool +} + +type gracefulAddr struct { + net.Addr + gconn *gracefulConn +} + +func (g *gracefulConn) LocalAddr() net.Addr { + return &gracefulAddr{g.Conn.LocalAddr(), g} +} + +// retrieveGracefulConn retrieves a concrete gracefulConn instance from an +// interface value that can either refer to it directly or refer to a tls.Conn +// instance wrapping around a gracefulConn one. +func retrieveGracefulConn(conn net.Conn) *gracefulConn { + return conn.LocalAddr().(*gracefulAddr).gconn +} + +// A GracefulListener differs from a standard net.Listener in one way: if +// Accept() is called after it is gracefully closed, it returns a +// listenerAlreadyClosed error. The GracefulServer will ignore this error. +type GracefulListener struct { + listener net.Listener + open bool + mutex *sync.RWMutex +} + +func (l *GracefulListener) isClosed() bool { + l.mutex.RLock() + defer l.mutex.RUnlock() + return !l.open +} + +func (l *GracefulListener) Addr() net.Addr { + return l.listener.Addr() +} + +// Accept implements the Accept method in the Listener interface. +func (l *GracefulListener) Accept() (net.Conn, error) { + conn, err := l.listener.Accept() + if err != nil { + if l.isClosed() { + err = listenerAlreadyClosed{err} + } + return nil, err + } + + // don't wrap connection if it's tls so we won't break + // http server internal logic that relies on the type + if _, ok := conn.(*tls.Conn); ok { + return conn, nil + } + return &gracefulConn{Conn: conn}, nil +} + +// Close tells the wrapped listener to stop listening. It is idempotent. +func (l *GracefulListener) Close() error { + l.mutex.Lock() + defer l.mutex.Unlock() + if !l.open { + return nil + } + l.open = false + return l.listener.Close() +} + +func (l *GracefulListener) GetFile() (*os.File, error) { + return getListenerFile(l.listener) +} + +func (l *GracefulListener) Clone() (net.Listener, error) { + l.mutex.Lock() + defer l.mutex.Unlock() + + if !l.open { + return nil, fmt.Errorf("listener is already closed") + } + + file, err := l.GetFile() + if err != nil { + return nil, err + } + defer file.Close() + + fl, err := net.FileListener(file) + if nil != err { + return nil, err + } + return fl, nil +} + +// A listener implements a network listener (net.Listener) for TLS connections. +// direct lift from crypto/tls.go +type TLSListener struct { + net.Listener + config *tls.Config +} + +// Accept waits for and returns the next incoming TLS connection. +// The returned connection c is a *tls.Conn. +func (l *TLSListener) Accept() (c net.Conn, err error) { + c, err = l.Listener.Accept() + if err != nil { + return + } + c = tls.Server(&gracefulConn{Conn: c}, l.config) + return +} + +// NewListener creates a Listener which accepts connections from an inner +// Listener and wraps each connection with Server. +// The configuration config must be non-nil and must have +// at least one certificate. +func NewTLSListener(inner net.Listener, config *tls.Config) net.Listener { + l := new(TLSListener) + l.Listener = inner + l.config = config + return l +} + +type listenerAlreadyClosed struct { + error +} + +// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by ListenAndServe and ListenAndServeTLS so +// dead TCP connections (e.g. closing laptop mid-download) eventually +// go away. +// +// direct lift from net/http/server.go +type TCPKeepAliveListener struct { + *net.TCPListener +} + +func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(3 * time.Minute) + return tc, nil +} + +func getListenerFile(listener net.Listener) (*os.File, error) { + switch t := listener.(type) { + case *net.TCPListener: + return t.File() + case *net.UnixListener: + return t.File() + case TCPKeepAliveListener: + return t.TCPListener.File() + case *TLSListener: + return getListenerFile(t.Listener) + } + return nil, fmt.Errorf("Unsupported listener: %T", listener) +} diff --git a/logger.go b/logger.go index 59d0c9a..7de7041 100644 --- a/logger.go +++ b/logger.go @@ -1,8 +1,8 @@ package manners import ( - "log" "io/ioutil" + "log" ) var logger = log.New(ioutil.Discard, "", 0) @@ -15,4 +15,3 @@ var logger = log.New(ioutil.Discard, "", 0) func SetLogger(l *log.Logger) { logger = l } - diff --git a/server.go b/server.go index 3b16f3b..3a56f76 100644 --- a/server.go +++ b/server.go @@ -35,18 +35,28 @@ package manners import ( "crypto/tls" + "fmt" "net" "net/http" - "sync" - "sync/atomic" - "strings" - "os" - "fmt" "net/http/fcgi" + "os" "os/signal" + "strings" + "sync" + "sync/atomic" "syscall" ) +// StateHandler can be called by the server if the state of the connection changes. +// Notice that it passed previous state and the new state as parameters. +type StateHandler func(net.Conn, http.ConnState, http.ConnState) + +type Options struct { + Server *http.Server + StateHandler StateHandler + Listener net.Listener +} + // A GracefulServer maintains a WaitGroup that counts how many in-flight // requests the server is handling. When it receives a shutdown signal, // it stops accepting new requests but does not actually shut down until @@ -55,35 +65,56 @@ import ( // GracefulServer embeds the underlying net/http.Server making its non-override // methods and properties avaiable. // -// It must be initialized by calling NewWithServer. +// It must be initialized by calling NewServer or NewWithServer type GracefulServer struct { *http.Server - shutdown chan bool - wg waitGroup - - lcsmu sync.RWMutex - lastConnState map[net.Conn]http.ConnState + shutdown chan bool + shutdownFinished chan bool + wg waitGroup + listener *GracefulListener + stateHandler StateHandler up chan net.Listener // Only used by test code. - signal os.Signal + signal os.Signal } -// NewServer creates a new server that will shut down gracefully. -// Call Close() to stop the server. -func NewServer(addr string, handler http.Handler) *GracefulServer { - return NewWithServer(&http.Server{Addr: addr, Handler: handler}) +// NewServer creates a new GracefulServer. +func NewServer() *GracefulServer { + return NewWithServer(new(http.Server)) } // NewWithServer wraps an existing http.Server object and returns a // GracefulServer that supports all of the original Server operations. func NewWithServer(s *http.Server) *GracefulServer { return &GracefulServer{ - Server: s, - shutdown: make(chan bool), - wg: new(sync.WaitGroup), - lastConnState: make(map[net.Conn]http.ConnState), + Server: s, + shutdown: make(chan bool), + shutdownFinished: make(chan bool, 1), + wg: new(sync.WaitGroup), + } +} + +func NewWithOptions(o Options) *GracefulServer { + // Set up listener + var listener *GracefulListener + if o.Listener != nil { + g, ok := o.Listener.(*GracefulListener) + if !ok { + listener = NewListener(o.Listener) + } else { + listener = g + } + } + + return &GracefulServer{ + listener: listener, + Server: o.Server, + stateHandler: o.StateHandler, + shutdown: make(chan bool), + shutdownFinished: make(chan bool, 1), + wg: new(sync.WaitGroup), } } @@ -94,6 +125,15 @@ func (s *GracefulServer) Close() bool { return <-s.shutdown } +// BlockingClose is similar to Close, except that it blocks until the last +// connection has been closed. +func (s *GracefulServer) BlockingClose() bool { + logger.Printf("Shutting down server on %s (blocking)\n", s.Server.Addr) + result := s.Close() + <-s.shutdownFinished + return result +} + func isUnixNetwork(addr string) bool { return strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, ".") } @@ -126,25 +166,23 @@ func listen(bind string) (listener net.Listener, err error) { } } -// ListenAndServe provides a graceful equivalent of net/http.Server.ListenAndServe. -// This supports HTTP and FCGI but not HTTPS. For HTTP, the `addr` will contain a colon, -// e.g. ":8001". To use FCGI, a Unix socket name must be supplied for `addr` which -// must begin with '/' or '.'. +// ListenAndServe provides a graceful equivalent of net/http.Serve.ListenAndServe. func (s *GracefulServer) ListenAndServe() error { - addr := s.Addr - if addr == "" { - addr = ":http" - } - listener, err := listen(addr) - if err != nil { - return err + if s.listener == nil { + addr := s.Addr + if addr == "" { + addr = ":http" + } + oldListener, err := listen(addr) + if err != nil { + return err + } + s.listener = NewListener(oldListener) } - - return s.Serve(listener) + return s.Serve(s.listener) } -// ListenAndServeTLS provides a graceful equivalent of net/http.Server.ListenAndServeTLS. -// This supports HTTPS only (not HTTP or FCGI). +// ListenAndServeTLS provides a graceful equivalent of net/http.Serve.ListenAndServeTLS. func (s *GracefulServer) ListenAndServeTLS(certFile, keyFile string) error { // direct lift from net/http/server.go addr := s.Addr @@ -166,74 +204,118 @@ func (s *GracefulServer) ListenAndServeTLS(certFile, keyFile string) error { return err } - ln, err := net.Listen("tcp", addr) + return s.ListenAndServeTLSWithConfig(config) +} + +// ListenAndServeTLS provides a graceful equivalent of net/http.Serve.ListenAndServeTLS. +func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { + addr := s.Addr + if addr == "" { + addr = ":https" + } + + if s.listener == nil { + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + + tlsListener := NewTLSListener(TCPKeepAliveListener{ln.(*net.TCPListener)}, config) + s.listener = NewListener(tlsListener) + } + return s.Serve(s.listener) +} + +func (gs *GracefulServer) GetFile() (*os.File, error) { + return gs.listener.GetFile() +} + +func (gs *GracefulServer) HijackListener(s *http.Server, config *tls.Config) (*GracefulServer, error) { + listener, err := gs.listener.Clone() if err != nil { - return err + return nil, err + } + + if config != nil { + listener = NewTLSListener(TCPKeepAliveListener{listener.(*net.TCPListener)}, config) } - return s.Serve(tls.NewListener(ln, config)) + other := NewWithServer(s) + other.listener = NewListener(listener) + return other, nil } // Serve provides a graceful equivalent net/http.Server.Serve. +// +// If listener is not an instance of *GracefulListener it will be wrapped +// to become one. func (s *GracefulServer) Serve(listener net.Listener) error { - var closing int32 + // Accept a net.Listener to preserve the interface compatibility with the + // standard http.Server. If it is not a GracefulListener then wrap it into + // one. + gracefulListener, ok := listener.(*GracefulListener) + if !ok { + gracefulListener = NewListener(listener) + listener = gracefulListener + } + s.listener = gracefulListener + // Wrap the server HTTP handler into graceful one, that will close kept + // alive connections if a new request is received after shutdown. + gracefulHandler := newGracefulHandler(s.Server.Handler) + s.Server.Handler = gracefulHandler + + // Start a goroutine that waits for a shutdown signal and will stop the + // listener when it receives the signal. That in turn will result in + // unblocking of the http.Serve call. go func() { s.shutdown <- true close(s.shutdown) - atomic.StoreInt32(&closing, 1) + gracefulHandler.Close() s.Server.SetKeepAlivesEnabled(false) - listener.Close() + gracefulListener.Close() }() originalConnState := s.Server.ConnState - // s.ConnState is invoked by the net/http.Server every time a connectiion + // s.ConnState is invoked by the net/http.Server every time a connection // changes state. It keeps track of each connection's state over time, // enabling manners to handle persisted connections correctly. s.ConnState = func(conn net.Conn, newState http.ConnState) { - s.lcsmu.RLock() - lastConnState := s.lastConnState[conn] - s.lcsmu.RUnlock() + gracefulConn := retrieveGracefulConn(conn) + oldState := gracefulConn.lastHTTPState + gracefulConn.lastHTTPState = newState switch newState { - // New connection -> StateNew case http.StateNew: + // New connection -> StateNew + gracefulConn.protected = true s.StartRoutine() - // (StateNew, StateIdle) -> StateActive case http.StateActive: - // The connection transitioned from idle back to active - if lastConnState == http.StateIdle { - s.StartRoutine() + // (StateNew, StateIdle) -> StateActive + if gracefulHandler.IsClosed() { + gracefulConn.Close() + break } - // StateActive -> StateIdle - // Immediately close newly idle connections; if not they may make - // one more request before SetKeepAliveEnabled(false) takes effect. - case http.StateIdle: - if atomic.LoadInt32(&closing) == 1 { - conn.Close() + if !gracefulConn.protected { + gracefulConn.protected = true + s.StartRoutine() } - s.FinishRoutine() - // (StateNew, StateActive, StateIdle) -> (StateClosed, StateHiJacked) - // If the connection was idle we do not need to decrement the counter. - case http.StateClosed, http.StateHijacked: - if lastConnState != http.StateIdle { + default: + // (StateNew, StateActive) -> (StateIdle, StateClosed, StateHiJacked) + if gracefulConn.protected { s.FinishRoutine() + gracefulConn.protected = false } - } - s.lcsmu.Lock() - if newState == http.StateClosed || newState == http.StateHijacked { - delete(s.lastConnState, conn) - } else { - s.lastConnState[conn] = newState + if s.stateHandler != nil { + s.stateHandler(conn, oldState, newState) } - s.lcsmu.Unlock() if originalConnState != nil { originalConnState(conn, newState) @@ -254,13 +336,14 @@ func (s *GracefulServer) Serve(listener net.Listener) error { err = s.Server.Serve(listener) } - // This block is reached when the server has received a shut down command - // or a real error happened. - if err == nil || atomic.LoadInt32(&closing) == 1 { - s.wg.Wait() - return nil + // An error returned on shutdown is not worth reporting. + if _, ok = err.(listenerAlreadyClosed); ok { + err = nil } + // Wait for pending requests to complete regardless the Serve result. + s.wg.Wait() + s.shutdownFinished <- true return err } @@ -309,3 +392,35 @@ func (s *GracefulServer) CloseOnInterrupt(signals ...os.Signal) *GracefulServer func (s *GracefulServer) SignalReceived() os.Signal { return s.signal } + +// gracefulHandler is used by GracefulServer to prevent calling ServeHTTP on +// to be closed kept-alive connections during the server shutdown. +type gracefulHandler struct { + closed int32 // accessed atomically. + wrapped http.Handler +} + +func newGracefulHandler(wrapped http.Handler) *gracefulHandler { + return &gracefulHandler{ + wrapped: wrapped, + } +} + +func (gh *gracefulHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if atomic.LoadInt32(&gh.closed) == 0 { + gh.wrapped.ServeHTTP(w, r) + return + } + r.Body.Close() + // Server is shutting down at this moment, and the connection that this + // handler is being called on is about to be closed. So we do not need to + // actually execute the handler logic. +} + +func (gh *gracefulHandler) Close() { + atomic.StoreInt32(&gh.closed, 1) +} + +func (gh *gracefulHandler) IsClosed() bool { + return atomic.LoadInt32(&gh.closed) == 1 +} diff --git a/server_test.go b/server_test.go index 2f54eaf..8d49d9c 100644 --- a/server_test.go +++ b/server_test.go @@ -1,17 +1,37 @@ package manners import ( - helpers "github.com/braintree/manners/test_helpers" + helpers "github.com/mailgun/manners/test_helpers" "net" "net/http" + "os" "testing" "time" ) +type httpInterface interface { + ListenAndServe() error + ListenAndServeTLS(certFile, keyFile string) error + Serve(listener net.Listener) error +} + +// Test that the method signatures of the methods we override from net/http/Server match those of the original. +func TestInterface(t *testing.T) { + var original, ours interface{} + original = &http.Server{} + ours = &GracefulServer{} + if _, ok := original.(httpInterface); !ok { + t.Errorf("httpInterface definition does not match the canonical server!") + } + if _, ok := ours.(httpInterface); !ok { + t.Errorf("GracefulServer does not implement httpInterface") + } +} + // Tests that the server allows in-flight requests to complete // before shutting down. func TestGracefulness(t *testing.T) { - server := newServer() + server := NewServer() wg := helpers.NewWaitGroup() server.wg = wg statechanged := make(chan http.ConnState) @@ -24,10 +44,9 @@ func TestGracefulness(t *testing.T) { if err := <-client.connected; err != nil { t.Fatal("Client failed to connect to server", err) } - // avoid a race between the client connection and the server accept - if state := <-statechanged; state != http.StateNew { - t.Fatal("Unexpected state", state) - } + // Even though the client is connected, the server ConnState handler may + // not know about that yet. So wait until it is called. + waitForState(t, statechanged, http.StateNew, "Request not received") server.Close() @@ -48,7 +67,7 @@ func TestGracefulness(t *testing.T) { // Tests that the server begins to shut down when told to and does not accept // new requests once shutdown has begun func TestShutdown(t *testing.T) { - server := newServer() + server := NewServer() wg := helpers.NewWaitGroup() server.wg = wg statechanged := make(chan http.ConnState) @@ -61,10 +80,9 @@ func TestShutdown(t *testing.T) { if err := <-client1.connected; err != nil { t.Fatal("Client failed to connect to server", err) } - // avoid a race between the client connection and the server accept - if state := <-statechanged; state != http.StateNew { - t.Fatal("Unexpected state", state) - } + // Even though the client is connected, the server ConnState handler may + // not know about that yet. So wait until it is called. + waitForState(t, statechanged, http.StateNew, "Request not received") // start the shutdown; once it hits waitgroup.Wait() // the listener should of been closed, though client1 is still connected @@ -94,36 +112,32 @@ func TestShutdown(t *testing.T) { <-exitchan } -// Test that a connection is closed upon reaching an idle state if and only if the server -// is shutting down. -func TestCloseOnIdle(t *testing.T) { - server := newServer() - wg := helpers.NewWaitGroup() - server.wg = wg - fl := helpers.NewListener() - runner := func() error { - return server.Serve(fl) - } - - startGenericServer(t, server, nil, runner) +// If a request is sent to a closed server via a kept alive connection then +// the server closes the connection upon receiving the request. +func TestRequestAfterClose(t *testing.T) { + // Given + server := NewServer() + srvStateChangedCh := make(chan http.ConnState, 100) + listener, srvClosedCh := startServer(t, server, srvStateChangedCh) - // Change to idle state while server is not closing; Close should not be called - conn := &helpers.Conn{} - server.ConnState(conn, http.StateIdle) - if conn.CloseCalled { - t.Error("Close was called unexpected") - } + client := newClient(listener.Addr(), false) + client.Run() + <-client.connected + client.sendrequest <- true + <-client.response server.Close() + if err := <-srvClosedCh; err != nil { + t.Error("Unexpected error during shutdown", err) + } - // wait until the server calls Close() on the listener - // by that point the atomic closing variable will have been updated, avoiding a race. - <-fl.CloseCalled + // When + client.sendrequest <- true + rr := <-client.response - conn = &helpers.Conn{} - server.ConnState(conn, http.StateIdle) - if !conn.CloseCalled { - t.Error("Close was not called") + // Then + if rr.body != nil || rr.err != nil { + t.Errorf("Request should be rejected, body=%v, err=%v", rr.body, rr.err) } } @@ -143,7 +157,7 @@ func waitForState(t *testing.T, waiter chan http.ConnState, state http.ConnState // Test that a request moving from active->idle->active using an actual // network connection still results in a corect shutdown func TestStateTransitionActiveIdleActive(t *testing.T) { - server := newServer() + server := NewServer() wg := helpers.NewWaitGroup() statechanged := make(chan http.ConnState) server.wg = wg @@ -160,8 +174,7 @@ func TestStateTransitionActiveIdleActive(t *testing.T) { for i := 0; i < 2; i++ { client.sendrequest <- true waitForState(t, statechanged, http.StateActive, "Client failed to reach active state") - <-client.idle - client.idlerelease <- true + <-client.response waitForState(t, statechanged, http.StateIdle, "Client failed to reach idle state") } @@ -196,7 +209,7 @@ func TestStateTransitionActiveIdleClosed(t *testing.T) { } for _, withTLS := range []bool{false, true} { - server := newServer() + server := NewServer() wg := helpers.NewWaitGroup() statechanged := make(chan http.ConnState) server.wg = wg @@ -217,12 +230,11 @@ func TestStateTransitionActiveIdleClosed(t *testing.T) { client.sendrequest <- true waitForState(t, statechanged, http.StateActive, "Client failed to reach active state") - err := <-client.idle - if err != nil { - t.Fatalf("tls=%t unexpected error from client %s", withTLS, err) + rr := <-client.response + if rr.err != nil { + t.Fatalf("tls=%t unexpected error from client %s", withTLS, rr.err) } - client.idlerelease <- true waitForState(t, statechanged, http.StateIdle, "Client failed to reach idle state") // client is now in an idle state @@ -241,3 +253,171 @@ func TestStateTransitionActiveIdleClosed(t *testing.T) { } } } + +// Test that supplying a non GracefulListener to Serve works +// correctly (ie. that the listener is wrapped to become graceful) +func TestWrapConnectionTcp(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatal("Failed to create listener", err) + } + + s := NewServer() + s.up = make(chan net.Listener) + + var called bool + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + s.Close() // clean shutdown as soon as handler exits + }) + s.Handler = handler + + serverr := make(chan error) + + go func() { + serverr <- s.Serve(l) + }() + + gl := <-s.up + if _, ok := gl.(*GracefulListener); !ok { + t.Fatal("connection was not wrapped into a GracefulListener") + } + + addr := l.Addr() + if _, err := http.Get("http://" + addr.String()); err != nil { + t.Fatal("Get failed", err) + } + + if err := <-serverr; err != nil { + t.Fatal("Error from Serve()", err) + } + + if !called { + t.Error("Handler was not called") + } +} + +func TestWrapConnectionUnix(t *testing.T) { + l, err := listenToUnix("/var/tmp/servertest") + if err != nil { + t.Fatal("Failed to create listener", err) + } + defer os.Remove("/var/tmp/servertest") + + _, err = os.Stat("/var/tmp/servertest") + if err != nil { + t.Fatal("Failed to create listener", err) + } + + s := NewServer() + s.up = make(chan net.Listener) + + var called bool + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + s.Close() // clean shutdown as soon as handler exits + }) + s.Handler = handler + + serverr := make(chan error) + + go func() { + serverr <- s.Serve(l) + }() + + gl := <-s.up + if _, ok := gl.(*GracefulListener); !ok { + t.Fatal("connection was not wrapped into a GracefulListener") + } + + //addr := l.Addr() + //if _, err := http.Get("http://" + addr.String()); err != nil { + // t.Fatal("Get failed", err) + //} + // + //if err := <-serverr; err != nil { + // t.Fatal("Error from Serve()", err) + //} + // + //if !called { + // t.Error("Handler was not called") + //} +} + +// Hijack listener +func TestHijackListener(t *testing.T) { + server := NewServer() + wg := helpers.NewWaitGroup() + server.wg = wg + listener, exitchan := startServer(t, server, nil) + + client := newClient(listener.Addr(), false) + client.Run() + + // wait for client to connect, but don't let it send the request yet + if err := <-client.connected; err != nil { + t.Fatal("Client failed to connect to server", err) + } + + // Make sure server1 got the request and added it to the waiting group + <-wg.CountChanged + + wg2 := helpers.NewWaitGroup() + server2, err := server.HijackListener(new(http.Server), nil) + server2.wg = wg2 + if err != nil { + t.Fatal("Failed to hijack listener", err) + } + + listener2, exitchan2 := startServer(t, server2, nil) + + // Close the first server + server.Close() + + // First server waits for the first request to finish + waiting := <-wg.WaitCalled + if waiting < 1 { + t.Errorf("Expected the waitgroup to equal 1 at shutdown; actually %d", waiting) + } + + // allow the client to finish sending the request and make sure the server exits after + // (client will be in connected but idle state at that point) + client.sendrequest <- true + close(client.sendrequest) + if err := <-exitchan; err != nil { + t.Error("Unexpected error during shutdown", err) + } + + client2 := newClient(listener2.Addr(), false) + client2.Run() + + // wait for client to connect, but don't let it send the request yet + select { + case err := <-client2.connected: + if err != nil { + t.Fatal("Client failed to connect to server", err) + } + case <-time.After(time.Second): + t.Fatal("Timeout connecting to the server", err) + } + + // Close the second server + server2.Close() + + waiting = <-wg2.WaitCalled + if waiting < 1 { + t.Errorf("Expected the waitgroup to equal 1 at shutdown; actually %d", waiting) + } + + // allow the client to finish sending the request and make sure the server exits after + // (client will be in connected but idle state at that point) + client2.sendrequest <- true + // Make sure that request resulted in success + if rr := <-client2.response; rr.err != nil { + t.Errorf("Client failed to write the request, error: %s", err) + } + close(client2.sendrequest) + if err := <-exitchan2; err != nil { + t.Error("Unexpected error during shutdown", err) + } +} diff --git a/static.go b/static.go index 9b4f5f4..726c941 100644 --- a/static.go +++ b/static.go @@ -3,12 +3,13 @@ package manners import ( "net" "net/http" - "os") + "os" +) var ( - defaultServer *GracefulServer + defaultServer *GracefulServer defaultSignals []os.Signal - hasSignals = false + hasSignals = false ) func preventReEntrance() { @@ -21,8 +22,8 @@ func preventReEntrance() { // net/http package. Call Close() to stop the server. func ListenAndServe(addr string, handler http.Handler) error { preventReEntrance() - defaultServer = NewServer(addr, handler) - if (hasSignals) { + defaultServer = NewWithServer(&http.Server{Addr: addr, Handler: handler}) + if hasSignals { defaultServer.CloseOnInterrupt(defaultSignals...) } return defaultServer.ListenAndServe() @@ -32,8 +33,8 @@ func ListenAndServe(addr string, handler http.Handler) error { // net/http package. Call Close() to stop the server. func ListenAndServeTLS(addr string, certFile string, keyFile string, handler http.Handler) error { preventReEntrance() - defaultServer = NewServer(addr, handler) - if (hasSignals) { + defaultServer = NewWithServer(&http.Server{Addr: addr, Handler: handler}) + if hasSignals { defaultServer.CloseOnInterrupt(defaultSignals...) } return defaultServer.ListenAndServeTLS(certFile, keyFile) @@ -44,7 +45,7 @@ func ListenAndServeTLS(addr string, certFile string, keyFile string, handler htt func Serve(l net.Listener, handler http.Handler) error { preventReEntrance() defaultServer = NewWithServer(&http.Server{Handler: handler}) - if (hasSignals) { + if hasSignals { defaultServer.CloseOnInterrupt(defaultSignals...) } return defaultServer.Serve(l) diff --git a/test_helpers/conn.go b/test_helpers/conn.go index 8c610f5..d7a298b 100644 --- a/test_helpers/conn.go +++ b/test_helpers/conn.go @@ -4,10 +4,13 @@ import "net" type Conn struct { net.Conn - CloseCalled bool + localAddr net.Addr +} + +func (f *Conn) LocalAddr() net.Addr { + return &net.IPAddr{} } func (c *Conn) Close() error { - c.CloseCalled = true return nil } diff --git a/test_helpers/listener.go b/test_helpers/listener.go index a74ac11..e3af35a 100644 --- a/test_helpers/listener.go +++ b/test_helpers/listener.go @@ -1,8 +1,8 @@ package test_helpers import ( - "net" - "errors" + "errors" + "net" ) type Listener struct { @@ -11,10 +11,10 @@ type Listener struct { } func NewListener() *Listener { - return &Listener{ - make(chan bool, 1), - make(chan bool, 1), - } + return &Listener{ + make(chan bool, 1), + make(chan bool, 1), + } } func (l *Listener) Addr() net.Addr { diff --git a/test_helpers/wait_group.go b/test_helpers/wait_group.go index 1df590d..192a121 100644 --- a/test_helpers/wait_group.go +++ b/test_helpers/wait_group.go @@ -4,25 +4,29 @@ import "sync" type WaitGroup struct { sync.Mutex - Count int - WaitCalled chan int + Count int + WaitCalled chan int + CountChanged chan int } func NewWaitGroup() *WaitGroup { return &WaitGroup{ - WaitCalled: make(chan int, 1), + WaitCalled: make(chan int, 1), + CountChanged: make(chan int, 1024), } } func (wg *WaitGroup) Add(delta int) { wg.Lock() wg.Count++ + wg.CountChanged <- wg.Count wg.Unlock() } func (wg *WaitGroup) Done() { wg.Lock() wg.Count-- + wg.CountChanged <- wg.Count wg.Unlock() } diff --git a/transition_test.go b/transition_test.go index 34fe5c6..585a4db 100644 --- a/transition_test.go +++ b/transition_test.go @@ -1,7 +1,7 @@ package manners import ( - helpers "github.com/braintree/manners/test_helpers" + helpers "github.com/mailgun/manners/test_helpers" "net/http" "strings" "testing" @@ -31,12 +31,12 @@ type transitionTest struct { } func testStateTransition(t *testing.T, test transitionTest) { - server := newServer() + server := NewServer() wg := helpers.NewWaitGroup() server.wg = wg startServer(t, server, nil) - conn := &helpers.Conn{} + conn := &gracefulConn{Conn: &helpers.Conn{}} for _, newState := range test.states { server.ConnState(conn, newState) } From 6dca05401fb465492427dae7f7167ed784660fa9 Mon Sep 17 00:00:00 2001 From: Rick Date: Wed, 12 Oct 2016 19:40:48 +0100 Subject: [PATCH 25/26] Added missing logging info --- server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 3a56f76..c32022a 100644 --- a/server.go +++ b/server.go @@ -207,7 +207,8 @@ func (s *GracefulServer) ListenAndServeTLS(certFile, keyFile string) error { return s.ListenAndServeTLSWithConfig(config) } -// ListenAndServeTLS provides a graceful equivalent of net/http.Serve.ListenAndServeTLS. +// ListenAndServeTLSWithConfig provides a graceful equivalent of net/http.Serve.ListenAndServeTLS +// using a bespoke TLS config. func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { addr := s.Addr if addr == "" { @@ -215,6 +216,7 @@ func (s *GracefulServer) ListenAndServeTLSWithConfig(config *tls.Config) error { } if s.listener == nil { + logger.Printf("Listening on tcp socket %s\n", addr) ln, err := net.Listen("tcp", addr) if err != nil { return err From d108ea52bf7a706b6fa2527eaa515f2cd9da260e Mon Sep 17 00:00:00 2001 From: Rick Date: Wed, 16 Nov 2016 23:14:21 +0000 Subject: [PATCH 26/26] deps --- server_test.go | 2 +- transition_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server_test.go b/server_test.go index 8d49d9c..4d96760 100644 --- a/server_test.go +++ b/server_test.go @@ -1,7 +1,7 @@ package manners import ( - helpers "github.com/mailgun/manners/test_helpers" + helpers "github.com/rickb777/manners/test_helpers" "net" "net/http" "os" diff --git a/transition_test.go b/transition_test.go index 585a4db..b0bb4a3 100644 --- a/transition_test.go +++ b/transition_test.go @@ -1,7 +1,7 @@ package manners import ( - helpers "github.com/mailgun/manners/test_helpers" + helpers "github.com/rickb777/manners/test_helpers" "net/http" "strings" "testing"