basis-company / nats.php
nats jetstream client for php
First off, thanks for working on a PHP client for NATS. It's very helpful and appreciated.
While using your library I have noticed that when I am trying to use publish() if there is an issue connecting to the nats server, I am not seeing exceptions thrown to indicate that something has gone amiss. Looking at the code in Client it seems like processSocketException() gets called in a loop without some kind of break, and all exceptions get swallowed up and continued. The best I have found so far is to call ping() before I do a publish() to see if that works.
Do you have any better suggestions? The built-in retry is helpful, but at some point (say after a few seconds), I want to know that it has failed and be able to log it.
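One way to get a hard failure after a few seconds is to wrap the check-then-publish step in a small retry helper with a deadline. The sketch below is only an illustration: `retryWithDeadline` is a hypothetical helper, not part of nats.php, and it assumes the wrapped callable throws when the server is unreachable (for example a callable that runs the ping() check mentioned above and throws if it fails).

```php
<?php
// Hypothetical helper, not part of nats.php: keep calling $operation until it
// succeeds or $timeoutSeconds have elapsed, then rethrow the last failure.
function retryWithDeadline(callable $operation, float $timeoutSeconds, float $pauseSeconds = 0.1)
{
    $deadline = microtime(true) + $timeoutSeconds;
    while (true) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if (microtime(true) >= $deadline) {
                // Surface the failure so the caller can log it.
                throw new RuntimeException(
                    'Gave up after ' . $timeoutSeconds . 's: ' . $e->getMessage(),
                    0,
                    $e
                );
            }
            // Short pause before the next attempt.
            usleep((int) ($pauseSeconds * 1000000));
        }
    }
}
```

The caller can then catch the `RuntimeException` once and log it, instead of relying on the client's internal retry loop to report the problem.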
Hi guys
I'm wondering if there is any way to close a previously opened connection.
Would it be useful to have a client method $client->disconnect() to unsubscribe from all subscriptions and close socket?
Hello, I keep getting this exception no matter what I do. I'm just trying to ping the server as in the examples. Please help, what am I missing?
There are a number of issues with the value this lib uses as the reply-to value in request and dispatch functions. I want to discuss potential solutions with you before making any code changes as there are multiple ways to address the issues.
Here are the issues:
Here are the potential changes that I want to make. Let me know which one you prefer or if you can think of an alternative:
$sid = '_INBOX.' . bin2hex(random_bytes(16));
The problem with this approach is that it could break existing implementations where the cluster permissions are based on the current value inbox.>
$configuration = new Configuration([
'host' => 'localhost',
'port' => 4222,
'replyToGenerator' => function () {
return '_INBOX.' . Cuid::cuid();
},
]);
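The generator idea can be sketched without any dependency. `makeInboxGenerator` below is a hypothetical factory, and it uses random_bytes() instead of the Cuid call from the snippet above so that it runs on a stock PHP install. The configurable prefix is there because of the permissions concern raised earlier.

```php
<?php
// Hypothetical factory: returns a closure that produces unique reply-to subjects.
// The prefix is configurable so existing permission rules can keep matching.
function makeInboxGenerator(string $prefix = '_INBOX.'): Closure
{
    return function () use ($prefix): string {
        // 16 random bytes become 32 hexadecimal characters.
        return $prefix . bin2hex(random_bytes(16));
    };
}

$nextInbox = makeInboxGenerator();
echo $nextInbox(), PHP_EOL;
```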
For streams we have a publish message whose signature is like
publish(string $subject, mixed $payload)
so usually we send anything as payload, if I make a NATS message and publish it like
$msg = new Msg();
$msg->subject = $subject;
$msg->payload = new Payload($headers, $data);
$stream->publish($subject, $msg);
// Or could we do something like
$client->publish($msg);
Is this the correct approach?
Hi!
What's the best way to keep a process listening/waiting for pubsub messages to come in?
It looks like the following is The Way™ to go about this. Do I need to put $client->process() in a loop and set its timeout?
$client = new \Basis\Nats\Client;
$client->subscribe('some-topic', function ($message) {
printf("Data: %s\r\n", $message->getBody());
});
$client->process();
Thanks!
import "github.com/nats-io/nats.go"
// Connect to NATS
nc, _ := nats.Connect(nats.DefaultURL)
// Create JetStream Context
js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256))
// Simple Stream Publisher
js.Publish("ORDERS.scratch", []byte("hello"))
// Simple Async Stream Publisher
for i := 0; i < 500; i++ {
js.PublishAsync("ORDERS.scratch", []byte("hello"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
I am trying to do the same thing in PHP. The PHP API only allows ->getStream() by name, but my lead said that NATS can automatically resolve the stream name from the subject. Could you please provide a solution in PHP?
Or could it be that DefaultURL contains the stream name and my lead just didn't mention it?
In Gearman, you can do while ($worker->work()). Is it possible to have this approach, if applicable? Instead of doing...
while (true) {
$client->process();
}
Though I stumbled upon $client->process(30 * 60) and $configuration->setDelay(0.001), I have no idea whether they achieve something similar to the Gearman approach.
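A Gearman-style loop with an exit condition can be approximated by wrapping the per-iteration call in a helper that checks a stop flag and a time budget. This is a minimal sketch with hypothetical names (`runWorker`, `$processOnce`, `$shouldContinue`). In a real worker, `$processOnce` would be a closure around the $client->process() call shown above.

```php
<?php
// Hypothetical loop helper: calls $processOnce repeatedly until $shouldContinue
// returns false or the time budget is used up. Returns the iteration count.
function runWorker(callable $processOnce, callable $shouldContinue, float $maxSeconds): int
{
    $startedAt = microtime(true);
    $iterations = 0;
    while ($shouldContinue() && (microtime(true) - $startedAt) < $maxSeconds) {
        $processOnce();
        $iterations++;
    }
    return $iterations;
}
```

Keeping the limits outside the client means the worker decides when to stop (signal received, memory limit reached, maximum runtime), which is close to what `while ($worker->work())` provides.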
Hello!
I have a question.
What is the correct way to filter by subject when I create a consumer?
The setSubjectFilter() method of the \Basis\Nats\Consumer\Configuration class does not produce any result.
Here is my code example:
/**
 * @throws \Throwable
 */
public function __invoke(): void
{
    $myStreamName = 'some_stream';
    $stream = $this->client->getApi()->getStream($myStreamName);
    $configuration = (new ConsumerConfiguration($myStreamName))
        ->setDeliverPolicy(DeliverPolicy::NEW)
        ->setSubjectFilter('some_subject');
    $ephemeralConsumer = $stream->createEphemeralConsumer($configuration);
    $ephemeralConsumer->handle(function ($some) {
        dump($some);
    });
}
In this example I expect the API request to NATS to look like $JS.API.CONSUMER.CREATE.some_stream.some_subject, but the request goes to $JS.API.CONSUMER.CREATE.some_stream only, and as a result I receive an authorization error.
after #68, the client logic is still off it seems
14:13:40 DEBUG [messenger] send HPUB test _INBOX.7748dd3c7af163d014b9648cd8389412 91 351
14:13:42 DEBUG [messenger] receive PING
14:13:42 DEBUG [messenger] send PONG
14:13:42 DEBUG [messenger] sleep
14:13:45 DEBUG [messenger] send PING
14:13:45 ERROR [messenger] Socket read timeout
I'm not able to reproduce it locally,
but generally I wonder why there's a PONG side effect.
Objective: I want to read items from JetStream and process them one by one using my handler.
Solution
Somewhere in my app i have this
$this->client->subscribe("mystream", "mysubject", function (array|null $headers, mixed $body, string $subject){
echo $body;
return true;
});
and this is the detail of the subscriber function.
public function subscribe(string $name, string $subject, callable $listener): void
{
    $stream = $this->client->getApi()->getStream($name);
    $consumer = $stream->getConsumer("myConsumer");
    $consumer->getConfiguration()->setSubjectFilter($name.$subject);
    if (!$consumer->exists()) {
        $consumer->create();
    }
    $queue = $consumer->getQueue();
    while (true) {
        try {
            $msgs = $queue->fetchAll(10);
            foreach ($msgs as $msg) {
                if ($msg !== null) {
                    $headers = $msg->payload->headers ?? array();
                    $body = $msg->payload->body;
                    $subject = $msg->subject;
                    $ret = $listener($headers, $body, $subject);
                    if ($ret) {
                        $msg->ack();
                    }
                }
            }
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }
}
On the NATS side I have been able to publish messages. This is what I get from nats-cli for mystream.mysubject:
bbdfca7cb16a:~# nats stream view
[server] ? Select a Stream mystream
[1] Subject: mysubject Received: 2024-06-30T16:05:56Z
hello world
[2] Subject: mysubject Received: 2024-06-30T16:20:25Z
hello world
[3] Subject: mysubject Received: 2024-06-30T19:09:33Z
How are you
[4] Subject: mysubject Received: 2024-06-30T19:13:26Z
How are you
[5] Subject: mysubject Received: 2024-06-30T19:13:30Z
Are you there
But fetchAll(10) always returns an empty array. Also note that I'm running it through a PHP script.
What is my mistake in setting up the consumer?
Fatal error: Constant expression contains invalid operations in /var/www/html/nats-jet/src/Client.php on line 38
public function __construct(
public Configuration $configuration = new Configuration(),
public ?LoggerInterface $logger = null
) {
$this->api = new Api($this);
}
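PHP versions before 8.1 report exactly this error for `new` in a parameter default, because "new in initializers" was only added in 8.1. A minimal sketch of a constructor that also works on older versions follows. The class names are stand-ins so the example runs on its own; this is not the library's actual code.

```php
<?php
// Stand-in classes so the sketch is self-contained; not the library's real ones.
class SketchConfiguration
{
}

class SketchClient
{
    public SketchConfiguration $configuration;

    public function __construct(?SketchConfiguration $configuration = null)
    {
        // Build the default at runtime instead of in the parameter list,
        // which avoids the "new in initializers" requirement of PHP 8.1.
        $this->configuration = $configuration ?? new SketchConfiguration();
    }
}

$client = new SketchClient();
var_dump($client->configuration instanceof SketchConfiguration);
```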
now is _INBOX.6c93540674db70dd9802fe1c516e3ba1
With slow consumers, NATS stops writing data to the socket. Moreover, NATS can stop writing in the middle of a message.
In this case the Client::process method tries to read the message 16 times and then throws an exception or reconnects to NATS.
When a reconnect is performed, Client::process tries to process the partly read message. This leads to deserialization errors if the message payload is serialized, or to errors that are even harder to detect if the payload is not serialized (e.g. a comma-separated list of values).
Also, before 9230e67 this behaviour of the library caused the "No handler for message xxxx" error after the client reconnected. That error no longer appears, but processing partly read messages is still pretty strange.
I prepared a PR to fix it, but I can't add tests for the changes because two processes are required to trigger "Slow consumer". Since the "No handler for message xxxx" error is no longer triggered, I decided to close my PR. It looks like a partial payload can be handled by the application itself, but I'm not sure that is a good approach; it seems really strange for the library to hand you a partly read message. I can restore my PR if you agree to merge it or can help me update it to avoid this issue.
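To illustrate the idea of never handing out a partly read message, here is a minimal sketch that only returns a message once the whole frame is in the buffer. It relies on the NATS text protocol framing, `MSG <subject> <sid> [reply-to] <#bytes>\r\n<payload>\r\n`. The function name and return shape are hypothetical, and this is not the code from the closed PR.

```php
<?php
// Hypothetical helper: cut one complete MSG frame out of $buffer.
// Returns null and leaves $buffer untouched while the frame is incomplete.
function tryExtractMsg(string &$buffer): ?array
{
    $headerEnd = strpos($buffer, "\r\n");
    if ($headerEnd === false) {
        return null; // control line not fully received yet
    }
    $parts = explode(' ', substr($buffer, 0, $headerEnd));
    if ($parts[0] !== 'MSG' || count($parts) < 4) {
        throw new UnexpectedValueException('Not a MSG control line');
    }
    // The last field of the control line is the payload size in bytes.
    $size = (int) end($parts);
    $frameLength = $headerEnd + 2 + $size + 2;
    if (strlen($buffer) < $frameLength) {
        return null; // payload (or its trailing CRLF) still missing
    }
    $payload = substr($buffer, $headerEnd + 2, $size);
    $buffer = (string) substr($buffer, $frameLength);
    return ['subject' => $parts[1], 'sid' => $parts[2], 'payload' => $payload];
}
```

With this shape, a reconnect can simply discard the buffer: an incomplete frame never reaches a handler.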
When using this library and trying to put an item into a KV, if the KV does not exist, this library will automatically create a KV store.
Example:
$client->getApi()->getBucket('my-bucket')->put($key, $value);
This will create a bucket that appears to be incompatible with other libraries (e.g. the Go SDK for NATS). The only solution is to remove the KV store and allow the Go SDK to create it.
related to #68
Now we see endless pinging after publish every now and then:
16:15:44 DEBUG [messenger] send CONNECT
16:15:44 DEBUG [messenger] send SUB handler.b6de490c bf07f53d
16:15:44 DEBUG [messenger] send PUB $JS.API.CONSUMER.MSG.NEXT.test.test handler.b6de490c 26
{"batch":1,"no_wait":true}
16:15:47 DEBUG [messenger] sleep
16:15:52 DEBUG [messenger] sleep
16:15:57 DEBUG [messenger] send PING
16:15:57 DEBUG [messenger] sleep
16:16:02 DEBUG [messenger] sleep
16:16:07 DEBUG [messenger] sleep
16:16:12 DEBUG [messenger] send PING
The code calls strtoupper() for KeyValue, why?
NATS can have buckets in lowercase, as well as the keys in the buckets.
KeyValue/Configuration.php line 33
Bucket.php lines 101 and 115
Hi,
I would like to add new options to the connection and reconnection setup to make it work more like the node.js lib. In the node lib you can set the server connection param to a host:port string or an array of host:port strings. If the array is used, the client cycles through it until it finds a server that it can connect to. There is also a parameter to shuffle the order of the host:port strings before attempting to connect. This should have the effect of load balancing connections.
While I'm at it, I will look into connecting to an alternative server when the connected server goes into lame duck mode. https://docs.nats.io/running-a-nats-service/nats_admin/lame_duck_mode
I should be able to implement the above while leaving the current usage intact. I see that the node lib retained its separate host and port params when it added the server param. I'm letting you know in case one of you is already working on this feature.
Regards
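The cycling behaviour described above can be sketched as follows. Everything here is hypothetical (`connectToFirstAvailable` and the injected `$connect` callable); the connector is passed in so the example does not need a real socket.

```php
<?php
// Hypothetical helper: try each "host:port" entry until $connect succeeds.
// $connect receives the server string and must throw if it cannot connect.
function connectToFirstAvailable(array $servers, callable $connect, bool $shuffle = false)
{
    if ($shuffle) {
        // Randomise the order so clients spread across the cluster.
        shuffle($servers);
    }
    $errors = [];
    foreach ($servers as $server) {
        try {
            return $connect($server);
        } catch (Throwable $e) {
            $errors[] = $server . ': ' . $e->getMessage();
        }
    }
    throw new RuntimeException('No server reachable (' . implode('; ', $errors) . ')');
}
```

Because the single host and port case is just a list with one entry, this shape would leave the current usage intact, as intended in the proposal.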
Are there any plans to make the project asynchronous using the library https://github.com/revoltphp/event-loop and Fiber?
Hi,
Sometimes we notice the following behavior in workers:
12:23:47 DEBUG [nats] send SUB handler.11949d0c c894bf04
12:23:47 DEBUG [nats] send PUB $JS.API.CONSUMER.MSG.NEXT.test.test handler.11949d0c 26
{"batch":1,"no_wait":true}
12:23:49 DEBUG [nats] receive PING
12:23:49 DEBUG [nats] send PONG
12:23:54 DEBUG [nats] sleep
12:23:59 DEBUG [nats] sleep
12:24:08 DEBUG [nats] send PING
12:24:08 DEBUG [nats] sleep
12:24:13 DEBUG [nats] sleep
12:24:18 DEBUG [nats] sleep
12:24:23 DEBUG [nats] send PING
12:24:23 DEBUG [nats] sleep
which goes on endlessly.
I'm trying to understand what's happening, and hoping for some clues here :)
When working on #62, I discovered there are some issues with implementing the consumer model as-is. Further, some "newer" features of NATS are not easily available (NAKs, ordered consumers, delayed messages, etc.), so it would be nice to have them.
I took a deep look at the Go implementation to get an idea of what it might look like in PHP. Go tends to be written in such a way that things feel synchronous even though they aren't. So it has the best chance of being a compatible model for PHP (for both traditional PHP and async PHP via fibers).
Taking inspiration from that, there are a few low-level types that this depends on:
// sent to NATS to describe how we want to consume from the stream
readonly class PullRequest { public function __construct(
public ?int $expiresSeconds,
public ?int $batchCount,
public ?int $maxBytes,
public bool $noWait,
public ?int $idleHeartbeat
) {} }
readonly class Metadata { public function __construct(
public int $consumerSequence,
public int $streamSequence,
public int $numDelivered,
public int $numPending,
public DateTimeImmutable $timestamp,
public string $streamName,
public string $consumerName,
public string $domainName,
) {} }
interface ConsumerMessage {
public function getMetadata(): Metadata;
public function getRawData(): string;
public function getHeaders(): array;
public function getSubject(): string;
public function getReplySubject(): string;
// acknowledge the message
public function ack(): void;
// ack the message and wait for ack reply from the server. Useful for scenarios where the message loss
// is unacceptable, despite the performance impact.
public function doubleAck(): void;
// tell the server to redeliver the message
public function nak(): void;
// tell the server to redeliver the message after a delay
public function nakWithDelay(float $millisecondsDelay): void;
// tell the server the message is still being worked on. This resets the server redelivery timeout.
public function inProgress(): void;
// tell the server to never redeliver the message
public function term(): void;
// tell the server why the message shouldn't be delivered which will be emitted as a server advisory.
public function termWithReason(string $reason): void;
}
interface MessagesContext extends \Iterator {
// gets the next message from the stream via foreach or manually. Blocks until there is a message
public function next(): ConsumerMessage;
// unsubscribe from the stream and immediately stops iteration. Messages may still be in the
// inbox and will be discarded.
public function stop(): void;
// unsubscribe from the stream but any messages still in the buffers/inbox will continue to be
// consumed until they are gone.
public function drain(): void;
}
interface ConsumerContext {
// stops consumer and any pending messages will be discarded
public function stop(): void;
// stops the consumer but continues to consume pending messages
public function drain(): void;
}
interface MessageBatch {
// @return Generator<ConsumerMessage>
public function getMessages(): \Generator;
}
Here's the interface inspired by the Go consumer:
interface Consumer {
// use to receive up to a $batchCount of messages from a stream or $maxWait seconds pass,
// whichever is sooner. Note that $idleHeartbeat is 5s by default (for $maxWaits longer than 10s, or
// disabled for shorter waits) and if the client hasn't received a heartbeat in 2x $idleHeartbeat, then
// an exception should be thrown. This method is non-blocking, but returns a MessageBatch that can
// be iterated on.
public function fetch(int $batchCount, int $maxWait = 30, int $idleHeartbeat = 5): MessageBatch;
// exactly the same as fetch(), but counts by bytes instead of the number of messages.
public function fetchBytes(int $maxBytes, int $maxWait = 30, int $idleHeartbeat = 5): MessageBatch;
// exactly like fetch(), but if there are no messages available in the stream, then the generator
// will return immediately, regardless of the number of messages requested.
public function fetchNoWait(int $batchCount): MessageBatch;
// continuously consumes from a stream using the provided consumer function. The callback can
// accept up to two arguments: fn(ConsumerMessage $message, ConsumerContext $context)
public function consume(\Closure $consumer): ConsumerContext;
// Allow continuously iterating over a stream.
public function Messages(): MessagesContext;
// receive the next message from the stream. Note that this is a blocking call.
public function next(int $maxWait = 30, int $idleHeartbeat = 5): ConsumerMessage;
// get the current consumer configuration from the stream
public function info(): Configuration;
}
What are your thoughts?
Internally, the library has some rough code where it works with the NATS socket.
There is something like "bind a handler to each message", and these callbacks could exhaust the memory limit of any web server.
Usually, in any application, a worker runs ITS OWN while (true) with configurable limits and timing.
The (Stream)->handle() method does that, but there is no way to "ask for some messages" without binding a handler. Even if the handler is a one-liner such as
$consumer->handle(function ($message) use (&$messages) { $messages[] = $message; });
it's still a handler that needs memory to store the \Closure object.
I mean that it is IMPOSSIBLE to wrap the library's while (true) in your own while (true). That is why you need options like "setBatching()" and "setDelay"; these are not NATS concerns (though of course they are supported for convenience and for debugging/demonstration), they are business and server configuration.
For better performance, listeners usually use the Observer pattern internally and work like this:
$eventBus = new EventBus();
$eventBus->on($subject1, $handler11);
$eventBus->on($subject1, $handler12);
$eventBus->on($subject2, $handler21);
$natsEventStore = new NatsEventStore($natsClient, $stream, $consumer);
$natsEventStore->listen($eventBus);
Instead of
$consumer->handle(function ($message) use ($eventBus) { $eventBus->fire($message->subject); });
That is exactly because the handle method, with other settings, could provide a COLLECTION of messages, which then needs a foreach.
The eventBus can also call handlers in parallel internally.
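For readers unfamiliar with the pattern, a minimal EventBus matching the on()/fire() calls above could look like the sketch below. It is purely illustrative: the class is hypothetical, it dispatches sequentially rather than in parallel, and it is not tied to NATS in any way.

```php
<?php
// Minimal, hypothetical EventBus matching the on()/fire() calls used above.
final class EventBus
{
    /** @var array<string, callable[]> handlers grouped by subject */
    private array $handlers = [];

    public function on(string $subject, callable $handler): void
    {
        $this->handlers[$subject][] = $handler;
    }

    /** Calls every handler registered for $subject and returns how many ran. */
    public function fire(string $subject, $payload = null): int
    {
        $handlers = $this->handlers[$subject] ?? [];
        foreach ($handlers as $handler) {
            $handler($payload);
        }
        return count($handlers);
    }
}
```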
====
NATS is very fast. Using a lot of memory for processing kills the benefits of NATS.
I found this issue while trying to parallelize connections with pcntl_fork(): the garbage collector starts deleting so many callbacks from memory that I get the error message "no handler for message", since we CANNOT get a message without a handler in this library.
Hi,
Can we watch a KV bucket with this lib?
Thanks
BR
Tapan Thapa
In an event-based system we have two primary flows of work: events and commands.
Events usually work like radio (live only) or YouTube (live plus stored): whoever wants them receives them; if nobody listens, nothing changes.
Commands usually work like a queue of messages: at least one worker must execute the command it received from the sender, and if the command fails it is retried a few times until the retry timeout expires.
Commands NEED to "ack" the message once it has been handled, whether it succeeded or failed, but should "deny" it if an exception happened. How do I "ack" a message in your library? I guess it always acks, or something like that? (JetStream, for example, only has the ->handle() method, which expects a messageHandler/emptyHandler, but the payload is an instance of Payload::class, which doesn't have an ->ack() method.)
Also, AckPolicy::Explicit is set by default. But I guess the message is acked automatically (inside the basis-company/nats library code), because auto-ack on the NATS side requires a different mode to be enabled.
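The "ack when handled, deny on exception" rule can be expressed as a small wrapper around the business handler. The sketch below assumes a message object exposing ack() and nak(); `AckableMessage` is a hypothetical contract for illustration, and the library's real message class may look different.

```php
<?php
// Hypothetical message contract; the library's real message class may differ.
interface AckableMessage
{
    public function ack(): void;
    public function nak(): void;
}

// Wrap a business handler so that a normal return acks the message
// and any exception naks it, asking the server to redeliver.
function withAcknowledgement(callable $handler): Closure
{
    return function (AckableMessage $message) use ($handler): void {
        try {
            $handler($message);
            $message->ack();
        } catch (Throwable $e) {
            $message->nak();
        }
    };
}
```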
The current implementation does not support ephemeral consumers for JetStream. They can be useful for irregular tasks, when you need to get some data from a stream for metrics calculation and don't want to persist consumers in NATS. Of course you can create durable consumers and then remove them at the end of the task, but ephemeral consumers look like a more suitable concept.
I created a PR with a simple implementation but have some issues with the checks.
I hope somebody will help me with the checks, and I'm open to change requests for this feature.
public function publish(string $name, mixed $payload, ?string $replyTo = null): self
{
    return $this->send(new Publish([
        'payload' => Payload::parse($payload),
        'replyTo' => $replyTo,
        'subject' => $name,
    ]));
}
public static function parse(mixed $data): self
{
    if ($data instanceof self) {
        return $data;
    }
    if (is_string($data)) {
        return new self($data);
    }
    if (is_array($data)) {
        return new self(json_encode($data));
    }
    return new self("");
}
Causes nil bodies to be published to NATS if data type is not string or array
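One possible direction, sketched here as a standalone function rather than a patch to Payload::parse, is to encode scalars explicitly and to throw on anything that cannot be represented, so that an unsupported value never turns into an empty body silently. `encodePayload` is a hypothetical name, and the choice of how to render booleans is an assumption.

```php
<?php
// Hypothetical encoder: turn supported values into a string body and fail
// loudly on everything else instead of silently publishing "".
function encodePayload($data): string
{
    if (is_string($data)) {
        return $data;
    }
    if (is_array($data) || $data instanceof JsonSerializable) {
        return json_encode($data, JSON_THROW_ON_ERROR);
    }
    if (is_int($data) || is_float($data)) {
        return (string) $data;
    }
    if (is_bool($data)) {
        return $data ? 'true' : 'false';
    }
    if ($data === null) {
        return ''; // an explicitly empty body is still allowed
    }
    throw new InvalidArgumentException('Unsupported payload type: ' . gettype($data));
}
```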
Took the keys from the examples
server:
authorization: {
users: [
{ nkey: UDXU4RCSJNZOIQHZNWXHXORDPRTGNJAHAHFRGZNEEJCPQTT2M7NLCNF4 }
]
}
client:
$c = new NatsClient(new Configuration([
'host' => 'nats',
'nkey' => 'SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY'
]));
$c->ping(); //Authorization Violation
I tried generating my own keys, but the problem persists.
Hi, thank you for the repo, you are doing a very good thing.
https://github.com/basis-company/nats.php/blob/main/src/Message/Msg.php
I cannot find headers in the Msg class. How do I send headers?
Good afternoon.
When using a NATS version higher than 2.9.1, I get the error "Invalid property key for message Basis\Nats\Message\Info". There are no authorization settings or anything else on the NATS cluster, and I am using the default connection configuration. I do not know how to solve this problem.
Is there currently a way to send messages with headers to a JetStream stream? I peeked around in the code but could not find it.
Thank you!
In our project we will have PHP producers/consumers and Go consumers.
We also need a broker with event sourcing support.
As far as I understand, NATS JetStream (JS) supports this pattern.
On the Go side we will use nats.go with the relatively new JS client,
and I am sure that all JS functionality is supported by the Go lib.
I understand that nats.php uses the JetStream wire API, but it is still not clear which parts of the API are actually supported.
Do you have stories/cases/links about nats.php usage in production?
What is the intended method for implementing a production-ready RPC server using this lib?
Is it possible to bootstrap some loop (using roadrunner?) with good stability and proper connection-outage handling?
Other libs make use of language-specific features, so PHP way is kinda tricky
https://github.com/nats-io/nats.py/blob/main/examples/service.py#L59
Is there any chance that this package could be made compatible with PHP 7? We'd be open to contribute to any required changes if need be - but would first want to know if you'd be open to support older PHP versions.
We're looking at introducing NATS in our environment, but would like to be able to integrate it with our legacy systems which are running PHP 7.2.
There's an edge case error when the connection is dropped:
Cannot assign null to property Basis\Nats\Connection::$infoMessage of type Basis\Nats\Message\Info
/app/vendor/basis-company/nats/src/Connection.php:202
/app/vendor/basis-company/nats/src/Connection.php:127
/app/vendor/basis-company/nats/src/Client.php:207
/app/vendor/basis-company/nats/src/Client.php:127
/app/vendor/basis-company/nats/src/Consumer/Consumer.php:105
But the API expects a non-nullable value, and I'm wondering if it should be refactored a bit :)
Hello, are there any plans to implement this API?
Are there any architectural limitations for the library to implement this API?
https://github.com/nats-io/nats.go/blob/main/object.go#L52C6-L52C17
https://natsbyexample.com/examples/os/intro/deno
I'm trying to use this library to connect to NGS with TLS, a JWT, and a NKEY... I'm getting the following errors when connecting using the following configuration:
$config = new Configuration([
    'host' => 'tls://connect.ngs.global:4222',
    'nkey' => 'mynkey',
    'jwt' => 'myjwt',
]);
The specific error message, Connection error, is coming from Client->connect() on line 91. https://github.com/basis-company/nats.php/blob/main/src/Client.php#L91
Hi,
I'm trying the basic publish/subscribe example and get:
Fatal error: Uncaught Error: Call to undefined method Basis\Nats\Message\Msg::getPayload()
And indeed; Basis\Nats\Message\Msg doesn't seem to have a getPayload method?
Hello,
I'm setting up an SNS/SQS-like fan-out queuing system with NATS.
I set up my stream with its subject and two durable pull consumers.
My publisher is working and is pushing messages into my stream. This part is ok.
My 2 subscriber scripts connect to each consumer and get the messages of the stream when they start. This part is ok too.
// subscriber 1
$stream = $client
->getApi()
->getStream('mystream');
$consumer = $stream->getConsumer('consumer1');
$consumer->handle(function ($message) {
printf("Data: %s\r\n", $message);
});
The problem is: after receiving messages at start, the subscribers do not receive new messages pushed live by the publisher. I need to restart them for them to receive the new messages. I want them to listen forever.
I had a look at iterations, but without success... What am I missing?
Thanks for your help