puavro

puavro is a small convenience library that lets you use the Apache Pulsar Python client with predefined AVRO schemas and plain Python dictionaries, instead of AVRO schemas declared as record classes.

In other words, the library provides an interface to the standard Apache Pulsar client for reading/writing AVRO messages from/to Python dictionaries using an AVRO schema, either:

The puavro library consists of just two classes:

  • DictAVRO, derived from Python dict and intended to be used instead of the pulsar.schema.Record class;
  • DictAvroSchema, derived from pulsar.schema.AvroSchema and intended to be used in its place.
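The idea behind DictAVRO can be pictured with a few lines of plain Python. The sketch below is not puavro's source: `SchemaDict`, `attach_schema` and `field_names` are hypothetical names chosen for illustration only. It shows the concept: instances are ordinary dicts, while the AVRO schema lives on the class as data, rather than being declared field by field as with pulsar.schema.Record.

```python
from typing import Any, ClassVar, Dict, List, Optional


class SchemaDict(dict):
    """Hypothetical stand-in for puavro.DictAVRO (illustration only):
    an ordinary dict whose subclasses carry an AVRO schema as class data."""

    SCHEMA: ClassVar[Optional[Dict[str, Any]]] = None

    @classmethod
    def attach_schema(cls, schema: Dict[str, Any]) -> None:
        # Store the schema on the subclass, so all of its instances share it.
        cls.SCHEMA = schema

    @classmethod
    def field_names(cls) -> List[str]:
        # Field names declared by the record schema, in declaration order.
        if cls.SCHEMA is None:
            raise TypeError(f"{cls.__name__} has no schema attached")
        return [field["name"] for field in cls.SCHEMA["fields"]]


class Point(SchemaDict):
    pass


Point.attach_schema({
    "type": "record",
    "name": "Point",
    "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "long"}],
})

p = Point(x=1, y=2)          # built exactly like a dict
print(isinstance(p, dict))   # True
print(Point.field_names())   # ['x', 'y']
```

Because the schema is data, the same class body works whether the schema comes from an .avsc file, a JSON string or a dict literal.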

See also:

Motivation

puavro exists to let the Python Pulsar client work with AVRO messages that are produced or consumed by modules written in other languages and that rely on external AVRO schemas (e.g. schemas stored in .avsc files).

Installing

puavro is available on PyPI:

pip install puavro

Dependencies

The library depends on the following modules:

fastavro>=1.4.4
pulsar-client>=2.7.0

Compatibility

The library has been run and tested against Pulsar Python client versions 2.7.0 and 2.8.0 and fastavro 1.4.4, and is expected to be compatible with all later versions as well.
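To confirm that an environment meets the minimum versions listed under Dependencies, a small standard-library check can be run. `MINIMUMS`, `version_tuple`, `meets_minimum` and `check_dependencies` are hypothetical helpers, not part of puavro, and the version parsing is deliberately simplified (only the leading numeric part is compared, so pre-release tags are ignored).

```python
import re
from importlib import metadata
from typing import Dict, Optional, Tuple

# Minimum versions from the Dependencies list (hypothetical helper data).
MINIMUMS = {"fastavro": "1.4.4", "pulsar-client": "2.7.0"}


def version_tuple(version: str) -> Tuple[int, ...]:
    """'2.8.0' -> (2, 8, 0). Simplified: only the leading numeric part is kept,
    so a pre-release such as '2.8.0rc1' is read as (2, 8, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    return tuple(int(part) for part in match.group(0).split(".")) if match else ()


def meets_minimum(installed: str, required: str) -> bool:
    # Tuple comparison orders versions numerically: (2, 10, 0) > (2, 7, 0).
    return version_tuple(installed) >= version_tuple(required)


def check_dependencies() -> Dict[str, Optional[bool]]:
    """True/False per package, or None when the package is not installed."""
    report: Dict[str, Optional[bool]] = {}
    for package, minimum in MINIMUMS.items():
        try:
            report[package] = meets_minimum(metadata.version(package), minimum)
        except metadata.PackageNotFoundError:
            report[package] = None
    return report


print(check_dependencies())
```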

License

The library is provided under terms of the MIT license.

How to use

The samples in this section assume the following imports:

import pulsar
import fastavro
import puavro

import json
import datetime
from pprint import pp

Defining dictionary for AVRO schema

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.load_schema("Segment.avsc")

or

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema(json.loads("""{
  "type" : "record",
  "name" : "Segment",
  "namespace" : "try",
  "fields" : [ {
    "name" : "id",
    "type" : "long"
  }, {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "when",
    "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  }, {
    "name" : "direction",
    "type" : {
      "type" : "enum",
      "name" : "CardinalDirection",
      "symbols" : [ "north", "south", "east", "west" ]
    }
  }, {
    "name" : "length",
    "type" : [ "null", "long" ]
  } ]
}
"""))

or

class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema({
        "type" : "record",
        "name" : "Segment",
        "namespace" : "try",
        "fields" : [ {
            "name" : "id",
            "type" : "long"
        }, {
            "name" : "name",
            "type" : "string"
        }, {
            "name" : "when",
            "type" : {
                "type" : "long",
                "logicalType" : "timestamp-millis"
            }
        }, {
            "name" : "direction",
            "type" : {
                "type" : "enum",
                "name" : "CardinalDirection",
                "symbols" : [ "north", "south", "east", "west" ]
            }
        }, {
            "name" : "length",
            "type" : [ "null", "long" ]
        } ]
    })

or

class Segment(puavro.DictAVRO):
    pass

Segment.set_schema(fastavro.schema.load_schema("Segment.avsc"))
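fastavro does the real validation when a record is serialized. To make the Segment schema's field types concrete, here is a simplified, standard-library-only check of which Python values fit each field. `SEGMENT_SCHEMA`, `fits` and `problems` are hypothetical helpers covering only the types Segment uses; this is not fastavro's or puavro's validation logic.

```python
import datetime
from typing import Any, Dict, List

# The Segment schema shown above, as a plain dict (fastavro not needed here).
SEGMENT_SCHEMA: Dict[str, Any] = {
    "type": "record",
    "name": "Segment",
    "namespace": "try",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "when",
         "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "direction",
         "type": {"type": "enum", "name": "CardinalDirection",
                  "symbols": ["north", "south", "east", "west"]}},
        {"name": "length", "type": ["null", "long"]},
    ],
}


def fits(value: Any, avro_type: Any) -> bool:
    """Hypothetical, simplified check of one Python value against one AVRO type."""
    if isinstance(avro_type, list):  # union: any branch may match
        return any(fits(value, branch) for branch in avro_type)
    if isinstance(avro_type, dict):  # complex or logical type
        if avro_type.get("logicalType") == "timestamp-millis":
            # accept a datetime, or a raw long of epoch milliseconds
            return isinstance(value, datetime.datetime) or fits(value, avro_type["type"])
        if avro_type["type"] == "enum":
            return value in avro_type["symbols"]
        return fits(value, avro_type["type"])
    if avro_type == "null":
        return value is None
    if avro_type == "long":  # signed 64-bit; bool is excluded on purpose
        return (isinstance(value, int) and not isinstance(value, bool)
                and -2**63 <= value < 2**63)
    if avro_type == "string":
        return isinstance(value, str)
    raise NotImplementedError(f"AVRO type {avro_type!r} is not covered by this sketch")


def problems(record: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the fields of `record` that do not fit `schema`."""
    found = []
    for field in schema["fields"]:
        name = field["name"]
        # a missing field is treated as its default, or None if it has none
        value = record.get(name, field.get("default"))
        if not fits(value, field["type"]):
            found.append(f"bad value for {name!r}: {value!r}")
    return found


good = {"id": 99, "name": "some name",
        "when": datetime.datetime.now(datetime.timezone.utc),
        "direction": "north", "length": None}
bad = dict(good, id="99", direction="up")

print(problems(good, SEGMENT_SCHEMA))  # []
print(problems(bad, SEGMENT_SCHEMA))
```

The nullable union `["null", "long"]` is why `length` may be `None` (or, in this sketch, omitted), while the enum only accepts its four symbols.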

Producer

Using the Segment class (derived from puavro.DictAVRO above) and the puavro.DictAvroSchema class (instead of pulsar.schema.AvroSchema):

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
producer = pulsar_client.create_producer(topic=TOPIC,
                                         schema=puavro.DictAvroSchema(Segment))
try:
    segment = Segment(
        id=99,
        name="some name",
        # timezone-aware UTC timestamp for the timestamp-millis field
        when=datetime.datetime.now(datetime.timezone.utc),
        direction="north",
        length=12345,
    )
    producer.send(segment)
finally:
    producer.close()
    pulsar_client.close()
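puavro.DictAvroSchema turns the Segment dict into the message payload, presumably through fastavro, one of its dependencies. To show what such a payload contains, here is a hand-written encoder for this one schema that follows the AVRO binary encoding rules (zig-zag varints for int/long, length-prefixed strings, enum as symbol index, union as branch index plus value). It assumes the payload is a bare AVRO binary record with no object-container header; `encode_long`, `encode_string`, `timestamp_millis` and `encode_segment` are hypothetical helpers, and real code should keep using fastavro.

```python
import datetime
from typing import Any, Dict

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
DIRECTIONS = ["north", "south", "east", "west"]  # enum symbols, in schema order


def encode_long(n: int) -> bytes:
    """AVRO int/long: zig-zag encode, then write as a base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zig-zag: 0, -1, 1, -2 ... -> 0, 1, 2, 3 ...
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """AVRO string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def timestamp_millis(when: datetime.datetime) -> int:
    """timestamp-millis: milliseconds since the Unix epoch (needs an aware datetime)."""
    delta = when - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000


def encode_segment(seg: Dict[str, Any]) -> bytes:
    """Hypothetical encoder for the Segment schema only: fields in schema order."""
    out = encode_long(seg["id"])
    out += encode_string(seg["name"])
    out += encode_long(timestamp_millis(seg["when"]))
    out += encode_long(DIRECTIONS.index(seg["direction"]))  # enum: symbol index
    if seg.get("length") is None:
        out += encode_long(0)  # union branch 0 ("null"), no value follows
    else:
        out += encode_long(1) + encode_long(seg["length"])  # branch 1 ("long")
    return out


segment = {"id": 99, "name": "ab", "when": EPOCH, "direction": "south", "length": None}
print(encode_segment(segment).hex())  # c601046162000200
```

Field names never appear in the payload: the reader needs the same (or a compatible) schema to interpret the bytes, which is why both sides of a topic must agree on the AVRO schema.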

Consumer

Using the Segment class (derived from puavro.DictAVRO above) and the puavro.DictAvroSchema class (instead of pulsar.schema.AvroSchema):

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"
WAIT_SECONDS = 3

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
consumer = pulsar_client.subscribe(TOPIC, 
                                   subscription_name="sample", 
                                   consumer_type=pulsar.ConsumerType.Shared,
                                   schema=puavro.DictAvroSchema(Segment))
try:
    while True:
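        # receive() takes its timeout in milliseconds
        msg = consumer.receive(WAIT_SECONDS * 1000)
        segment = msg.value()

        pp(segment)

        consumer.acknowledge(msg)
except Exception as e:
    # no message arrived within WAIT_SECONDS: treat it as the end of data
    if str(e) == 'Pulsar error: TimeOut':
        print("END OF DATA")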
    else:
        raise
finally:
    consumer.close()
    pulsar_client.close()
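On the consumer side, msg.value() hands back a Segment dict decoded from the payload. The sketch below decodes the Segment record by hand, following the AVRO binary encoding rules, to show how the bytes map back to fields. It assumes a bare AVRO binary record (no container header) written with exactly this schema; `read_long`, `read_string` and `decode_segment` are hypothetical helpers, and the sample payloads were worked out by hand from the encoding rules.

```python
import datetime
import io
from typing import Any, Dict

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
DIRECTIONS = ["north", "south", "east", "west"]  # enum symbols, in schema order


def read_long(buf: io.BytesIO) -> int:
    """Read an AVRO int/long: base-128 varint, then undo the zig-zag mapping."""
    z, shift = 0, 0
    while True:
        raw = buf.read(1)
        if not raw:
            raise EOFError("truncated AVRO long")
        z |= (raw[0] & 0x7F) << shift
        shift += 7
        if not raw[0] & 0x80:  # high bit clear: last byte
            return (z >> 1) ^ -(z & 1)


def read_string(buf: io.BytesIO) -> str:
    """Read an AVRO string: byte length as a long, then UTF-8 bytes."""
    length = read_long(buf)
    data = buf.read(length)
    if len(data) != length:
        raise EOFError("truncated AVRO string")
    return data.decode("utf-8")


def decode_segment(payload: bytes) -> Dict[str, Any]:
    """Hypothetical decoder for the Segment schema only: fields in schema order."""
    buf = io.BytesIO(payload)
    seg: Dict[str, Any] = {}
    seg["id"] = read_long(buf)
    seg["name"] = read_string(buf)
    seg["when"] = EPOCH + datetime.timedelta(milliseconds=read_long(buf))
    seg["direction"] = DIRECTIONS[read_long(buf)]
    branch = read_long(buf)  # union ["null", "long"]: 0 = null, 1 = long
    seg["length"] = None if branch == 0 else read_long(buf)
    return seg


print(decode_segment(bytes.fromhex("c601046162000200")))
print(decode_segment(bytes.fromhex("c601046162000202f2c001"))["length"])  # 12345
```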

Samples

The complete samples can be found in the samples directory:
