
pyensign's Introduction

pyensign

Welcome to pyensign!

This repository contains the Ensign driver, SDK, and helpers for Python. For the main Ensign repository, see rotationalio/ensign on GitHub. We also have SDKs for JavaScript and Go.

Installation

PyEnsign is compatible with Python 3.7 and later. (Because PyEnsign depends on the grpcio package, we can't guarantee compatibility with earlier versions of Python.) The simplest way to install PyEnsign and its dependencies is from PyPI with pip, Python's preferred package installer.

pip install pyensign

Configuration

The Ensign client provides access to the unified API for managing topics and publishing/subscribing to topics. Creating a client requires a client ID and client secret (your API key).

from pyensign.ensign import Ensign

client = Ensign(client_id="<your client ID>", client_secret="<your client secret>")

If they are not provided, the client ID and client secret are read from the ENSIGN_CLIENT_ID and ENSIGN_CLIENT_SECRET environment variables.
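The fallback order can be sketched in plain Python. resolve_credentials is a hypothetical helper, not part of PyEnsign; it only illustrates the assumed precedence, where explicit arguments win and the ENSIGN_CLIENT_ID and ENSIGN_CLIENT_SECRET environment variables fill in whatever is missing.

```python
import os
from typing import Optional, Tuple


def resolve_credentials(
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
) -> Tuple[str, str]:
    """Hypothetical helper: explicit arguments win, else fall back to the environment."""
    client_id = client_id or os.environ.get("ENSIGN_CLIENT_ID")
    client_secret = client_secret or os.environ.get("ENSIGN_CLIENT_SECRET")
    if not client_id or not client_secret:
        raise ValueError(
            "pass client_id/client_secret or set ENSIGN_CLIENT_ID and ENSIGN_CLIENT_SECRET"
        )
    return client_id, client_secret


if __name__ == "__main__":
    os.environ["ENSIGN_CLIENT_ID"] = "env-id"
    os.environ["ENSIGN_CLIENT_SECRET"] = "env-secret"
    print(resolve_credentials())                  # ('env-id', 'env-secret')
    print(resolve_credentials(client_id="mine"))  # ('mine', 'env-secret')
```

Failing loudly when neither source provides a value makes a missing environment variable easy to spot, instead of surfacing later as an authentication error from the server.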

Getting to know the PyEnsign API

The sample code below describes some of the core PyEnsign API, but if you're looking for a minimal end-to-end example, check this out first.

Publishing

Use Ensign.publish() to publish events to a topic. All events must contain some data (the event payload) in binary format and a mimetype. The mimetype helps subscribers consuming the event determine how to decode the payload.

from pyensign.events import Event

# Publishing a single event
event = Event(b'{"temp": 72, "units": "fahrenheit"}', "application/json")
await client.publish("weather", event)

# Publishing multiple events
events = [
    Event(b'{"temp": 72, "units": "fahrenheit"}', "application/json"),
    Event(b'{"temp": 76, "units": "fahrenheit"}', "application/json")
]
await client.publish("weather", events)

This will raise an exception if the topic doesn't exist. If you aren't sure that a topic exists, you can use Ensign.ensure_topic_exists() to create the topic if it doesn't exist.

await client.ensure_topic_exists("weather")
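Because the payload is just bytes, a subscriber has to choose a decoder based on the event's mimetype. Here is a minimal sketch of that dispatch in plain Python. DECODERS and decode_payload are hypothetical names, and only the two mimetypes used in this README are handled.

```python
import json
import pickle
from typing import Any, Callable, Dict

# Hypothetical decoder table keyed by mimetype string.
DECODERS: Dict[str, Callable[[bytes], Any]] = {
    "application/json": json.loads,             # json.loads accepts bytes directly
    "application/python-pickle": pickle.loads,  # only unpickle data from trusted publishers
}


def decode_payload(data: bytes, mimetype: str) -> Any:
    """Decode an event payload according to its mimetype."""
    try:
        decoder = DECODERS[mimetype]
    except KeyError:
        raise ValueError(f"no decoder registered for {mimetype!r}") from None
    return decoder(data)


if __name__ == "__main__":
    print(decode_payload(b'{"temp": 72, "units": "fahrenheit"}', "application/json"))
    # {'temp': 72, 'units': 'fahrenheit'}
```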

How do you know if an event was actually published? Ensign.publish() lets you specify callbacks that run when the client receives acks and nacks from the server. Each callback receives the Ack or Nack as its first argument. An Ack contains the timestamp when the event was committed. A Nack is returned if the event couldn't be committed, and it contains the ID of the event along with an error describing what went wrong.

from datetime import datetime

async def handle_ack(ack):
    ts = datetime.fromtimestamp(ack.committed.seconds + ack.committed.nanos / 1e9)
    print(f"Event committed at {ts}")

async def handle_nack(nack):
    print(f"Could not commit event {nack.id} with error {nack.code}: {nack.error}")

await client.publish("weather", event, on_ack=handle_ack, on_nack=handle_nack)
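The ack handler above builds a local-time datetime from a float, which can lose precision. Here is a sketch of an alternative that produces an aware UTC datetime using integer arithmetic. It assumes only that ack.committed exposes integer seconds and nanos fields, as the handler above uses. committed_at is a hypothetical helper, and the SimpleNamespace is a stand-in for a real Ack.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def committed_at(seconds: int, nanos: int) -> datetime:
    """Convert a seconds/nanos pair (as on ack.committed) into an aware UTC datetime.

    Integer arithmetic avoids the float rounding of seconds + nanos / 1e9;
    anything below a microsecond is truncated because datetime stops there.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(seconds=seconds, microseconds=nanos // 1000)


if __name__ == "__main__":
    # Stand-in for an Ack; a real one comes back from the Ensign server.
    ack = SimpleNamespace(committed=SimpleNamespace(seconds=1691429082, nanos=930920000))
    print(committed_at(ack.committed.seconds, ack.committed.nanos))
    # 2023-08-07 17:24:42.930920+00:00
```

Keeping the value timezone-aware makes it safe to compare commit times across machines. Convert to local time only when displaying it.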

Subscribing

Use Ensign.subscribe() to subscribe to one or more topics.

async for event in client.subscribe("weather", "forecast"):
    print(event)
    await event.ack()

This prints each event; for example:

Event:
	id: b'\x01\x89\xd2\x1a?,A\x03\xf2\x04\xa6yd\xdf\x0b<'
	data: b'{"temp": "72", "units": "fahrenheit"}'
	mimetype: application/json
	schema: WeatherUpdate v1.0.0
	state: EventState.SUBSCRIBED
	created: 2023-08-07 17:24:41
	committed: 2023-08-07 17:24:42.930920

The Event object contains coroutines for acking and nacking an event back to the Ensign service. Subscribers should normally invoke Event.ack() once the event has been successfully consumed, or Event.nack() if the event needs to be redelivered.
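To make the ack/nack contract concrete, here is a toy in-memory model written with asyncio. It is not how the Ensign service works internally. It only assumes, as described above, that a nacked event is redelivered and an acked one is not. ToyEvent, toy_subscribe, and consume_with_retry are hypothetical names.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class ToyEvent:
    data: bytes
    acked: bool = False
    nacked: bool = False

    async def ack(self) -> None:
        self.acked = True

    async def nack(self) -> None:
        self.nacked = True


async def toy_subscribe(payloads: List[bytes]) -> AsyncIterator[ToyEvent]:
    """Yield events; any event the consumer nacks is queued again for redelivery."""
    pending = deque(payloads)
    while pending:
        event = ToyEvent(pending.popleft())
        yield event
        # Execution resumes here when the consumer asks for the next event.
        if event.nacked:
            pending.append(event.data)


async def consume_with_retry() -> List[bytes]:
    delivered = []
    failed_once = False
    async for event in toy_subscribe([b"a", b"b"]):
        delivered.append(event.data)
        if event.data == b"a" and not failed_once:
            failed_once = True
            await event.nack()  # simulate a processing failure
        else:
            await event.ack()
    return delivered


if __name__ == "__main__":
    print(asyncio.run(consume_with_retry()))  # [b'a', b'b', b'a']
```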

Decorators

PyEnsign has decorators to convert your existing async functions into publishers and subscribers. For example, if you have a common function that you use to retrieve weather data, you could mark it with @publisher to automatically publish the returned object to Ensign.

from pyensign.ensign import authenticate, publisher

@authenticate()
@publisher("weather")
async def current_weather():
    return {
        "temp": 72,
        "units": "fahrenheit"
    }

await current_weather()

This is equivalent to:

from pyensign.ensign import Ensign
from pyensign.events import Event

client = Ensign()
event = Event(b'{"temp": 72, "units": "fahrenheit"}', "application/json")
await client.publish("weather", event)

You can also specify an alternative mimetype for the byte encoding. For example, pickle is a common serialization format that's an alternative to JSON.

@publisher("weather", mimetype="application/python-pickle")
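To see how a decorator can turn a function's return value into an event with a chosen mimetype, here is a self-contained toy. toy_publisher and the in-memory PUBLISHED list are hypothetical stand-ins for PyEnsign and the Ensign service. The real @publisher also relies on @authenticate and a network connection, and its internals may differ.

```python
import asyncio
import functools
import json
import pickle
from typing import Any, Callable, List, Tuple

# In-memory stand-in for Ensign topics: (topic, payload bytes, mimetype) tuples.
PUBLISHED: List[Tuple[str, bytes, str]] = []

ENCODERS = {
    "application/json": lambda obj: json.dumps(obj).encode("utf-8"),
    "application/python-pickle": pickle.dumps,
}


def toy_publisher(topic: str, mimetype: str = "application/json") -> Callable:
    """Hypothetical decorator: serialize the coroutine's result and record it as an event."""
    encode = ENCODERS[mimetype]

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = await fn(*args, **kwargs)
            PUBLISHED.append((topic, encode(result), mimetype))
            return result
        return wrapper
    return decorator


@toy_publisher("weather", mimetype="application/python-pickle")
async def current_weather():
    return {"temp": 72, "units": "fahrenheit"}


if __name__ == "__main__":
    asyncio.run(current_weather())
    topic, payload, mimetype = PUBLISHED[-1]
    print(topic, mimetype, pickle.loads(payload))
    # weather application/python-pickle {'temp': 72, 'units': 'fahrenheit'}
```

The wrapped function still returns its result, so existing callers keep working while every call also produces an event.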

Similarly you can use @subscriber to mark a subscriber which processes the weather data directly from the topic, e.g. to serve up weather updates in real time.

import json
from pyensign.ensign import authenticate, subscriber

@authenticate()
@subscriber("weather")
async def process_weather(events):
    for event in events:
        update = json.loads(event.data)
        print(update)
        await event.ack()

await process_weather()

This is equivalent to:

import json
from pyensign.ensign import Ensign

client = Ensign()
async for event in client.subscribe("weather"):
    update = json.loads(event.data)
    print(update)
    await event.ack()

@authenticate should be specified at least once, usually on your main function or at the entry point of your application. By default it uses credentials from your environment, but you can also specify them directly or load them from a JSON file.

@authenticate(client_id="my-client-id", client_secret="my-client_secret")

@authenticate(cred_path="my-project-credentials.json")

Design patterns

Most event-driven applications require some form of concurrency, so the Ensign class is designed to be used asynchronously by defining coroutines. You can use Python's built-in asyncio package to schedule and run coroutines from the main thread.

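import asyncio
from pyensign.ensign import Ensign

async def subscriber(topic):
    # Credentials are read from ENSIGN_CLIENT_ID and ENSIGN_CLIENT_SECRET
    client = Ensign()

    async for event in client.subscribe(topic):
        # Handle the event, then ack it back to Ensign
        print(event)
        await event.ack()

def main():
    asyncio.run(subscriber("weather"))

if __name__ == "__main__":
    main()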
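As a sketch of why coroutines suit event-driven code, the toy below uses asyncio.gather to run a producer and a consumer concurrently on one event loop. An asyncio.Queue stands in for an Ensign topic. No PyEnsign calls are involved, and produce and consume_temps are hypothetical names.

```python
import asyncio
from typing import List

SENTINEL = None  # signals the end of the stream in this toy


async def produce(queue: asyncio.Queue, readings: List[int]) -> None:
    for temp in readings:
        await queue.put({"temp": temp, "units": "fahrenheit"})
        await asyncio.sleep(0)  # yield control so the consumer can run
    await queue.put(SENTINEL)


async def consume_temps(queue: asyncio.Queue) -> List[int]:
    seen = []
    while True:
        item = await queue.get()
        if item is SENTINEL:
            return seen
        seen.append(item["temp"])


async def main() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    # gather schedules both coroutines on the same event loop and waits for both.
    _, temps = await asyncio.gather(produce(queue, [72, 76, 71]), consume_temps(queue))
    return temps


if __name__ == "__main__":
    print(asyncio.run(main()))  # [72, 76, 71]
```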

If you aren't comfortable with asyncio or need a more object-oriented interface, you can use the Publisher and Subscriber classes to implement your own publisher and subscriber apps.

import time
from pyensign.events import Event
from pyensign.publisher import Publisher

class MyPublisher(Publisher):
    def source_events(self):
        while True:
            # Call an API and yield some events! (fetch_data() is your own method)
            data = self.fetch_data()
            yield Event(data=data, mimetype="application/json")
            time.sleep(60)

    def run_forever(self):
        self.run(self.source_events())

publisher = MyPublisher("my-topic")
publisher.run_forever()

To consume events, subclass Subscriber and implement on_event():

from pyensign.subscriber import Subscriber

class MySubscriber(Subscriber):
    async def on_event(self, event):
        # Process the event
        ...

        # Ack the event back to Ensign
        await event.ack()

subscriber = MySubscriber("my-topic")
subscriber.run()

Contributing to PyEnsign

Wow, you want to contribute to PyEnsign? 😍 We would absolutely love that!

PyEnsign is an open source project that is supported by a community who will gratefully and humbly accept any contributions you might make to the project. Large or small, any contribution makes a big difference; and if you've never contributed to an open source project before, we hope you will start with PyEnsign!

Please check out our Contributor's Guide in CONTRIBUTING.md to get a quick orientation first.

We can't wait to hear from you!

pyensign's People

Contributors

bbengfort, dv04, pdeziel, rebeccabilbro


pyensign's Issues

Support for "traditional" mimetype strings

Mimetypes for events must currently be specified as the protobuf int or corresponding string (e.g. APPLICATION_JSON). This issue is to support traditional formatting for mimetypes (e.g. application/json).

There is a reference implementation in the Go SDK.
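One possible shape for this feature is to normalize a traditional mimetype string to the constant-style name before looking it up. The sketch below is hypothetical. It assumes every constant follows the APPLICATION_JSON pattern (type and subtype uppercased, with punctuation replaced by underscores). That pattern is only confirmed here for JSON, so a real implementation should validate the result against the protobuf enum.

```python
import re


def to_constant_name(mimetype: str) -> str:
    """Hypothetical: map 'application/json; charset=utf-8' to 'APPLICATION_JSON'.

    Strings that are already constant-style pass through unchanged.
    """
    base = mimetype.split(";", 1)[0].strip()  # drop parameters such as charset
    return re.sub(r"[^A-Za-z0-9]+", "_", base).upper()


if __name__ == "__main__":
    print(to_constant_name("application/json"))  # APPLICATION_JSON
    print(to_constant_name("APPLICATION_JSON"))  # APPLICATION_JSON
```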

A few assorted API/version issues

Describe the bug

@pdeziel I received a report about a few issues a user is experiencing:

I am getting the following error:

"pyensign.exceptions.EnsignRPCError: received gRPC error from Ensign: UNAVAILABLE: Getting metadata from plugin failed with error: cannot unpack non-iterable SSLCertVerificationError object ((14, 'unavailable'))"

I think it is related to the updated Ensign library that we discussed the other day, but I am using the version provided in Rotational's GitHub repositories.

Additionally, I am getting an error in PyCharm when importing:

from pyensign.api.v1beta1.ensign_pb2 import Nack
Cannot find reference 'Nack' in 'ensign_pb2.py'

Details can be found here: Repo, Issue, Requirements

Blocking RPCs

Ensign RPCs in pyensign are asynchronous and must be scheduled in the event loop. This should probably be the default, but it would be nice to have a set of RPCs that support synchronous applications as a user option on the Ensign client.
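A minimal sketch of one way to offer blocking calls is to wrap an async client and use asyncio.run to drive each coroutine to completion. SyncWrapper and FakeAsyncClient are hypothetical; the fake client stands in for the real Ensign class. Note that asyncio.run cannot be called while an event loop is already running (for example, inside Jupyter). Starting a fresh loop per call is also simple but wasteful for a long-lived gRPC connection.

```python
import asyncio
import inspect
from typing import Any


class SyncWrapper:
    """Hypothetical: expose an async client's coroutine methods as blocking calls."""

    def __init__(self, async_client: Any) -> None:
        self._client = async_client

    def __getattr__(self, name: str) -> Any:
        attr = getattr(self._client, name)
        if not inspect.iscoroutinefunction(attr):
            return attr  # plain attributes and sync methods pass through

        def blocking(*args: Any, **kwargs: Any) -> Any:
            # Run the coroutine on a fresh event loop and block until it finishes.
            return asyncio.run(attr(*args, **kwargs))
        return blocking


class FakeAsyncClient:
    """Stand-in for an async client; not the real Ensign class."""

    async def topic_exists(self, name: str) -> bool:
        await asyncio.sleep(0)
        return name == "weather"


if __name__ == "__main__":
    client = SyncWrapper(FakeAsyncClient())
    print(client.topic_exists("weather"))  # True
```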

Subscribe acks and nacks

The subscribe RPC currently just yields the events to the user. However, the gRPC stream is actually bidirectional and allows the client to send acks and nacks back to the server, as defined in the protocol buffers.

The default functionality should be to send acks back after each event is received without user intervention.
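The proposed default could be sketched as an async generator that wraps the event stream and acks each event once the consumer has finished with it, which is when the consumer asks for the next one. auto_ack, FakeEvent, and fake_stream are hypothetical. A real implementation would also need to handle events the consumer already acked or nacked itself, which this sketch does by skipping them.

```python
import asyncio
from typing import AsyncIterator, List


class FakeEvent:
    """Stand-in for pyensign's Event; records which response was sent."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.response = None  # None, "ack", or "nack"

    async def ack(self) -> None:
        self.response = "ack"

    async def nack(self) -> None:
        self.response = "nack"


async def auto_ack(stream: AsyncIterator[FakeEvent]) -> AsyncIterator[FakeEvent]:
    """Yield each event, then ack it unless the consumer already responded."""
    async for event in stream:
        yield event
        # Resumes here when the consumer requests the next event.
        if event.response is None:
            await event.ack()


async def fake_stream(payloads: List[bytes]) -> AsyncIterator[FakeEvent]:
    for payload in payloads:
        yield FakeEvent(payload)


async def demo() -> List[str]:
    events = []
    async for event in auto_ack(fake_stream([b"a", b"b", b"c"])):
        events.append(event)
        if event.data == b"b":
            await event.nack()  # the consumer can still override the default
    return [e.response for e in events]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['ack', 'nack', 'ack']
```

Because the ack is sent only when the consumer requests another event, the last event is never acked if the consumer breaks out of the loop early. A real implementation would have to decide how to handle that case.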

Topic cache

Ensign.topic_id() gets the ID of a topic by name, which currently requires an RPC call. Topic names can't be changed, so this is something that can be cached on the client instead of making repeated requests.
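A minimal sketch of such a cache, assuming (as stated above) that topic names never change, so entries never need invalidating. TopicCache and fetch_topic_id are hypothetical; the fetch coroutine stands in for the RPC behind Ensign.topic_id().

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class TopicCache:
    """Hypothetical name -> ID cache; only calls the lookup on a miss."""

    def __init__(self, fetch: Callable[[str], Awaitable[str]]) -> None:
        self._fetch = fetch
        self._ids: Dict[str, str] = {}

    async def topic_id(self, name: str) -> str:
        if name not in self._ids:
            self._ids[name] = await self._fetch(name)
        return self._ids[name]


calls: List[str] = []


async def fetch_topic_id(name: str) -> str:
    calls.append(name)  # record each simulated RPC
    await asyncio.sleep(0)
    return f"id-for-{name}"


async def demo() -> None:
    cache = TopicCache(fetch_topic_id)
    await cache.topic_id("weather")
    await cache.topic_id("weather")
    print(calls)  # ['weather']: the second lookup was served from the cache


if __name__ == "__main__":
    asyncio.run(demo())
```

Two concurrent misses for the same name can both reach the RPC in this sketch. That is harmless because both return the same ID, but a per-name lock would remove the duplicate request.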

DataFrame mimetype

Describe the solution you'd like
Can we add a specific mimetype and serialization method for pandas DataFrames? Currently we have to use JSON or pickle.

Refresh credentials

When pyensign authenticates with the auth server it receives an access token and a refresh token. The access token is used on each gRPC call to make authenticated requests to Ensign. The refresh token is currently unused, but it exists to allow the client to re-authenticate with the auth server once the access token has expired (but before the refresh token has expired).

So the ask for this issue is to implement the refresh logic in the AuthClient. This can initially be done on-demand, but eventually we may want to refresh in the background to try to prevent RPCs from blocking.

The Go SDK currently does this, so it may be a good reference point for implementing this feature.
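An on-demand version of the refresh logic might look like the sketch below. TokenManager, login, and refresh are hypothetical names, not the AuthClient's actual methods. Tokens are modeled as (value, expiry) pairs rather than parsed JWTs. A small leeway refreshes the access token slightly before it expires, so an in-flight RPC doesn't carry a token that lapses mid-call.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional, Tuple

# A token is modeled as (value, expiry as a Unix timestamp); real tokens are JWTs.
Token = Tuple[str, float]


@dataclass
class TokenManager:
    """Hypothetical on-demand refresh logic, not the actual AuthClient."""

    login: Callable[[], Tuple[Token, Token]]       # full authentication -> (access, refresh)
    refresh: Callable[[str], Tuple[Token, Token]]  # refresh token value -> (access, refresh)
    leeway: float = 30.0                           # refresh this many seconds early
    access: Optional[Token] = None
    refresh_token: Optional[Token] = None

    def access_token(self, now: Optional[float] = None) -> str:
        now = time.time() if now is None else now
        if self.access is not None and now < self.access[1] - self.leeway:
            return self.access[0]  # access token still valid: no network call
        if self.refresh_token is not None and now < self.refresh_token[1]:
            # Access token expired (or about to) but the refresh token is still valid.
            self.access, self.refresh_token = self.refresh(self.refresh_token[0])
        else:
            # No tokens yet, or both expired: re-authenticate from scratch.
            self.access, self.refresh_token = self.login()
        return self.access[0]


if __name__ == "__main__":
    manager = TokenManager(
        login=lambda: (("access-1", 100.0), ("refresh-1", 1000.0)),
        refresh=lambda tok: (("access-2", 2000.0), ("refresh-2", 5000.0)),
    )
    print(manager.access_token(now=0))   # access-1 (initial login)
    print(manager.access_token(now=80))  # access-2 (80 is inside the 30s leeway)
```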

Create Nack code constants

Describe the solution you'd like

Add the list of Nack code constants to the pyensign package, which are currently defined here: https://github.com/rotationalio/ensign/blob/973d70557afab8101aaaf5e736a47e590502107a/proto/api/v1beta1/ensign.proto#L128-L134

The constants should be in a place where they can be directly imported, e.g. in events.py.

This should be similar to how mimetypes are currently defined in mimetypes.py.

Is your feature request related to a problem? Please describe.

Nack codes are currently defined in the protocol buffers. Having to specify Nack codes by integer or by importing the generated protocol buffers is not a good developer experience, especially in Python. Creating these aliases allows the user to see what codes are available without having to cross-reference another repo.

Type annotations

Describe the solution you'd like
In ensign.py we would like to use type annotations in the function definitions so that it's clear what expected types the parameters should have.

Quantify test coverage

Describe the issue
We would like to quantify how much test coverage we have right now and specifically which areas need more tests. I'm open to any coverage tool as long as it generates a useful report.

Destroy topic does not respect topic names

Describe the bug
Currently, the destroy_topic function in ensign.py only has the ability to destroy a topic if a user passes in a topic_id. It should be flexible enough to be able to destroy the topic by name as well.

To Reproduce

import asyncio
from pyensign.ensign import Ensign

async def destroy():
    ensign = Ensign()
    await ensign.destroy_topic("topic")

asyncio.run(destroy())

Dataset
Did you use a specific dataset to produce the bug? Where can we access it?

Expected behavior
I expected the topic to be destroyed; instead, I got an error.

Traceback

pyensign.exceptions.EnsignInvalidArgument: received gRPC error from Ensign: INVALID_ARGUMENT: invalid id field ((3, 'invalid argument'))

Desktop (please complete the following information):

  • OS: macOS
  • Python Version [e.g. 2.7, 3.10, miniconda]
  • PyEnsign Version [e.g. 1.1]

Additional context
Add any other context about the problem here.
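One way to accept either form is to resolve the argument before calling the RPC. The sketch below assumes topic IDs are 26-character ULID strings (Crockford base32). The issue does not confirm this. resolve_topic_id and fake_lookup are hypothetical, with fake_lookup standing in for a name-to-ID lookup such as Ensign.topic_id().

```python
import asyncio
import re
from typing import Awaitable, Callable

# Assumption: topic IDs are ULIDs, i.e. 26 Crockford base32 characters.
ULID_PATTERN = re.compile(r"^[0-7][0-9A-HJKMNP-TV-Z]{25}$", re.IGNORECASE)


async def resolve_topic_id(topic: str, lookup_id: Callable[[str], Awaitable[str]]) -> str:
    """Hypothetical: pass IDs through unchanged and resolve anything else as a name."""
    if ULID_PATTERN.match(topic):
        return topic
    return await lookup_id(topic)


async def fake_lookup(name: str) -> str:
    # Stand-in for a name -> ID RPC; returns a fixed example ID.
    return "01H6ZQ3V7Y8K2M4N5P6Q7R8S9T"


if __name__ == "__main__":
    print(asyncio.run(resolve_topic_id("topic", fake_lookup)))
    # 01H6ZQ3V7Y8K2M4N5P6Q7R8S9T
```

A topic name that happens to look like a ULID would be misread as an ID. Trying the name lookup first, or adding separate by-name and by-ID methods, would avoid that ambiguity.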

Add examples directory

Describe the solution you'd like
We would like a top level directory to be added to the repo called examples which contains small code samples for using the PyEnsign API.

As a start, create a publisher.py script that publishes an event using PyEnsign. It doesn't really matter what the event is, we just want to make sure the example is codified and can be executed as a Python script.

Note: Credentials should be loaded from the environment instead of specified in the code itself.

Add more detailed steps to create a Project and access the API Key in the docs

Hi, I recently watched the YouTube videos on creating a project and topics and accessing the API key. As part of Hacktoberfest, could I turn the details from those videos into written step-by-step instructions, either in the Quickstart on the website or in the documentation? Some of the details in the videos don't appear in the website's Quickstart, and I'd like to help add them to the docs or website if possible. Thank you!

Support generators on publish

Ensign.publish() accepts an iterable of events. However we could also allow the user to pass in a generator which yields the events.

For instance:

def event_source():
    ...
    yield event

ensign = Ensign()
await ensign.publish("my-topic", event_source())
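A sketch of how publish could accept a single event, a list, a generator, or an async generator is to normalize every input into one async iterator. iter_events and collect are hypothetical, and plain strings stand in for Event objects. A real client would also need to keep the gRPC stream open while the generator is still producing.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, List


async def iter_events(source: Any) -> AsyncIterator[Any]:
    """Hypothetical: normalize the accepted input shapes into one async iterator."""
    if hasattr(source, "__aiter__"):
        async for event in source:  # async generators and other async iterables
            yield event
    elif isinstance(source, (list, tuple)) or inspect.isgenerator(source):
        for event in source:
            yield event
    else:
        yield source  # a single event


def event_source():
    for temp in (72, 76):
        yield f"event {temp}"


async def async_event_source():
    for temp in (72, 76):
        await asyncio.sleep(0)
        yield f"event {temp}"


async def collect(source: Any) -> List[Any]:
    return [event async for event in iter_events(source)]


if __name__ == "__main__":
    print(asyncio.run(collect(event_source())))  # ['event 72', 'event 76']
    print(asyncio.run(collect("event 72")))      # ['event 72']
```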
