pybreaker's Introduction

PyBreaker

PyBreaker is a Python implementation of the Circuit Breaker pattern, described in Michael T. Nygard's book Release It!.

In Nygard's words, "circuit breakers exist to allow one subsystem to fail without destroying the entire system. This is done by wrapping dangerous operations (typically integration points) with a component that can circumvent calls when the system is not healthy".

Features

  • Configurable list of excluded exceptions (e.g. business exceptions)
  • Configurable failure threshold and reset timeout
  • Support for several event listeners per circuit breaker
  • Can guard generator functions
  • Functions and properties for easy monitoring and management
  • Thread-safe
  • Optional redis backing
  • Optional support for asynchronous Tornado calls

Requirements

Installation

Run the following command line to download the latest stable version of PyBreaker from PyPI:

$ pip install pybreaker

If you are a Git user, you might want to install the current development version in editable mode:

$ git clone https://github.com/danielfm/pybreaker.git
$ cd pybreaker
$ # run tests (on windows omit ./)
$ ./pw test
$ pip install -e .

Usage

The first step is to create an instance of CircuitBreaker for each integration point you want to protect against:

import pybreaker

# Used in database integration points
db_breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)

CircuitBreaker instances should live globally inside the application scope, e.g., live across requests.

Note

Integration points to external services (i.e. databases, queues, etc) are more likely to fail, so make sure to always use timeouts when accessing such services if there's support at the API level.

If you'd like to use the Redis backing, initialize the CircuitBreaker with a CircuitRedisStorage:

import pybreaker
import redis

# Use a distinct name so the `redis` module is not shadowed
redis_client = redis.StrictRedis()
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, redis_client))

Do not initialize the Redis connection with decode_responses set to True. With that option the client returns already-decoded strings instead of bytes, which in Python 3 fails with:

AttributeError: 'str' object has no attribute 'decode'

Note

You may want to reuse a connection already created in your application, if you're using django_redis for example:

import pybreaker
from django_redis import get_redis_connection

db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, get_redis_connection('default')))

Note

If you require multiple, independent CircuitBreakers and wish to store their states in Redis, it is essential to assign a unique namespace to each CircuitBreaker instance. This can be achieved by passing a distinct namespace parameter to the CircuitRedisStorage constructor. For example:

import pybreaker
from django_redis import get_redis_connection

db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=pybreaker.CircuitRedisStorage(
        pybreaker.STATE_CLOSED,
        get_redis_connection('default'),
        namespace='unique_namespace'))
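
To illustrate why distinct namespaces matter, here is a minimal sketch that uses a plain dict in place of Redis. DictBackedStorage is a hypothetical stand-in, not a pybreaker class, and the key layout "<namespace>:pybreaker:state" is an assumption based on the key format reported in the issues further down this page.

```python
class DictBackedStorage:
    """Toy state storage keeping values in a shared dict instead of Redis."""

    def __init__(self, store, namespace=None):
        self._store = store
        self._namespace = namespace

    def _key(self, name):
        # Assumed layout: "<namespace>:pybreaker:<name>"
        parts = [self._namespace, "pybreaker", name]
        return ":".join(part for part in parts if part)

    @property
    def state(self):
        return self._store.get(self._key("state"), "closed")

    @state.setter
    def state(self, value):
        self._store[self._key("state")] = value


shared = {}  # stands in for one Redis database
orders = DictBackedStorage(shared, namespace="orders")
billing = DictBackedStorage(shared, namespace="billing")

orders.state = "open"
print(orders.state, billing.state)  # open closed
```

Two storages created without a namespace would both write to the same "pybreaker:state" key in this sketch, so opening one circuit would appear to open the other.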

Event Listening

There's no need to subclass CircuitBreaker if you just want to take action when certain events occur. In that case, it's better to subclass CircuitBreakerListener instead:

import logging

import pybreaker

class DBListener(pybreaker.CircuitBreakerListener):
    "Listener used by circuit breakers that execute database operations."

    def before_call(self, cb, func, *args, **kwargs):
        "Called before the circuit breaker `cb` calls `func`."
        pass

    def state_change(self, cb, old_state, new_state):
        "Called when the circuit breaker `cb` state changes."
        pass

    def failure(self, cb, exc):
        "Called when a function invocation raises a system error."
        pass

    def success(self, cb):
        "Called when a function invocation succeeds."
        pass

class LogListener(pybreaker.CircuitBreakerListener):
    "Listener used to log circuit breaker events."

    def state_change(self, cb, old_state, new_state):
        msg = "State Change: CB: {0}, New State: {1}".format(cb.name, new_state)
        logging.info(msg)

To add listeners to a circuit breaker:

# At creation time...
db_breaker = pybreaker.CircuitBreaker(listeners=[DBListener(), LogListener()])

# ...or later
db_breaker.add_listeners(OneListener(), AnotherListener())

What Does a Circuit Breaker Do?

Let's say you want to use a circuit breaker on a function that updates a row in the customer database table:

@db_breaker
def update_customer(cust):
    # Do stuff here...
    pass

# Will trigger the circuit breaker
updated_customer = update_customer(my_customer)

Or if you don't want to use the decorator syntax:

def update_customer(cust):
    # Do stuff here...
    pass

# Will trigger the circuit breaker
updated_customer = db_breaker.call(update_customer, my_customer)

Or use it as a context manager in a with statement:

# Will trigger the circuit breaker
with db_breaker.calling():
    # Do stuff here...
    pass

According to the default parameters, the circuit breaker db_breaker will automatically open the circuit after 5 consecutive failures in update_customer.

When the circuit is open, all calls to update_customer will fail immediately (raising CircuitBreakerError) without any attempt to execute the real operation. If you want the original error to be thrown when the circuit trips, set the throw_new_error_on_trip option to False:

pybreaker.CircuitBreaker(..., throw_new_error_on_trip=False)

After 60 seconds, the circuit breaker will allow the next call to update_customer to pass through. If that call succeeds, the circuit is closed; if it fails, however, the circuit is opened again until another timeout elapses.
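
The closed, open and half-open behaviour described above can be sketched as a small state machine. This is an illustrative sketch only, not pybreaker's implementation: TinyBreaker and BreakerOpenError are hypothetical names, the clock is injectable so the timeout can be simulated, and on a trip the sketch simply re-raises the original error.

```python
import time


class BreakerOpenError(Exception):
    """Raised by this sketch when a call is rejected while the circuit is open."""


class TinyBreaker:
    """Illustrative closed/open/half-open state machine (not pybreaker's code)."""

    def __init__(self, fail_max=5, reset_timeout=60, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so the timeout can be simulated
        self.fail_counter = 0
        self.state = "closed"
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if self.clock() - self.opened_at < self.reset_timeout:
                # Fail fast: the guarded function is not executed at all.
                raise BreakerOpenError("circuit is open")
            self.state = "half-open"  # timeout elapsed: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.fail_counter += 1
            if self.state == "half-open" or self.fail_counter >= self.fail_max:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the consecutive count.
        self.state = "closed"
        self.fail_counter = 0
        return result
```

With fail_max=5 and reset_timeout=60, five consecutive failures open the circuit, calls are rejected for the next 60 seconds, and the first call after that acts as the trial that either closes or re-opens it.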

Optional Tornado Support

A circuit breaker can (optionally) be used to call asynchronous Tornado functions:

from tornado import gen

@db_breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_update(cust):
    # Do async stuff here...
    pass

Or if you don't want to use the decorator syntax:

@gen.coroutine
def async_update(cust):
    # Do async stuff here...
    pass

updated_customer = db_breaker.call_async(async_update, my_customer)

Excluding Exceptions

By default, a failed call is any call that raises an exception. However, exceptions are also commonly used to signal business errors, and the circuit breaker should ignore those because they don't indicate system errors:

# At creation time...
db_breaker = CircuitBreaker(exclude=[CustomerValidationError])

# ...or later
db_breaker.add_excluded_exception(CustomerValidationError)

In that case, when any function guarded by that circuit breaker raises CustomerValidationError (or any exception derived from CustomerValidationError), that call won't be considered a system failure.

So as to cover cases where the exception class alone is not enough to determine whether it represents a system error, you may also pass a callable rather than a type:

db_breaker = CircuitBreaker(exclude=[lambda e: type(e) == HTTPError and e.status_code < 500])

You may mix types and filter callables freely.
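
As a rough sketch of how a mixed list of types and filter callables could be evaluated (this is not pybreaker's actual matching code; is_excluded and both exception classes below are hypothetical stand-ins):

```python
def is_excluded(exc, exclude):
    """Return True if `exc` matches any entry of `exclude`.

    Entries may be exception classes (matched with isinstance) or
    one-argument callables (called with the exception instance).
    """
    for entry in exclude:
        if isinstance(entry, type):
            if isinstance(exc, entry):
                return True
        elif callable(entry) and entry(exc):
            return True
    return False


class CustomerValidationError(Exception):
    """Hypothetical business exception."""


class HTTPError(Exception):
    """Hypothetical HTTP error carrying a status code."""

    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code


exclude = [
    CustomerValidationError,
    lambda e: isinstance(e, HTTPError) and e.status_code < 500,
]
```

Under these assumptions, a CustomerValidationError and an HTTPError(404) would be ignored, while an HTTPError(503) or any unrelated exception would still count as a failure.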

Monitoring and Management

A circuit breaker provides properties and functions you can use to monitor and change its current state:

# Get the current number of consecutive failures
print(db_breaker.fail_counter)

# Get/set the maximum number of consecutive failures
print(db_breaker.fail_max)
db_breaker.fail_max = 10

# Get/set the current reset timeout period (in seconds)
print(db_breaker.reset_timeout)
db_breaker.reset_timeout = 60

# Get the current state, i.e., 'open', 'half-open', 'closed'
print(db_breaker.current_state)

# Closes the circuit
db_breaker.close()

# Half-opens the circuit
db_breaker.half_open()

# Opens the circuit
db_breaker.open()

These properties and functions should be exposed to the operations staff in some way, as they help detect problems in the system.
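
One possible way to expose these values, for instance from a health-check endpoint, is to collect them into a plain dict. The helper name breaker_snapshot is hypothetical; it only reads the attributes listed above. A SimpleNamespace stands in for a real breaker so the sketch runs without pybreaker installed.

```python
from types import SimpleNamespace


def breaker_snapshot(breaker):
    """Collect the monitoring attributes of a breaker-like object."""
    return {
        "state": breaker.current_state,
        "fail_counter": breaker.fail_counter,
        "fail_max": breaker.fail_max,
        "reset_timeout": breaker.reset_timeout,
    }


# Stand-in object; in an application this would be a CircuitBreaker instance.
fake_breaker = SimpleNamespace(
    current_state="closed", fail_counter=0, fail_max=5, reset_timeout=60
)
snapshot = breaker_snapshot(fake_breaker)
```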

Contributing

Run tests:

$ ./pw test

Code formatting (black and isort) and linting (mypy):

$ ./pw format
$ ./pw lint

The above commands automatically install the necessary tools inside .pyprojectx and also install pre-commit hooks.

List available commands:

$ ./pw -i

pybreaker's People

Contributors

avnercohen, chris-robo-fivestars, chrisvaughn, cruuncher, danielfm, felipeagger, houbie, jshiell, kludex, mahdi-ba, martijnthe, mnguyenngo, phillbaker, reetasingh, rickhutcheson, rodrigobraga, tczhaodachuan

pybreaker's Issues

(🐞) incorrect typing for exclude argument

Exclude is defined as:
exclude: Sequence[Type[ExceptionType]] | None = None,
but it can also be a callable. Mypy complains about the following code (when not ignoring the type error):

pybreaker.CircuitBreaker(
        fail_max=CIRCUIT_BREAKER_MAX_FAIL,
        reset_timeout=CIRCUIT_BREAKER_RESET_TIMEOUT,
        # don't trip circuit on client errors, except 429 (too Many Requests)
        exclude=[  # type: ignore[arg-type]
            lambda e: isinstance(e, HTTPError)
            and e.response.status_code < 500
        ],
    )

Typo - FALSE => False

Hi,

If the redis package is not installed, this code raises an error:

try:
    from redis.exceptions import RedisError
    HAS_REDIS_SUPPORT = True
except ImportError:
    HAS_REDIS_SUPPORT = FALSE

Please change FALSE to False.

CircuitBreakerError hiding a real ConnectTimeout

This behavior I feel is incorrect.

The failure which passes the fail threshold throws a CircuitBreakerError instead of the actual underlying exception. To me, this is semantically incorrect. A real error happened, and the circuit was not bypassed, so why are we showing a CircuitBreakerError?

We are using this in cases of ReadTimeout and ConnectTimeout when connecting to external services, and which one is the case matters. We store the exception name in the DB, and we erroneously get CircuitBreakerError now when it was actually a ReadTimeout. This matters. The difference between a ReadTimeout and a CircuitBreakerError is the difference between an action that was potentially successful, and definitely not successful.

I've looked through the code and this is a fairly easy fix, which I can open a PR for, but I need to know if there's agreement on whether this is semantically the way it should behave or not.

At the very least it should be something like

CircuitBreakerPopped and CircuitBreakerOpen so that the cases of the breaker popping, and skipping operation are differentiable. Then in the CircuitBreakerPopped case, I can grab the next exception up on the stack for logging

Problem when defining two CircuitBreakers

I defined two CircuitBreakers, but when one of them changes state, the state of all of them changes!
I think the code that writes to Redis needs to use the name of the CircuitBreaker in its keys to solve this.

breaker = pybreaker.CircuitBreaker(name='breaker', fail_max=3, reset_timeout=2, listeners=[NotifyListener()],
                                   state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, redis))

other_breaker = pybreaker.CircuitBreaker(name='other_breaker', fail_max=3, reset_timeout=2, listeners=[NotifyListener()],
                                   state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, redis))

New release

Are there plans to make a new release including the functionality to define the excluded errors as a callable?

Async support?

Hey, pybreaker doesn't behave as expected with async code on the asyncio event loop. Is it because it isn't supported or am I doing something wrong?

Here's my code:

data_services_breaker = pybreaker.CircuitBreaker(fail_max=2, reset_timeout=60)

@data_services_breaker
async def get_all_bookings():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.githubXXXX.com/events') as resp:
            return await resp.text()

CircuitBreakerError is never thrown...

Thanks!
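
A likely explanation, sketched with a hypothetical naive_guard decorator rather than pybreaker itself: calling an async def function only creates a coroutine object, so a synchronous wrapper returns before the body runs and never sees the exception, which is raised later when the coroutine is awaited.

```python
import asyncio


def naive_guard(func):
    """Synchronous wrapper that records exceptions raised by func(...)."""
    failures = []

    def wrapper(*args, **kwargs):
        try:
            # For an async def, this returns a coroutine without running it.
            return func(*args, **kwargs)
        except Exception as exc:
            failures.append(exc)
            raise

    wrapper.failures = failures
    return wrapper


@naive_guard
async def always_fails():
    raise ConnectionError("boom")


async def main():
    try:
        await always_fails()  # the error surfaces here, outside the wrapper
    except ConnectionError:
        pass
    return len(always_fails.failures)


seen = asyncio.run(main())
print(seen)  # 0
```

The wrapper records no failures even though every call fails, which would match a breaker whose counter never reaches its threshold.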

Maintainership

I'll be opening some PRs on this project, as I intend to use it extensively.

I'd like to know if this package is/will be actively maintained, if I should fork it - I'll still intend to submit the PRs here if that's the case - or if I can help here with the maintenance. 🙏

In any case, thanks so much for this package! 🙌

Exceptions in generators have wrong location

We're seeing unusual behavior when wrapping a generator (a contextmanager specifically) that has a try/except in a circuit breaker. Specifically, the try/except in the wrapped function isn't being executed.

A simplified example is:

from contextlib import contextmanager
from pybreaker import CircuitBreaker

class ExceptionA(Exception): pass

class ExceptionB(Exception): pass


breaker = CircuitBreaker()

class Foo:
  @contextmanager
  @breaker
  def wrapper(self):
    try:
      yield
    except ExceptionA as e:
      raise ExceptionB()

  def foo(self):
    with self.wrapper():
      raise ExceptionA()


try:
  Foo().foo()
except ExceptionB as e:
  print('caught ExceptionB', e)

This example should print "caught ExceptionB", however, it raises:

Traceback (most recent call last):
  File "context.py", line 67, in <module>
    Foo().foo()
  File "context.py", line 63, in foo
    raise ExceptionA()
__main__.ExceptionA

At the root, I believe that these lines:

pybreaker/src/pybreaker.py

Lines 744 to 745 in 42b05c7

except BaseException as e:
    self._handle_error(e)

should not have _handle_error invoke raise but call wrapped_generator.throw(e).

Would it make sense to submit a PR for this? Perhaps a new argument to _handle_error can make raising optional?

TTL on Redis

Currently, there's no expire time on the keys created on Redis. I'm concerned about orphan keys during the lifetime of the application. A full clean can be performed from time to time... But I guess we can also enable a TTL parameter on those.

To be specific, imagine I create a namespace "potato"; I'll then have the key "potato:pybreaker:state". If I decide to change the namespace, the previous key will be orphaned.

Old versions of Python

If you want to maintain support for older versions of Python, you could change the download source for those versions to python.org directly instead of the seemingly now missing files from the TravisCI. Is that something you care about, or do you want to drop support for these older versions that are no longer supported overall?

Example: Python 3.2.6 - https://www.python.org/downloads/release/python-326/

Happy to PR either way @danielfm

Monitoring two or more functions

I'm trying to use the decorator to monitor two functions, saving the state in Redis. But when the first function succeeds and the second one fails, and I make another request, the first function's success resets the failure counter, so the circuit never opens.

Exception blacklist instead of whitelist?

For the use case I'm working on, I would prefer to specify a list of exceptions that WILL count against the breaker's fail_counter, rather than a list of exceptions that shouldn't be counted against it. That would prevent any unexpected run-time error from tripping the breaker.
What do you think?

proposal: adding a name property to CircuitBreaker

Thanks for the great library. I'm adding logging to our use of circuit breakers so that we can see just how often states are changing. To make our logs more informative I've subclassed CircuitBreaker to add a "name" property that I can use in logs, statsd counters, etc. Is there any interest in adding this to CircuitBreaker? If so, I'll create a PR.

Circuit erroneously closed when using a shared storage

Due to these lines:

self._state = CircuitClosedState(self)

self._breaker._state_storage.reset_counter()

Also, this will cause the fail counter to be reset anytime a new CircuitBreaker is instantiated using the same shared storage (like the provided CircuitRedisStorage).

This is a major problem for usage within web app servers, where you may have many processes coming in and out of existence at any time due to scaling behavior or processes sporadically dying and being resurrected.

It seems like a possible solution might involve rethinking what the notify argument is being used for. It appears to be set to True in all cases except for this initial default value being set on line 62. Perhaps move the state change effect into the if notify: block as well?

A better solution might be to not assume that a CircuitBreaker should start in the Closed state, but instead ask the self._state_storage object for the current state and initialize the CircuitBreaker without affecting the state storage.

Implement _exceptions list

Hi All,

It would be nice if the is_system_error function could work with self._exceptions as well. I am about to implement an ES circuit breaker for just one exception (ES BROKEN CONNECTION), so I have to create my own breaker (subclassing CircuitBreaker) and rewrite is_system_error instead of just defining one exception in a new self._exceptions list.

Regards,
Vojtech.

Allow installing CircuitBreakerStorageBasedTestCase as a pip extra

I wrote a custom CircuitBreakerStorage class for a different cache backend than Redis, and I'd like to test it using CircuitBreakerStorageBasedTestCase. But the tests module is not included in setup. Please allow installing CircuitBreakerStorageBasedTestCase as a pip extra, preferably separate from the rest of the tests stuff.

Error thrown when state is missing on Redis

Hi! I was testing how pybreaker would behave in various scenarios where state is being stored on Redis and the Redis server misbehaves.

CircuitRedisStorage will handle intermittent and complete connection loss with the Redis server in an acceptable manner. But it doesn't handle data loss on the Redis server gracefully. When no value has been set for a key, the Redis library returns None. This results in an AttributeError being thrown on this line:

return self._redis.get(self._namespace('state')).decode('utf-8')

I opened a PR (#25) which aims to resolve this by doing the following when it finds missing data on Redis:

  • Resetting the state on Redis to the fallback circuit state
  • Resetting the fail counter

Also, perhaps the circuit breaker's listeners should be notified with a custom exception when this occurs? I'm open to suggestions.
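
A minimal sketch of the fallback idea, independent of the actual change in the PR: decode_state is a hypothetical helper that tolerates a missing value (None) and also accepts an already-decoded str.

```python
def decode_state(raw, fallback="closed"):
    """Turn a raw Redis value into a state name.

    `raw` may be bytes (the usual case), str (client created with
    decode_responses=True) or None (the key is missing).
    """
    if raw is None:
        return fallback  # missing data: fall back instead of raising
    if isinstance(raw, bytes):
        return raw.decode("utf-8")
    return raw
```

In this sketch the caller would still be responsible for writing the fallback state back to Redis and resetting the fail counter.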

Should `current_state` respect `reset_timeout`?

We currently monitor the state of several circuit breakers via the current_state attribute. However, we've noticed that if some of these connections have spiky traffic and trip the breaker, they can stay in the open state until another request is made which clears the state.

The breaker itself is operating as expected; however, our monitoring records incorrect data. What do you think of including a check of _state_storage.opened_at in current_state? We're including this check in our code currently, but if this behavior at the library level seems incorrect I can open a PR.
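
The kind of check described here might look like the following sketch. Both helper names and the "open (reset due)" label are hypothetical, and opened_at is assumed to be a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone


def is_reset_due(opened_at, reset_timeout, now=None):
    """Return True once `reset_timeout` seconds have passed since `opened_at`."""
    if opened_at is None:
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    return now - opened_at >= timedelta(seconds=reset_timeout)


def monitored_state(stored_state, opened_at, reset_timeout, now=None):
    """Label an expired 'open' state so dashboards can tell it apart."""
    if stored_state == "open" and is_reset_due(opened_at, reset_timeout, now):
        return "open (reset due)"
    return stored_state
```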

External state support?

Would a PR to add external state be accepted?

We're looking at using pybreaker to manage the healthiness of an external API and we'd like to aggregate exceptions/failures across multiple processes/machines. For a first pass we'll probably throw data in redis, but I'd understand not wanting to add that as a dependency to this library. We were thinking of something similar to the pattern described here: https://github.com/sittercity/breaker_box#persistence-factories, which abstracts the storage a bit. What do you think would be a good approach that fits?

redis storage open state initialization?

Hi danielfm,

Is the Redis storage intended to be the shared breaker state storage for different workers? With the current implementation, it seems a worker can get stuck in the open state.
For example:
Start 32 workers, each in the closed state. If one worker enters the open state, it stores that state in Redis. When another worker receives a request, it compares its own state name with the one in Redis storage and enters the open state too, but in CircuitOpenState:

    def __init__(self, cb, prev_state=None, notify=False):
        """
        Moves the given circuit breaker `cb` to the "open" state.
        """
        super(CircuitOpenState, self).__init__(cb, STATE_OPEN)
        self._breaker._state_storage.opened_at = datetime.utcnow()
        if notify:
            for listener in self._breaker.listeners:
                listener.state_change(self._breaker, prev_state, self)

It does not read opened_at from the Redis storage but sets it to the current time. Does this mean the worker that was closed has no way to enter the half-open state?

current tests are not fully "deterministic" / have timing issues

which can be easily triggered by running the tests on a heavily loaded system.

An easy way to achieve this is to start sufficiently many instances of the tests in parallel:

gstarck@taf $ for x in $(seq 20)
> do
> (python setup.py test &> res$x || echo "res$x failed .." ) &
> done | grep -c failed

5
✔ ~/work/public/python/pybreaker [master|…20]
gstarck@taf $

Should listener.failure be called when the circuit is closed and a call fails?

When the circuit breaker's state is closed and the call to the wrapped function fails the listener.failure function is not called because an exception is thrown in CircuitClosedState.on_failure. Is this behaviour intentional?

The block below is from the CircuitBreakerState class:

    def _handle_error(self, exc):
        """
        Handles a failed call to the guarded operation.
        """
        if self._breaker.is_system_error(exc):
            self._breaker._inc_counter()
            self.on_failure(exc)
            for listener in self._breaker.listeners:
                listener.failure(self._breaker, exc)
        else:
            self._handle_success()
        raise exc
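
If listeners should be notified even when on_failure raises, one hypothetical reordering is to run the notification in a finally block. The sketch below uses plain callables in place of the breaker and listener objects and is not taken from pybreaker.

```python
class Tripped(Exception):
    """Stand-in for the error raised when the circuit trips."""


def handle_error(exc, on_failure, listeners):
    """Notify every listener even if `on_failure` raises."""
    try:
        on_failure(exc)  # may raise to signal that the circuit tripped
    finally:
        for notify in listeners:
            notify(exc)
    raise exc


def trip(exc):
    raise Tripped("circuit tripped") from exc


seen = []
try:
    handle_error(ValueError("db down"), trip, [seen.append])
except Tripped:
    pass
print(len(seen))  # 1
```

The exception from on_failure still propagates to the caller; the only difference from the block above is that the listeners run first.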

thread safety ?

Hi,

not an issue but a question:

Is it safe to use the same breaker instance (CircuitBreaker) from multiple threads, possibly concurrently (modulo the GIL, of course)?

thx for the lib :)

Missing py.typed Marker

Hello, thanks for the great package.
When using it with mypy I will get the missing py.typed marker as this is not being exported alongside the package;
https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports

Skipping analyzing "pybreaker": module is installed, but missing library stubs or py.typed marker [import] See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports

Any way that could be included with the next version?

0.3.4 release

Can you make a new release version and add it to PyPI? Thanks!

Python 3 support

In:

state = state_bytes.decode('utf-8')

This fails in python 3 with:

AttributeError: 'str' object has no attribute 'decode'

Are there any plans to fix this? Would a PR help?

Contextmanager / `with` API?

Would you be open to accepting a PR to make CircuitBreaker also possible to use like so:

with circuit_breaker:
    do_the_thing_that_can_fail()

I think this is slightly nicer than the .call(...) API.

Thanks for creating & maintaining this library!
