Code Monkey home page Code Monkey logo

nats.py2's Introduction

NATS - Tornado based Python 2 Client

A Python async client for the NATS messaging system.

License Apache 2.0 Build Status pypi

Supported platforms

Should be compatible with Python 2.7 using Tornado 4.2+ (less than 6.0).

For Python 3, check nats.py

Getting Started

pip install nats-client

Basic Usage

import tornado.ioloop
import tornado.gen
import time
from nats.io.client import Client as NATS

@tornado.gen.coroutine
def main():
    nc = NATS()

    # Establish connection to the server.
    yield nc.connect("nats://demo.nats.io:4222")

    @tornado.gen.coroutine
    def message_handler(msg):
        subject = msg.subject
        data = msg.data
        print("[Received on '{}'] : {}".format(subject, data.decode()))

    # Simple async subscriber
    sid = yield nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    yield nc.auto_unsubscribe(sid, 2)
    yield nc.publish("foo", b'Hello')
    yield nc.publish("foo", b'World')
    yield nc.publish("foo", b'!!!!!')

    # Request/Response
    @tornado.gen.coroutine
    def help_request_handler(msg):
        print("[Received on '{}']: {}".format(msg.subject, msg.data))
        yield nc.publish(msg.reply, "OK, I can help!")

    # Susbcription using distributed queue named 'workers'
    sid = yield nc.subscribe("help", "workers", help_request_handler)

    try:
        # Send a request and expect a single response
        # and trigger timeout if not faster than 200 ms.
        msg = yield nc.request("help", b"Hi, need help!", timeout=0.2)
        print("[Response]: %s" % msg.data)
    except tornado.gen.TimeoutError:
        print("Timeout!")

    # Remove interest in subscription.
    yield nc.unsubscribe(sid)

    # Terminate connection to NATS.
    yield nc.close()

if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(main)

Clustered Usage

import tornado.ioloop
import tornado.gen
from datetime import timedelta
from nats.io import Client as NATS
from nats.io.errors import ErrConnectionClosed

@tornado.gen.coroutine
def main():
    nc = NATS()

    # Set pool servers in the cluster and give a name to the client
    # each with its own auth credentials.
    options = {
        "servers": [
            "nats://secret1:[email protected]:4222",
            "nats://secret2:[email protected]:4223",
            "nats://secret3:[email protected]:4224"
            ]
        }

    # Error callback takes the error type as param.
    def error_cb(e):
        print("Error! ", e)

    def close_cb():
        print("Connection was closed!")

    def disconnected_cb():
        print("Disconnected!")

    def reconnected_cb():
        print("Reconnected!")

    # Set callback to be dispatched whenever we get
    # protocol error message from the server.
    options["error_cb"] = error_cb

    # Called when we are not connected anymore to the NATS cluster.
    options["closed_cb"] = close_cb

    # Called whenever we become disconnected from a NATS server.
    options["disconnected_cb"] = disconnected_cb

    # Called when we connect to a node in the NATS cluster again.
    options["reconnected_cb"] = reconnected_cb

    yield nc.connect(**options)

    @tornado.gen.coroutine
    def subscriber(msg):
        yield nc.publish("pong", "pong:{0}".format(msg.data))

    yield nc.subscribe("ping", "", subscriber)

    for i in range(0, 100):
        yield nc.publish("ping", "ping:{0}".format(i))
        yield tornado.gen.sleep(0.1)

    yield nc.close()

    try:
        yield nc.publish("ping", "ping")
    except ErrConnectionClosed:
        print("No longer connected to NATS cluster.")

if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(main)

Wildcard Subscriptions

import tornado.ioloop
import tornado.gen
import time
from nats.io import Client as NATS

@tornado.gen.coroutine
def main():
    nc = NATS()

    yield nc.connect("demo.nats.io")

    @tornado.gen.coroutine
    def subscriber(msg):
        print("Msg received on [{0}]: {1}".format(msg.subject, msg.data))

    yield nc.subscribe("foo.*.baz", "", subscriber)
    yield nc.subscribe("foo.bar.*", "", subscriber)
    yield nc.subscribe("foo.>", "", subscriber)
    yield nc.subscribe(">", "", subscriber)

    # Matches all of above
    yield nc.publish("foo.bar.baz", b"Hello World")
    yield tornado.gen.sleep(1)

if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(main)

Advanced Usage

import tornado.ioloop
import tornado.gen
from nats.io import Client as NATS
from nats.io.errors import ErrNoServers

@tornado.gen.coroutine
def main():
    nc = NATS()

    try:
        # Setting explicit list of servers in a cluster and
        # max reconnect retries.
        servers = [
            "nats://127.0.0.1:4222",
            "nats://127.0.0.1:4223",
            "nats://127.0.0.1:4224"
            ]
        yield nc.connect(max_reconnect_attempts=2, servers=servers)
    except ErrNoServers:
        print("No servers available!")
        return

    @tornado.gen.coroutine
    def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        for i in range(0, 20):
            yield nc.publish(reply, "i={i}".format(i=i).encode())

    yield nc.subscribe("help.>", cb=message_handler)

    @tornado.gen.coroutine
    def request_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Signal the server to stop sending messages after we got 10 already.
    yield nc.request(
        "help.please", b'help', expected=10, cb=request_handler)

    # Flush connection to server, returns when all messages have been processed.
    # It raises a timeout if roundtrip takes longer than 1 second.
    yield nc.flush()

    # Drain gracefully closes the connection, allowing all subscribers to
    # handle any pending messages inflight that the server may have sent.
    yield nc.drain()

    # Drain works async in the background.
    yield tornado.gen.sleep(1)

if __name__ == '__main__':
    tornado.ioloop.IOLoop.instance().run_sync(main)

TLS

Advanced customizations options for setting up a secure connection can be done by including them on connect:

# Establish secure connection to the server, tls options parameterize
# the wrap_socket available from ssl python package.
options = {
    "servers": ["nats://127.0.0.1:4444"],
    "tls": {
        "cert_reqs": ssl.CERT_REQUIRED,
        "ca_certs": "./configs/certs/ca.pem",
        "keyfile":  "./configs/certs/client-key.pem",
        "certfile": "./configs/certs/client-cert.pem"
      }
    }
yield nc.connect(**options)

The client will also automatically create a TLS context with defaults in case it detects that it should connect securely against the server:

yield nc.connect("tls://demo.nats.io:4443")

Examples

In this repo there are also included a couple of simple utilities for subscribing and publishing messages to NATS:

    # Make a subscription to 'hello'
    $ python examples/nats-sub hello

    Subscribed to 'hello'
    [Received: hello] world

    # Send a message to hello
    $ python examples/nats-pub hello -d "world"

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

nats.py2's People

Contributors

ajoubertza avatar aleksmaus avatar brianshannan avatar brianshannan-wf avatar charliestrawn avatar charliestrawn-wf avatar colinsullivan1 avatar fjoubert avatar gcolliso avatar scunningham avatar shadiakiki1986 avatar theisensanders-wf avatar wallyqs avatar zhuoqiang avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

nats.py2's Issues

can not use auth_token for authorization

if API Key is used for authorization, for example: nats://<some-api-key>@foo.bar.com:123456

client does not set auth_token properly which cause exception:

NatsError: nats: 'Authorization Violation'

BTW: asyncio-nats library support API key however it does not support python3.

The examples in the documentation are using deprecated functions for tornado version 5.0+

tornado.ioloop.IOLoop.instance().run_sync(main) should be using IOLoop.current() instead based on the following documentation from tornado version 5.0.1.

 |  instance()
 |      Deprecated alias for `IOLoop.current()`.
 |
 |      .. versionchanged:: 5.0
 |
 |         Previously, this method returned a global singleton
 |         `IOLoop`, in contrast with the per-thread `IOLoop` returned
 |         by `current()`. In nearly all cases the two were the same
 |         (when they differed, it was generally used from non-Tornado
 |         threads to communicate back to the main thread's `IOLoop`).
 |         This distinction is not present in `asyncio`, so in order
 |         to facilitate integration with that package `instance()`
 |         was changed to be an alias to `current()`. Applications
 |         using the cross-thread communications aspect of
 |         `instance()` should instead set their own global variable
 |         to point to the `IOLoop` they want to use.
 |
 |      .. deprecated:: 5.0
 |

Coroutine mechanism when pub in "sub callback"

I have a NATS client that subscribe to a subject S1 with queue and do, in callback, a few processes (in a loop) and for each iteration of this loop pub to another subject S2.
CASE 1 - The problem is that even all pub are processed, they are "really" published in the server only after the end of the subscription callback (the API tells me the pub is done but when I'm seeing the monitoring in port 8222 it's not, it is waitting). Or I need the pub to be processed immediately .

CASE 2 - The only way I found, to make it, is to call client.flush() or tornado.gen.sleep() (I think it is relative to the add_timeout() that is called internally in those two methods).
The problem of this way is that if I publish 2 message in my queue (subject S1) the second message is "consumed" (means the callback is executed) after the first call of client.flush() or gen.sleep(). And each iteration of the loop, for the two callbacks, are executed alternately. Here is an example of code that simulate what I say:


import tornado.ioloop
import tornado.gen
import datetime
from nats.io.client import Client as NATS

@tornado.gen.coroutine
def main():
    nc = NATS()
    servers = ["nats://127.0.0.1:4222"]

    opts = { "servers": servers }
    yield nc.connect(**opts)
    """
    the S1 subscriber
    """
    @tornado.gen.coroutine
    def sub_pub_handler(msg):
        def wait(seconds):
            current_time = int(datetime.datetime.utcnow().strftime("%s"))
            wait = True
            while(wait):
                if((int(datetime.datetime.utcnow().strftime("%s")) - current_time) > seconds):
                    wait = False
            return
        data = msg.data
        print "Sub2 receive [%s]" % data
        x = 0
        while x < 10 :
            to_send = "%s-%d" % (msg.data,x)
            print to_send
            yield nc.publish("S2", to_send)
            """
            Uncomment to activate CASE 2
            """
            #yield nc.flush()
            x += 1
        wait(10)
        print "Done"
    print "Subscribed to S1"
    yield nc.subscribe("S1", "sub_pub_1", sub_pub_handler, is_async=True)
    """
    the S2 subscriber
    """
    @tornado.gen.coroutine
    def sub_handler(msg):
        print "Sub2 receive"
        print msg.data
    print "Subscribed to S2"
    yield nc.subscribe("S2", "", sub_handler, is_async=True)
    """
    publisher
    """
    print "Publish 'momo' to S1"
    yield nc.publish("S1", "momo")
    print "Publish 'toto' to S1"
    yield nc.publish("S1", "toto")
    yield tornado.gen.sleep(100)

if __name__ == "__main__":
    tornado.ioloop.IOLoop.instance().run_sync(main)

Note: I use the wait() function in the sub_pub_handler(msg) (handler of S1 subscription) to avoid simulate a long process without using tornado.gen.sleep()

Can we get a build?

The hard dependency on tornado 4.2 in v0.5.4 is problematic. Can we get a new tag and a build pushed up to pypi.python.org? Thanks.

ImportError: No module named io

Traceback (most recent call last):
File "/nats.py", line 6, in
import nats.io
File "\nats.py", line 6, in
import nats.io
ImportError: No module named io

windows 7, python 2.7.13 (both x32, x64)
nats installed by "pip install nats-client"

There was a problem with coexistence with other python nats client

Connection fails if tls_required field missing from INFO block

After installing v1.2.0 of gnatsd (linux-amd64) the client connection fails with an exception. Not sure if this is a defect in gnatsd, or if the client needs to be more lenient?

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/my_package/my_module.py", line 123, in _setup_nats
    yield self.nats.connect(**nats_options)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/usr/local/lib/python2.7/dist-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/dist-packages/nats/io/client.py", line 240, in connect
    yield self._process_connect_init()
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/usr/local/lib/python2.7/dist-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/usr/local/lib/python2.7/dist-packages/nats/io/client.py", line 821, in _process_connect_init
    if self._server_info['tls_required']:
KeyError: 'tls_required'

gnatsd v1.1.0: includes tls_required field in INFO
(and auth_required and tls_verify)

$ telnet localhost 4222
Trying 127.0.0.1...
Connected to localhost.localdomain.
Escape character is '^]'.
INFO {"server_id":"aBCDEtuGn9kb3qbAHGDD23","version":"1.1.0","git_commit":"","go":"go1.9.4","host":"0.0.0.0","port":4222,"auth_required":false,"tls_required":false,"tls_verify":false,"max_payload":1048576} 

gnatsd v1.2.0: does not report tls_required field in INFO

$ telnet localhost 4222
Trying 127.0.0.1...
Connected to localhost.localdomain.
Escape character is '^]'.
INFO {"server_id":"tysypEmM3cABCDN06exi8e","version":"1.2.0","proto":1,"go":"go1.10.3","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":107} 

import nats fails after pip install nats-client

After installing with pip install nats-client, I am getting the following error when I try to import.

Python 2.7.12 (default, Dec  4 2017, 14:50:18)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import nats
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "nats.py", line 6, in <module>
from nats.io.utils import new_inbox
ImportError: No module named io.utils

This is weird because it was importing correctly before.

I also checked the nats package that pip installed and there is a nats.io.utils file so maybe there is something going on with the packaging.

Stream is closed

Hi,

My use case :

  • Message average size : 565Kb
  • After each 10 messages sent, I call tornado.gen.sleep(0), so in the beginning it publish message 10 by 10.
    But, after a few publications, my application doesn't publish anymore and I get a StreamClosedError.

I've the following stacktrace :

Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/tornado/ioloop.py", line 600, in _run_callback
  ret = callback()
File "/usr/local/lib/python2.7/site-packages/tornado/stack_context.py", line 275, in null_wrapper
  return fn(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/tornado/ioloop.py", line 615, in <lambda>
  self.add_future(ret, lambda f: f.result())
File "/usr/local/lib/python2.7/site-packages/tornado/concurrent.py", line 232, in result
  raise_exc_info(self._exc_info)
File "/usr/local/lib/python2.7/site-packages/tornado/gen.py", line 282, in wrapper

  yielded = next(result)
File "/usr/src/app/src/nats-client/nats/io/client.py", line 243, in _send_ping
  yield self.io.write(PING_PROTO)
File "/usr/local/lib/python2.7/site-packages/tornado/iostream.py", line 377, in write
  self._check_closed()
File "/usr/local/lib/python2.7/site-packages/tornado/iostream.py", line 885, in _check_closed
  raise StreamClosedError(real_error=self.error)
StreamClosedError: Stream is closed.
  • After getting this error, my application publish the rest of the message, not 10 by 10 but all messages (with some lost messages).

I want to know if it is an issue according to a nats config, or something else...?

Regards,

Able to create connection,publish,subscribe without passing ca pem file to a nats core tls enabled server

Using nats.py2 able to create connection, publish, subscribe without passing ca pem file to a nats core tls enabled server running behind nats streaming server, whereas other clients in other language required ca.pem, gave error : Can't connect: x509: certificate signed by unknown authority

Nats-streaming server configuration: nats_streaming_config.txt

File :
nats_pub_sub.txt

The TLS handshake should fail as the server's certificate is not found in client's truststore?

client never reconnects

I'm hitting a bug where the NATS client never reconnects, im trying to publish something but if the connection reset for the first publish, the status of the connection wont reflect that, however after a second attemp the nc._status changes to 3 "RECONNECTING" and a call to _unbind() happens, there when the reconnection is happening this Task is never resuming, I assume because the IOLoop is stopped.

Inside _schedule_primary_and_connect() execution never passes beyond this snippet

      yield tornado.gen.Task(
        self._loop.add_timeout,
        timedelta(seconds=RECONNECT_TIME_WAIT))

The way I'm using python-nats is by storing the connection on a global for re-use, the calling my notify function as a tornado.gen.coroutine, this works fine until I reset NATS on purpose to check wether the connect is re-stablished.

def notify(who, what):
    try:
        tornado.ioloop.IOLoop.instance().run_sync(lambda: publish_notification(who=who, what=what))
    except Exception as e:
        print "DEBUG: Failed to notify: {}".format(e)
        traceback.print_exc()


@tornado.gen.coroutine
def publish_notification(who, what):
    if 'nats' not in GLOBAL_NATS:
        nc = NATS()
        GLOBAL_NATS['nats'] = nc
        # Establish connection to the server.
        options = {"verbose": True, "servers": ["nats://nats:4222"]}
        options['allow_reconnect'] = True
        yield nc.connect(**options)
    else:
        nc = GLOBAL_NATS['nats']
    yield nc.publish(who, what)

My theory is that the loop stops.

Steps to replicate:

1.- open nats connection
2.- publish something to see connection works
3.- reset nats (I run nats from the docker container)
4.- publish another message (nc._status == 1)
5.- publish another message (nc._status == 3)

Can't pip install nats-client

When trying to pip install nats-client I receive the following error:

$ pip install nats-client
Collecting nats-client
  Using cached nats_client-0.2.0.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/private/var/folders/lk/ymzwyyv92z11zmxqlct6phq80000gp/T/pip-build-Xwfa7s/nats-client/setup.py", line 2, in <module>
        from nats.io.client import __version__
      File "nats/__init__.py", line 1, in <module>
        from nats.io.client import Client
      File "nats/io/client.py", line 7, in <module>
        import tornado.iostream
    ImportError: No module named tornado.iostream

    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/lk/ymzwyyv92z11zmxqlct6phq80000gp/T/pip-build-Xwfa7s/nats-client

Steps to reproduce:

  1. create a new virtualenv
  2. pip install nats-client

As a consumer of this library, I would expect to be able to list nats-client in my requirements.txt file and be able to pip install it from there. I'm happy to help out debugging anyway I can.

Thanks,
Charlie

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.