chartchuo / dart-nats Goto Github PK
View Code? Open in Web Editor NEWA Dart client for the NATS messaging system. Design to use with Dart and Flutter.
License: MIT License
A Dart client for the NATS messaging system. Design to use with Dart and Flutter.
License: MIT License
สวัสดีครับ ผมตามมาจาก youtube ละครับ
ผมลอง v 0.1.6 แล้วผล คือ client สามารับ sub message ได้ครบถ้วนจนถึง massage สุดท้ายเรียบร้อย ไม่ delay แล้ว แต่ผมยังเจอปัญหาอยู่เมื่อ client ได้รับ sub message ติดๆกันหรือพร้อมๆกัน เมื่อ log ดู ใน file client.dart สามารถรับ op case 'msg' ได้ครบถ้วนทุก event และเข้า _processMsg() ทุก event แต่เวลาที่จะส่ง payload เข้า _subs[sid].add (Message... มันเหมือน save ทับ message กันครับ ส่งผลให้ได้ msg เพียงอันสุดท้ายที่ส่งต่อไป snapshot.data
ผมคิดว่าน่าจะแปลง <nats.Message> เป็น list ของ messages น่าจะแก้ปัญหานี้ได้ แล้วเมื่อเวลาจะใช้ snapshot.data ผมก็จะใช้ท่า forEach((data) {}) เอาแต่ละ message ไปใช้งานได้
แต่...ผมไม่มีความสามารถแก้ไขได้สำเร็จ คงต้องรบกวนท่านอีกครั้งแล้วหล่ะครับ ><"
Great project, thanks!
Is it possible to use manual ack?
https://docs.nats.io/developing-with-nats-streaming/acks
is there any example if we use MobX with Observable instead of streamBuilder?
I am updating this thread because I cannot listen anything when subscribing my nats server. it not works. I have following your sample code, and success to connect. Honestly, the message whom published from server did not appear..
Hi,
migrating from 2.0 to 3.0, I am unable to log with user/password.
flutter snippet :
var natsClient = Nats.Client();
var connectOptions = Nats.ConnectOption(
user: natsUser,
name: natsUser,
pass: natsPassword,
tlsRequired: false,
);
Uri uri = Uri.parse("ws://$natsURL:4222");
natsClient.connect(uri, connectOption: connectOptions, retryInterval: 5, timeout: 10, retry: true);
Where natsUser
is actually equal to "user".
leads to failure to connect. On the server side :
[9] 2021/06/23 11:24:17.737836 [DBG] 10.2.1.90:36448 - cid:616 - Client connection created
[9] 2021/06/23 11:24:17.905280 [ERR] 10.2.1.90:36448 - cid:616 - authentication error - User ""
[9] 2021/06/23 11:24:17.905745 [TRC] 10.2.1.90:36448 - cid:616 - ->> [-ERR Authorization Violation]
[9] 2021/06/23 11:24:17.905969 [DBG] 10.2.1.90:36448 - cid:616 - Client connection closed: Authentication Failure
With 2.0, I could log in without problem.
Thank you !
I have an error Unsupported operation: Platform._version, when connecting to wss://{domain}:{port}
with my Chrome browser.
I try WebSocket change to
import 'package:web_socket_channel/web_socket_channel.dart';
WebSocketChannel.connect(uri);
The connection will be connected.
WebSocket from 'dart:html' also works well for the web.
repoduce step
var client = Client();
var gotit = false;
client.statusStream.listen(
(s) {
print(s);
},
onError: (e) {
gotit = true;
},
);
try {
await client.connect(Uri.parse('ws://localhost:1234'),
retry: false, retryInterval: 1);
} on NatsException {
gotit = true;
} on WebSocketChannelException {
gotit = true;
} catch (e) {
gotit = true;
}
Good job! It works on Android and IOS. Does this work with Flutter Web ?
Hopefully It supports Flutter Web. Thanks
test('repeat resquest', () async {
var server = Client();
await server.connect('localhost');
var service = server.sub('service');
unawaited(service.stream.first.then((m) {
m.respond(Uint8List.fromList('respond'.codeUnits));
}));
var client = Client();
await client.connect('localhost');
var receive = await client.request(
'service', Uint8List.fromList('request'.codeUnits));
receive = await client.request(
'service', Uint8List.fromList('request'.codeUnits));
// receive = await client.request(
// 'service', Uint8List.fromList('request'.codeUnits));
// receive = await client.request(
// 'service', Uint8List.fromList('request'.codeUnits));
client.close();
service.close();
expect(receive.string, equals('respond'));
});
ผมเจอเคสที่ publish ไปแล้วไม่รู้เลยว่า subscribe ได้รับ msg หรือเปล่า ผมจะเอา [x] - Request, Respond ตัวนี้มาใช้ได้ไหมครับ ถ้าได้รบกวนท่านอาจารย์ด้วยนะครับ
สำหรับข้อนี้ยังไม่ใช่ปัญหา แต่เพื่อป้องกันผู้อื่นมิให้ แอบ sub ที่อาจล่วงรู้ ip sub และ channel ของระบบ แล้วแอบ TAP ข้อมูลที่สื่อสารถึงกันได้ ขอคำชี้แนะท่านอาจารย์ว่าควรจะป้องกันอย่างไรดี เท่าที่คิดท่าได้ คือ เข้ารหัสข้อมูลก่อนส่ง และเมื่อส่งถึงก็ถอดรหัสข้อมูลแล้วนำไปใช้ แบบนี้ก็พอแกะยากหน่อย หรือว่าระบบ micro service นี้เขามีระบบป้องกันอยู่แล้วหรือเปล่าครับ
Hi,
when I try to auth via JWT, then I don't get any exception, how can I get to know it worked?
When connected (which always happens, even with clearly wrong creds), then I will request something but, of course, not get a response.
How can I get any insight here?
We're looking into implementing microservices with our Nats client
https://github.com/nats-io/nats.go/blob/main/micro/README.md
Is there any support of it in Dart and Flutter?
I was having problems with subscription messages with larger payloads (like 20000 bytes) not working properly.
It looks like the code assumes that if a CR is present in the buffer, then there is a full protocol message in the buffer ... but for larger payloads on the MSG type message, the full payload can take longer to arrive.
I am using the code below to get around this, seems to work ok. Basically it does a check to see if the buffer contains a MSG, and if so plucks out the length of the message, then checks to see if the buffer contains the whole payload ... and only calls the processOp() if the payload is complete ... otherwise it breaks out of the while() loop
Line 118 in 3bfd992
while (_receiveState == _ReceiveState.idle && _buffer.contains(13)) {
var n13 = _buffer.indexOf(13);
var msgFull = String.fromCharCodes(_buffer.take(n13)).toLowerCase().trim();
var msgList = msgFull.split(' ');
var msgType = msgList[0];
//print('... process $msgType ${_buffer.length}');
if (msgType == 'msg') {
var len = int.parse((msgList.length == 4 ? msgList[3] : msgList[4]));
if (len > 0 && _buffer.length < (msgFull.length + len + 4)) {
break; // not a full payload, go around again
}
}
_processOp();
}
for a larger MSG payload, the commented out print statement produces ...
... process msg 17
... process msg 4113
... process msg 8209
... process msg 12305
... process msg 16401
... process msg 20497
... process msg 21079
While reading the documents it says that Jetstream is built in yet I don't see any mechanic that other libs have IE jetstream() jetstreamManager(). Did I miss something?
If a request response contains headers they are not added to the Message that is passed back to the caller.
The fix is simple : client.dart lines 795-795. Simple add the header in the message constructor
var msg = Message(resp.subject, resp.sid, resp.byte, this,
header: resp.header, jsonDecoder: jsonDecoder);
return msg;
The following code, using version 0.6.1, does not receive any incoming messages on the subscription:
try {
Uri uri = Uri.parse(fullUri);
natsClient?.statusStream.listen((Status event) {
debugPrint('Connection status event $event');
currentStatus = event;
String stateString = '';
});
await natsClient?.connect(uri, retry: true, retryCount: -1);
var sub = natsClient?.sub('mytest.*');
sub?.stream.listen((event) {
debugPrint(event.string);
});
} on HttpException {
debugPrint('Failed to connect!');
} on Exception {
debugPrint('Failed to connect!');
} catch (_) {
debugPrint('Failed to connect!');
}
The connection state appears to work fine and the retry works if connection is lost. But, no incoming messages are received.
However, if I change the connection line to: await natsClient?.connect(uri, retry: false)
, everything works fine and I receive messages. I'm using a plain NATS TCP connection (ie. nats://localhost:4222
).
Am I doing something wrong here, or is this a bug with the retry options somehow?
When connect to nats://demo.nats.io
in macOS and mobile, I have this error
[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: RangeError (start): Invalid value: Only valid value is 0: 2
#0 RangeError.checkValidRange (dart:core/errors.dart:358:7)
#1 _TypedIntListMixin.sublist (dart:typed_data-patch/typed_data_patch.dart:463:31)
#2 Client._sign (package:nats_test/src/client.dart:118:21)
#3 Client._processOp (package:nats_test/src/client.dart:414:15)
#4 Client._steamHandle.<anonymous closure> (package:nats_test/src/client.dart:158:9)
#5 _RootZone.runUnaryGuarded (dart:async/zone.dart:1586:10)
#6 _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:11)
#7 _DelayedData.perform (dart:async/stream_impl.dart:515:14)
#8 _PendingEvents.handleNext (dart:async/stream_impl.dart:620:11)
#9 _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:591:7)
#10 _microtaskLoop (dart:async/schedule_microtask.dart:40:21)
#11 _startMicrotaskLoop (dart:async/schedule_microtask.dart:49:5)
After that, I try to comment on this code
await _sign();
The connection will be connected
กำลังทดสอบ web อยู่เลยครับ
ขอขอบพระคุณล่วงหน้าเป็นอย่างสูงครับ
Thanks for this project.
wondering if this has been developed against NATS 2.0 and Jetstream ?
i have not actually tried yet
When there are errors encountered when connecting to a ws address. The connect goes through successfully and there's no way to hook a handler for when the code into the onError section in the connect implementation below:
try {
_channel = WebSocketChannel.connect(uri);
status = Status.connected;
_connectCompleter.complete();
_addConnectOption(_connectOption);
_backendSubscriptAll();
_flushPubBuffer();
_buffer = [];
_channel!.stream.listen((d) {
// _socket!.listen((d) {
_buffer.addAll(d);
while (
_receiveState == _ReceiveState.idle && _buffer.contains(13)) {
_processOp();
}
}, onDone: () {
status = Status.disconnected;
_channel!.sink.close();
// _socket!.close();
}, onError: (err) {
status = Status.disconnected;
_channel!.sink.close();
// _socket!.close();
});
return;
} catch (err) {
close();
_connectCompleter.completeError(err);
}
Can be easily reproduced using the example code
natsClient = nats.Client();
natsClient.connect(Uri.parse('ws://localhost:1234');
fooSub = natsClient.sub('foo');
barSub = natsClient.sub('bar');
You don't get an error even though 1234 is an invalid port and it goes all the way past the sub without getting any indication that something has gone wrong
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.