4545 AfterPubSubConnectionInstantiationEvent ,
4646 AfterSingleConnectionInstantiationEvent ,
4747 ClientType ,
48- EventDispatcher , AfterCommandExecutionEvent ,
48+ EventDispatcher , AfterCommandExecutionEvent , OnErrorEvent ,
4949)
5050from redis .exceptions import (
5151 ConnectionError ,
@@ -640,7 +640,14 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
640640 conn .send_command (* args , ** options )
641641 return self .parse_response (conn , command_name , ** options )
642642
643- def _close_connection (self , conn ) -> None :
643+ def _close_connection (
644+ self ,
645+ conn ,
646+ error : Optional [Exception ] = None ,
647+ failure_count : Optional [int ] = None ,
648+ start_time : Optional [float ] = None ,
649+ command_name : Optional [str ] = None ,
650+ ) -> None :
644651 """
645652 Close the connection before retrying.
646653
@@ -650,7 +657,27 @@ def _close_connection(self, conn) -> None:
650657 After we disconnect the connection, it will try to reconnect and
651658 do a health check as part of the send_command logic(on connection level).
652659 """
660+ if error and failure_count <= conn .retry .get_retries ():
661+ self ._event_dispatcher .dispatch (
662+ AfterCommandExecutionEvent (
663+ command_name = command_name ,
664+ duration_seconds = time .monotonic () - start_time ,
665+ server_address = conn .host ,
666+ server_port = conn .port ,
667+ db_namespace = str (conn .db ),
668+ error = error ,
669+ retry_attempts = failure_count ,
670+ )
671+ )
653672
673+ self ._event_dispatcher .dispatch (
674+ OnErrorEvent (
675+ error = error ,
676+ server_address = conn .host ,
677+ server_port = conn .port ,
678+ retry_attempts = failure_count ,
679+ )
680+ )
654681 conn .disconnect ()
655682
656683 # COMMAND EXECUTION AND PROTOCOL PARSING
@@ -673,7 +700,14 @@ def _execute_command(self, *args, **options):
673700 lambda : self ._send_command_parse_response (
674701 conn , command_name , * args , ** options
675702 ),
676- lambda _ : self ._close_connection (conn ),
703+ lambda error , failure_count : self ._close_connection (
704+ conn ,
705+ error ,
706+ failure_count ,
707+ start_time ,
708+ command_name ,
709+ ),
710+ with_failure_count = True
677711 )
678712
679713 self ._event_dispatcher .dispatch (
@@ -697,6 +731,14 @@ def _execute_command(self, *args, **options):
697731 error = e ,
698732 )
699733 )
734+ self ._event_dispatcher .dispatch (
735+ OnErrorEvent (
736+ error = e ,
737+ server_address = conn .host ,
738+ server_port = conn .port ,
739+ is_internal = False ,
740+ )
741+ )
700742 raise
701743
702744 finally :
@@ -974,13 +1016,42 @@ def clean_health_check_responses(self) -> None:
9741016 )
9751017 ttl -= 1
9761018
977- def _reconnect (self , conn ) -> None :
1019+ def _reconnect (
1020+ self ,
1021+ conn ,
1022+ error : Optional [Exception ] = None ,
1023+ failure_count : Optional [int ] = None ,
1024+ start_time : Optional [float ] = None ,
1025+ command_name : Optional [str ] = None ,
1026+ ) -> None :
9781027 """
9791028 The supported exceptions are already checked in the
9801029 retry object so we don't need to do it here.
9811030
9821031 In this error handler we are trying to reconnect to the server.
9831032 """
1033+ if error and failure_count <= conn .retry .get_retries ():
1034+ if command_name :
1035+ self ._event_dispatcher .dispatch (
1036+ AfterCommandExecutionEvent (
1037+ command_name = command_name ,
1038+ duration_seconds = time .monotonic () - start_time ,
1039+ server_address = conn .host ,
1040+ server_port = conn .port ,
1041+ db_namespace = str (conn .db ),
1042+ error = error ,
1043+ retry_attempts = failure_count ,
1044+ )
1045+ )
1046+
1047+ self ._event_dispatcher .dispatch (
1048+ OnErrorEvent (
1049+ error = error ,
1050+ server_address = conn .host ,
1051+ server_port = conn .port ,
1052+ retry_attempts = failure_count ,
1053+ )
1054+ )
9841055 conn .disconnect ()
9851056 conn .connect ()
9861057
@@ -996,12 +1067,60 @@ def _execute(self, conn, command, *args, **kwargs):
9961067 if conn .should_reconnect ():
9971068 self ._reconnect (conn )
9981069
999- response = conn . retry . call_with_retry (
1000- lambda : command ( * args , ** kwargs ),
1001- lambda _ : self . _reconnect ( conn ),
1002- )
1070+ if not len ( args ) == 0 :
1071+ command_name = args [ 0 ]
1072+ else :
1073+ command_name = None
10031074
1004- return response
1075+ # Start timing for observability
1076+ start_time = time .monotonic ()
1077+
1078+ try :
1079+ response = conn .retry .call_with_retry (
1080+ lambda : command (* args , ** kwargs ),
1081+ lambda error , failure_count : self ._reconnect (
1082+ conn ,
1083+ error ,
1084+ failure_count ,
1085+ start_time ,
1086+ command_name ,
1087+ ),
1088+ with_failure_count = True
1089+ )
1090+
1091+ if command_name :
1092+ self ._event_dispatcher .dispatch (
1093+ AfterCommandExecutionEvent (
1094+ command_name = command_name ,
1095+ duration_seconds = time .monotonic () - start_time ,
1096+ server_address = conn .host ,
1097+ server_port = conn .port ,
1098+ db_namespace = str (conn .db ),
1099+ )
1100+ )
1101+
1102+ return response
1103+ except Exception as e :
1104+ if command_name :
1105+ self ._event_dispatcher .dispatch (
1106+ AfterCommandExecutionEvent (
1107+ command_name = command_name ,
1108+ duration_seconds = time .monotonic () - start_time ,
1109+ server_address = conn .host ,
1110+ server_port = conn .port ,
1111+ db_namespace = str (conn .db ),
1112+ error = e ,
1113+ )
1114+ )
1115+ self ._event_dispatcher .dispatch (
1116+ OnErrorEvent (
1117+ error = e ,
1118+ server_address = conn .host ,
1119+ server_port = conn .port ,
1120+ is_internal = False ,
1121+ )
1122+ )
1123+ raise
10051124
10061125 def parse_response (self , block = True , timeout = 0 ):
10071126 """Parse the response from a publish/subscribe command"""
@@ -1494,6 +1613,9 @@ def _disconnect_reset_raise_on_watching(
14941613 self ,
14951614 conn : AbstractConnection ,
14961615 error : Exception ,
1616+ failure_count : Optional [int ] = None ,
1617+ start_time : Optional [float ] = None ,
1618+ command_name : Optional [str ] = None ,
14971619 ) -> None :
14981620 """
14991621 Close the connection reset watching state and
@@ -1505,6 +1627,27 @@ def _disconnect_reset_raise_on_watching(
15051627 After we disconnect the connection, it will try to reconnect and
15061628 do a health check as part of the send_command logic(on connection level).
15071629 """
1630+ if error and failure_count <= conn .retry .get_retries ():
1631+ self ._event_dispatcher .dispatch (
1632+ AfterCommandExecutionEvent (
1633+ command_name = command_name ,
1634+ duration_seconds = time .monotonic () - start_time ,
1635+ server_address = conn .host ,
1636+ server_port = conn .port ,
1637+ db_namespace = str (conn .db ),
1638+ error = error ,
1639+ retry_attempts = failure_count ,
1640+ )
1641+ )
1642+
1643+ self ._event_dispatcher .dispatch (
1644+ OnErrorEvent (
1645+ error = error ,
1646+ server_address = conn .host ,
1647+ server_port = conn .port ,
1648+ retry_attempts = failure_count ,
1649+ )
1650+ )
15081651 conn .disconnect ()
15091652
15101653 # if we were already watching a variable, the watch is no longer
@@ -1538,7 +1681,14 @@ def immediate_execute_command(self, *args, **options):
15381681 lambda : self ._send_command_parse_response (
15391682 conn , command_name , * args , ** options
15401683 ),
1541- lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
1684+ lambda error , failure_count : self ._disconnect_reset_raise_on_watching (
1685+ conn ,
1686+ error ,
1687+ failure_count ,
1688+ start_time ,
1689+ command_name ,
1690+ ),
1691+ with_failure_count = True
15421692 )
15431693
15441694 self ._event_dispatcher .dispatch (
@@ -1563,6 +1713,14 @@ def immediate_execute_command(self, *args, **options):
15631713 error = e ,
15641714 )
15651715 )
1716+ self ._event_dispatcher .dispatch (
1717+ OnErrorEvent (
1718+ error = e ,
1719+ server_address = conn .host ,
1720+ server_port = conn .port ,
1721+ is_internal = False
1722+ )
1723+ )
15661724 raise
15671725
15681726
@@ -1709,6 +1867,10 @@ def _disconnect_raise_on_watching(
17091867 self ,
17101868 conn : AbstractConnection ,
17111869 error : Exception ,
1870+ failure_count : Optional [int ] = None ,
1871+ start_time : Optional [float ] = None ,
1872+ command_name : Optional [str ] = None ,
1873+ batch_size : Optional [int ] = None ,
17121874 ) -> None :
17131875 """
17141876 Close the connection, raise an exception if we were watching.
@@ -1719,6 +1881,28 @@ def _disconnect_raise_on_watching(
17191881 After we disconnect the connection, it will try to reconnect and
17201882 do a health check as part of the send_command logic(on connection level).
17211883 """
1884+ if error and failure_count <= conn .retry .get_retries ():
1885+ self ._event_dispatcher .dispatch (
1886+ AfterCommandExecutionEvent (
1887+ command_name = command_name ,
1888+ duration_seconds = time .monotonic () - start_time ,
1889+ server_address = conn .host ,
1890+ server_port = conn .port ,
1891+ db_namespace = str (conn .db ),
1892+ error = error ,
1893+ retry_attempts = failure_count ,
1894+ batch_size = batch_size ,
1895+ )
1896+ )
1897+
1898+ self ._event_dispatcher .dispatch (
1899+ OnErrorEvent (
1900+ error = error ,
1901+ server_address = conn .host ,
1902+ server_port = conn .port ,
1903+ retry_attempts = failure_count
1904+ )
1905+ )
17221906 conn .disconnect ()
17231907 # if we were watching a variable, the watch is no longer valid
17241908 # since this connection has died. raise a WatchError, which
@@ -1755,7 +1939,15 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
17551939 try :
17561940 response = conn .retry .call_with_retry (
17571941 lambda : execute (conn , stack , raise_on_error ),
1758- lambda error : self ._disconnect_raise_on_watching (conn , error ),
1942+ lambda error , failure_count : self ._disconnect_raise_on_watching (
1943+ conn ,
1944+ error ,
1945+ failure_count ,
1946+ start_time ,
1947+ operation_name ,
1948+ len (stack ),
1949+ ),
1950+ with_failure_count = True
17591951 )
17601952
17611953 self ._event_dispatcher .dispatch (
@@ -1781,6 +1973,14 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
17811973 batch_size = len (stack ),
17821974 )
17831975 )
1976+ self ._event_dispatcher .dispatch (
1977+ OnErrorEvent (
1978+ error = e ,
1979+ server_address = conn .host ,
1980+ server_port = conn .port ,
1981+ is_internal = False
1982+ )
1983+ )
17841984 raise
17851985
17861986 finally :
0 commit comments