@@ -32,7 +32,6 @@ const (
3232const shutdownEventKey = "box.shutdown"
3333
3434type ConnEventKind int
35- type ConnLogKind int
3635
3736var (
3837 errUnknownRequest = errors .New ("the passed connected request doesn't belong " +
@@ -50,19 +49,6 @@ const (
5049 Shutdown
5150 // Either reconnect attempts exhausted, or explicit Close is called.
5251 Closed
53-
54- // LogReconnectFailed is logged when reconnect attempt failed.
55- // LogReconnectFailed ConnLogKind = iota + 1
56- // LogLastReconnectFailed is logged when last reconnect attempt failed,
57- // connection will be closed after that.
58- // LogLastReconnectFailed
59- // LogUnexpectedResultId is logged when response with unknown id was received.
60- // Most probably it is due to request timeout.
61- LogUnexpectedResultId
62- // LogWatchEventReadFailed is logged when failed to read a watch event.
63- // LogWatchEventReadFailed
64- // LogBoxSessionPushUnsupported is logged when response type turned IPROTO_CHUNK.
65- // LogBoxSessionPushUnsupported
6652)
6753
6854// ConnEvent is sent throw Notify channel specified in Opts.
@@ -80,126 +66,6 @@ type connWatchEvent struct {
8066
8167var epoch = time .Now ()
8268
83- type ConnLogEvent interface {}
84-
85- // Events of logger
86- type LogReconnectFailed struct {
87- Reconnects uint
88- MaxReconnects uint
89- Err error
90- }
91-
92- type LogLastReconnectFailed struct {
93- Err error
94- }
95- type LogUnexpectedResultID struct {
96- Header Header
97- }
98- type LogWatchEventReadFailed struct {
99- Err error
100- }
101- type LogBoxSessionPushUnsupported struct {
102- Header Header
103- }
104-
105- // Logger is logger type expected to be passed in options.
106- type Logger interface {
107- Report (event ConnLogEvent , conn * Connection )
108- }
109-
110- type defaultSlogLogger struct {
111- l * slog.Logger
112- }
113-
114- func NewDefaultSlogLogger (l * slog.Logger ) Logger {
115- if l == nil {
116- l = slog .Default ()
117- }
118- return & defaultSlogLogger {l : l }
119- }
120-
121- type defaultLogger struct {}
122-
123- func attrsToAny (attrs []slog.Attr ) []any {
124- out := make ([]any , 0 , len (attrs )* 2 )
125- for _ , a := range attrs {
126- out = append (out , a .Key , a .Value .Any ())
127- }
128- return out
129- }
130-
131- func (d * defaultSlogLogger ) Report (event ConnLogEvent , conn * Connection ) {
132- level , msg , attrs := eventToSlog (event , conn )
133- args := attrsToAny (attrs )
134- ctx := context .Background ()
135- switch level {
136- case slog .LevelError :
137- d .l .Error (msg , args ... )
138- case slog .LevelWarn :
139- d .l .Warn (msg , args ... )
140- case slog .LevelInfo :
141- d .l .Info (msg , args ... )
142- case slog .LevelDebug :
143- d .l .Debug (msg , args ... )
144- default :
145- d .l .Log (ctx , level , msg , args ... )
146- }
147- }
148-
149- func eventToSlog (event ConnLogEvent , conn * Connection ) (slog.Level , string , []slog.Attr ) {
150- addr := "<nil>"
151- if conn != nil && conn .Addr () != nil {
152- addr = fmt .Sprintf ("%s" , conn .Addr ())
153- }
154-
155- switch e := event .(type ) {
156- case LogReconnectFailed :
157- msg := "tarantool: reconnect failed"
158- attrs := []slog.Attr {
159- slog .Int ("reconnects" , int (e .Reconnects )),
160- slog .Int ("max_reconnects" , int (e .MaxReconnects )),
161- slog .String ("addr" , addr ),
162- }
163- if e .Err != nil {
164- attrs = append (attrs , slog .String ("error" , e .Err .Error ()))
165- }
166- return slog .LevelWarn , msg , attrs
167-
168- case LogLastReconnectFailed :
169- msg := "tarantool: last reconnect failed, giving it up"
170- attrs := []slog.Attr {slog .String ("addr" , addr )}
171- if e .Err != nil {
172- attrs = append (attrs , slog .String ("error" , e .Err .Error ()))
173- }
174- return slog .LevelError , msg , attrs
175-
176- case LogUnexpectedResultID :
177- msg := "tarantool: unexpected response request id (probably cancelled request)"
178- attrs := []slog.Attr {
179- slog .String ("addr" , addr ),
180- slog .Any ("request_id" , e .Header .RequestId ),
181- }
182- return slog .LevelWarn , msg , attrs
183-
184- case LogWatchEventReadFailed :
185- msg := "tarantool: unable to parse watch event"
186- attrs := []slog.Attr {}
187- if e .Err != nil {
188- attrs = append (attrs , slog .String ("error" , e .Err .Error ()))
189- }
190- return slog .LevelWarn , msg , attrs
191-
192- case LogBoxSessionPushUnsupported :
193- msg := "tarantool: unsupported box.session.push()"
194- attrs := []slog.Attr {slog .Any ("request_id" , e .Header .RequestId ), slog .String ("addr" , addr )}
195- return slog .LevelInfo , msg , attrs
196-
197- default :
198- // unknown event: log type and value
199- return slog .LevelInfo , "tarantool: unexpected event" , []slog.Attr {slog .Any ("event" , event ), slog .String ("addr" , addr )}
200- }
201- }
202-
20369// Connection is a handle with a single connection to a Tarantool instance.
20470//
20571// It is created and configured with Connect function, and could not be
@@ -442,7 +308,7 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
442308 }
443309
444310 if conn .opts .Logger == nil {
445- conn .opts .Logger = defaultLogger {}
311+ conn .opts .Logger = NewSlogLogger ( slog . Default ())
446312 }
447313
448314 conn .cond = sync .NewCond (& conn .mutex )
@@ -483,6 +349,29 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
483349 return conn , err
484350}
485351
352+ func (conn * Connection ) logEvent (event LogEvent ) {
353+ if conn .opts .Logger != nil {
354+ conn .opts .Logger .Report (event , conn )
355+ }
356+ }
357+
358+ // stateToString преобразует состояние соединения в строку
359+ func (conn * Connection ) stateToString () string {
360+ state := atomic .LoadUint32 (& conn .state )
361+ switch state {
362+ case connDisconnected :
363+ return "disconnected"
364+ case connConnected :
365+ return "connected"
366+ case connShutdown :
367+ return "shutdown"
368+ case connClosed :
369+ return "closed"
370+ default :
371+ return "unknown"
372+ }
373+ }
374+
486375// ConnectedNow reports if connection is established at the moment.
487376func (conn * Connection ) ConnectedNow () bool {
488377 return atomic .LoadUint32 (& conn .state ) == connConnected
@@ -652,10 +541,20 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
652541 return
653542}
654543
544+ // connect обновленная версия с новым логированием
655545func (conn * Connection ) connect (ctx context.Context ) error {
656546 var err error
657547 if conn .c == nil && conn .state == connDisconnected {
658548 if err = conn .dial (ctx ); err == nil {
549+ // Определяем номер попытки переподключения
550+ var reconnects uint = 0
551+ //TODO нужно отслеживать количество переподключений
552+
553+ // Логируем успешное подключение
554+ conn .logEvent (ConnectedEvent {
555+ baseEvent : newBaseEvent (conn .addr ),
556+ Reconnects : reconnects ,
557+ })
659558 conn .notify (Connected )
660559 return nil
661560 }
@@ -666,24 +565,30 @@ func (conn *Connection) connect(ctx context.Context) error {
666565 return err
667566}
668567
568+ // closeConnection обновленная версия с новым логированием
669569func (conn * Connection ) closeConnection (neterr error , forever bool ) (err error ) {
670570 conn .lockShards ()
671571 defer conn .unlockShards ()
572+
672573 if forever {
673574 if conn .state != connClosed {
674575 close (conn .control )
675576 atomic .StoreUint32 (& conn .state , connClosed )
676577 conn .cond .Broadcast ()
677- // Free the resources.
678- if conn .shutdownWatcher != nil {
679- go conn .shutdownWatcher .Unregister ()
680- conn .shutdownWatcher = nil
681- }
578+ // Логируем закрытие соединения
579+ conn .logEvent (ClosedEvent {
580+ baseEvent : newBaseEvent (conn .addr ),
581+ })
682582 conn .notify (Closed )
683583 }
684584 } else {
685585 atomic .StoreUint32 (& conn .state , connDisconnected )
686586 conn .cond .Broadcast ()
587+ // Логируем отключение
588+ conn .logEvent (DisconnectedEvent {
589+ baseEvent : newBaseEvent (conn .addr ),
590+ Reason : neterr ,
591+ })
687592 conn .notify (Disconnected )
688593 }
689594 if conn .c != nil {
@@ -715,6 +620,7 @@ func (conn *Connection) getDialTimeout() time.Duration {
715620 return dialTimeout
716621}
717622
623+ // runReconnects обновленная версия с новым логированием
718624func (conn * Connection ) runReconnects (ctx context.Context ) error {
719625 dialTimeout := conn .getDialTimeout ()
720626 var reconnects uint
@@ -728,14 +634,6 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
728634 cancel ()
729635
730636 if err != nil {
731- // The error will most likely be the one that Dialer
732- // returns to us due to the context being cancelled.
733- // Although this is not guaranteed. For example,
734- // if the dialer may throw another error before checking
735- // the context, and the context has already been
736- // canceled. Or the context was not canceled after
737- // the error was thrown, but before the context was
738- // checked here.
739637 if ctx .Err () != nil {
740638 return err
741639 }
@@ -747,23 +645,33 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
747645 return nil
748646 }
749647
750- conn .opts .Logger .Report (LogReconnectFailed , conn , reconnects , err )
648+ // Новое логирование события
649+ conn .logEvent (ReconnectFailedEvent {
650+ baseEvent : newBaseEvent (conn .addr ),
651+ Reconnects : reconnects ,
652+ MaxReconnects : conn .opts .MaxReconnects ,
653+ Error : err ,
654+ IsInitial : conn .addr == nil ,
655+ })
656+
751657 conn .notify (ReconnectFailed )
752658 reconnects ++
753659 conn .mutex .Unlock ()
754660
755661 select {
756662 case <- ctx .Done ():
757- // Since the context is cancelled, we don't need to do anything.
758- // Conn.connect() will return the correct error.
759663 case <- t .C :
760664 }
761665
762666 conn .mutex .Lock ()
763667 }
764668
765- conn .opts .Logger .Report (LogLastReconnectFailed , conn , err )
766- // mark connection as closed to avoid reopening by another goroutine
669+ // Новое логирование события последней неудачной попытки
670+ conn .logEvent (LastReconnectFailedEvent {
671+ baseEvent : newBaseEvent (conn .addr ),
672+ Error : err ,
673+ })
674+
767675 return ClientError {ErrConnectionClosed , "last reconnect failed" }
768676}
769677
@@ -917,6 +825,7 @@ func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
917825 return event , nil
918826}
919827
828+ // reader обновленная версия с новым логированием
920829func (conn * Connection ) reader (r io.Reader , c Conn ) {
921830 events := make (chan connWatchEvent , 1024 )
922831 defer close (events )
@@ -953,11 +862,19 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
953862 ErrProtocolError ,
954863 fmt .Sprintf ("failed to decode IPROTO_EVENT: %s" , err ),
955864 }
956- conn .opts .Logger .Report (LogWatchEventReadFailed , conn , err )
865+ // Новое логирование события
866+ conn .logEvent (WatchEventReadFailedEvent {
867+ baseEvent : newBaseEvent (conn .addr ),
868+ Error : err ,
869+ })
957870 }
958871 continue
959872 } else if code == iproto .IPROTO_CHUNK {
960- conn .opts .Logger .Report (LogBoxSessionPushUnsupported , conn , header )
873+ // Новое логирование события
874+ conn .logEvent (BoxSessionPushUnsupportedEvent {
875+ baseEvent : newBaseEvent (conn .addr ),
876+ RequestId : header .RequestId ,
877+ })
961878 } else {
962879 if fut = conn .fetchFuture (header .RequestId ); fut != nil {
963880 if err := fut .SetResponse (header , & buf ); err != nil {
@@ -968,7 +885,11 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
968885 }
969886
970887 if fut == nil {
971- conn .opts .Logger .Report (LogUnexpectedResultId , conn , header )
888+ // Новое логирование события
889+ conn .logEvent (UnexpectedResultIdEvent {
890+ baseEvent : newBaseEvent (conn .addr ),
891+ RequestId : header .RequestId ,
892+ })
972893 }
973894 }
974895}
@@ -1214,6 +1135,7 @@ func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
12141135 }
12151136}
12161137
1138+ // timeouts обновленная версия с новым логированием
12171139func (conn * Connection ) timeouts () {
12181140 timeout := conn .opts .Timeout
12191141 t := time .NewTimer (timeout )
@@ -1247,6 +1169,13 @@ func (conn *Connection) timeouts() {
12471169 })
12481170 conn .markDone (fut )
12491171 shard .bufmut .Unlock ()
1172+
1173+ // Логируем таймаут
1174+ conn .logEvent (TimeoutEvent {
1175+ baseEvent : newBaseEvent (conn .addr ),
1176+ RequestId : fut .requestId ,
1177+ Timeout : timeout ,
1178+ })
12501179 }
12511180 if pair .first != nil && pair .first .timeout < minNext {
12521181 minNext = pair .first .timeout
0 commit comments