
rxjs-websockets's Introduction

rxjs-websockets


An rxjs websocket library with a simple and flexible implementation. Supports the browser and node.js.

Comparisons to other rxjs websocket libraries:

  • observable-socket
    • observable-socket provides an input subject for the user. rxjs-websockets lets the user supply the input stream as a parameter, so they can select an observable with semantics appropriate for their own use case (queueing-subject can be used to achieve the same semantics as observable-socket).
    • With observable-socket the WebSocket object must be used and managed by the user. rxjs-websockets manages the WebSocket(s) for the user lazily, according to subscriptions to the messages observable.
    • With observable-socket the WebSocket object must be observed using plain old events to detect the connection status. rxjs-websockets presents the connection status through observables.
  • rxjs built-in websocket subject
    • Implemented as a Subject so lacks the flexibility that rxjs-websockets and observable-socket provide.
    • Does not provide any ability to monitor the web socket connection state.

Installation

npm install -S rxjs-websockets
# or
yarn add rxjs-websockets

For rxjs 6 support, rxjs-websockets 8 is needed.

npm install -S rxjs-websockets@8
# or
yarn add rxjs-websockets@8

Changelog

Changelog here

Simple usage

import { QueueingSubject } from 'queueing-subject'
import { Observable, Subscription } from 'rxjs'
import { share, switchMap } from 'rxjs/operators'
import makeWebSocketObservable, {
  GetWebSocketResponses,
  // WebSocketPayload = string | ArrayBuffer | Blob
  WebSocketPayload,
  normalClosureMessage,
} from 'rxjs-websockets'

// this subject queues as necessary to ensure every message is delivered
const input$ = new QueueingSubject<string>()

// queue up a request to be sent when the websocket connects
input$.next('some data')

// create the websocket observable, does *not* open the websocket connection
const socket$ = makeWebSocketObservable('ws://localhost/websocket-path')

const messages$: Observable<WebSocketPayload> = socket$.pipe(
  // the observable produces a value once the websocket has been opened
  switchMap((getResponses: GetWebSocketResponses) => {
    console.log('websocket opened')
    return getResponses(input$)
  }),
  share(),
)

const messagesSubscription: Subscription = messages$.subscribe({
  next: (message: WebSocketPayload) => {
    console.log('received message:', message)
    // respond to server
    input$.next('i got your message')
  },
  error: (error: Error) => {
    const { message } = error
    if (message === normalClosureMessage) {
      console.log('server closed the websocket connection normally')
    } else {
      console.log('socket was disconnected due to error:', message)
    }
  },
  complete: () => {
    // The clean termination only happens in response to the last
    // subscription to the observable being unsubscribed, any
    // other closure is considered an error.
    console.log('the connection was closed in response to the user')
  },
})

function closeWebsocket() {
  // this also causes the websocket connection to be closed
  messagesSubscription.unsubscribe()
}

setTimeout(closeWebsocket, 2000)

The observable returned by makeWebSocketObservable is cold, which means the websocket connection is attempted lazily as subscriptions are made to it. Advanced users of this library will find it important to understand the distinction between hot and cold observables; for most, it will be sufficient to use the share operator as shown in the example above. The share operator ensures at most one websocket connection is attempted regardless of the number of subscriptions to the observable, while ensuring the socket is closed when the last subscription is unsubscribed. When only one subscription is made, the operator has no effect.

By default the websocket supports binary messages, so the payload type is string | ArrayBuffer | Blob. When you only need string messages, the generic parameter to makeWebSocketObservable can be used:

const socket$ = makeWebSocketObservable<string>('ws://localhost/websocket-path')
const input$ = new QueueingSubject<string>()

const messages$: Observable<string> = socket$.pipe(
  switchMap((getResponses: GetWebSocketResponses<string>) => getResponses(input$)),
  share(),
)
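
When the default payload type is kept, a subscriber has to narrow the union before using a message. The following is a minimal sketch, assuming a runtime where `Blob` and `TextEncoder` are available as globals (browsers and recent Node.js versions). `payloadByteLength` is a hypothetical helper for illustration, not part of rxjs-websockets:

```typescript
// Mirrors the payload union documented above; redeclared here so the
// sketch does not depend on the library being installed.
type WebSocketPayload = string | ArrayBuffer | Blob

// Hypothetical helper: reports the size in bytes of any payload variant.
function payloadByteLength(payload: WebSocketPayload): number {
  if (typeof payload === 'string') {
    // Strings are measured by their UTF-8 encoding.
    return new TextEncoder().encode(payload).length
  }
  if (payload instanceof ArrayBuffer) {
    return payload.byteLength
  }
  // The only remaining variant is Blob.
  return payload.size
}
```

The same narrowing could be used inside a `next` handler to route text and binary messages to different code paths.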

Reconnecting on unexpected connection closures

This can be done with the built-in rxjs operator retryWhen:

import { Subject } from 'rxjs'
import { delay, switchMap, retryWhen } from 'rxjs/operators'
import makeWebSocketObservable from 'rxjs-websockets'

const input$ = new Subject<string>()

const socket$ = makeWebSocketObservable('ws://localhost/websocket-path')

const messages$ = socket$.pipe(
  switchMap((getResponses) => getResponses(input$)),
  retryWhen((errors) => errors.pipe(delay(1000))),
)

Alternate WebSocket implementations

A custom websocket factory function can be supplied that takes a URL and returns an object that is compatible with WebSocket:

import makeWebSocketObservable, { WebSocketOptions } from 'rxjs-websockets'

const options: WebSocketOptions = {
  // this is used to create the websocket compatible object,
  // the default is shown here
  makeWebSocket: (url: string, protocols?: string | string[]) => new WebSocket(url, protocols),

  // optional argument, passed to `makeWebSocket`
  // protocols: '...',
}

const socket$ = makeWebSocketObservable('ws://127.0.0.1:4201/ws', options)

JSON messages and responses

This example shows how to use the map operator to handle JSON encoding of outgoing messages and parsing of responses:

import { Observable } from 'rxjs'
import { map } from 'rxjs/operators'
import makeWebSocketObservable, { WebSocketOptions, GetWebSocketResponses } from 'rxjs-websockets'

function makeJsonWebSocketObservable(
  url: string,
  options?: WebSocketOptions,
): Observable<unknown> {
  const socket$ = makeWebSocketObservable<string>(url, options)
  return socket$.pipe(
    map(
      (getResponses: GetWebSocketResponses<string>) => (input$: Observable<object>) =>
        getResponses(input$.pipe(map((request) => JSON.stringify(request)))).pipe(
          map((response) => JSON.parse(response)),
        ),
    ),
  )
}

The function above can be used identically to makeWebSocketObservable, except that the requests and responses will be transparently encoded and decoded.

rxjs-websockets's People

Contributors

cpu1, cristianp6, greenkeeper[bot], ifwu, illusionalsagacity, insidewhy, nichita-pasecinic, nikolasleblanc, renovate-bot, renovate[bot], richardivan, simonalbrecht, zhuangya

rxjs-websockets's Issues

Add authentication header

Hello,

How can I add authentication header to my websocket?
The websocket URL is ws://user:password@localhost:8080, but no authentication header arrives at the server.

This is my connection:

      this.messages = websocketConnect(
        url,
        this.inputStream = new QueueingSubject<any>(),
        url => new WebSocket(url.toString()),
        false
      ).messages.share();
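
The browser `WebSocket` constructor accepts only a URL and an optional protocols argument, so it offers no way to set a custom header. In Node.js, a client such as the `ws` package accepts a `headers` option, which could be passed from a custom websocket factory. The following is a minimal sketch of the first step only: pulling the credentials out of the URL and building a Basic `Authorization` value. `splitCredentials` is a hypothetical helper, `btoa` is assumed to be available as a global, and the credentials are assumed to be ASCII:

```typescript
interface SplitUrl {
  // The URL with the credentials removed.
  url: string
  // Basic header value, present only when the URL carried credentials.
  authorization?: string
}

// Hypothetical helper: separates "user:password@" from a websocket URL.
function splitCredentials(rawUrl: string): SplitUrl {
  const parsed = new URL(rawUrl)
  if (!parsed.username && !parsed.password) {
    return { url: parsed.href }
  }
  // URL components are percent-encoded, so decode before base64-encoding.
  const user = decodeURIComponent(parsed.username)
  const password = decodeURIComponent(parsed.password)
  parsed.username = ''
  parsed.password = ''
  return {
    url: parsed.href,
    authorization: `Basic ${btoa(`${user}:${password}`)}`,
  }
}
```

Whether the server accepts Basic authentication on the upgrade request is a separate assumption that depends on the server.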

Please Include License File

Hi, it looks like you have the license set to ISC, but since there is no license file the copyright and license attribution would not apply.

Argument of type 'QueueingSubject<{}>' is not assignable to parameter of type 'Observable<any>'

With rxjs-websockets 1.1.0, queueing-subject 0.1.1 & typescript 2.4.2 I have the following code:

import Rx from "rxjs/Rx";
import { QueueingSubject } from "queueing-subject";
import websocketConnect, {Connection} from "rxjs-websockets";

const queuingSubject = new QueueingSubject();

// ...

const connection: Connection =  websocketConnect(
    myUrl,
    queuingSubject,
    myWebSocket
);

After upgrading from rxjs 5.4.2 to 5.4.3 I get the following type error in the webSocketConnect call:

error TS2345: Argument of type 'QueueingSubject<{}>' is not assignable to parameter of type 'Observable<any>'.  
Property 'source' is protected but type 'Observable<T>' is not a class derived from 'Observable<T>'.

I cleaned the cache (I'm using yarn), deleted node_modules and reinstalled but still get this error....

Support WebSocket's protocol parameter

Hello,

Thank you for the great work. I need help with documentation on how to define the websocket protocol parameter, like this one:

var exampleSocket = new WebSocket("ws://www.example.com/socketserver", "protocolOne");

Where should I declare that?

Please help, thank you.

OnClose event trigger

Hello,
I'm trying to implement auto reconnect with your angular 4 example but I can't find a way to plug the on close event to this block:

this.messages = websocketConnect(
  'ws://127.0.0.1:4201/ws',
  this.inputStream = new QueueingSubject()
).messages.share()

Have you an example of implementation for autoreconnect or close detection?

Thank you for your time

Socket disconnection status not propagating to parent component

I have been using rxjs-websockets v8.0.1 in react (typescript) as a separate class called Socket. And this Socket class is being used in multiple components. Following is my way to subscribe to connection status. 2 is the default (when component initializes), 1 is connected and 0 is disconnected.

inputStream$: QueueingSubject<string> = new QueueingSubject<string>();
messages$!: Observable<string>;
connectionStatus$: BehaviorSubject<number> = new BehaviorSubject(2);
messagesSubscription: Subscription | null = null;

connect(url: string) {
        
    const socket$ = makeWebSocketObservable(url);
	
	this.messages$ = socket$.pipe(
		  switchMap((getResponses: GetWebSocketResponses<string>) => {
			this.connectionStatus$.next(1);
			return getResponses(this.inputStream$);
		  }),
		  retryWhen(errors => {
			console.log("disconnected - retryWhen block");
			this.connectionStatus$.next(0);
			return errors.pipe(
					filter(err => !equals(err.message, normalClosureMessage)),
					delay(1000));
		  }),
		  share(),
	  );
	  
	  
	  this.messagesSubscription = this.messages$!.subscribe(
		  _ => {
			return;
		  },
		  (error: Error) => {
			const { message } = error;
			  this.connectionStatus$.next(0);
			  console.log('socket was disconnected due to error:', message);        
		  },
		  () => {
			// The clean termination only happens in response to the last
			// subscription to the observable being unsubscribed, any
			// other closure is considered an error.
			this.connectionStatus$.next(0);
			console.log('the connection was closed in response to the user')
		  },
	  )
  }
  

The parent component subscribes to connectionStatus$ and behaves accordingly. For testing, I close the WebSocket connection from the server by restarting the server. But after connecting, I see the "disconnected - retryWhen block" log only for the first disconnection event, and only then does the parent component get the disconnection event. On later disconnections I don't see that log, and the parent component does not get the disconnection event/status.

Also, when I deliberately turn off network adapter, I see the WebSocket connection gets closed and retries get started which keeps failing with the below logs.

glue.js:30 WebSocket connection to 'wss://mydomain.com/glue/ws' failed: Error in connection establishment: net::ERR_NAME_NOT_RESOLVED
o.open @ glue.js:30
(anonymous) @ glue.js:148
d @ raven.min.js:2
raven.js:80 ["the websocket closed the connection with an error."]

Note that the socket does get reconnected in every case, whether I restart the server or toggle the wifi adapter, so reconnection is not the issue. The issue is that not every disconnection event is identified and propagated. Also, after reconnecting, I get the event in the parent component, which I think is due to the first message response (see switchMap in the above code).

Maybe I am missing something here. But what is the correct way to subscribe to connection state change?

Thanks

online/offline state

Do you have an example on how to get the online/offline state of your websocket server, through this api?
I want to know, if the websocket server is offline and notify the end-user.

Angular 6 service

Does anyone have a mock that could help me understand how to encapsulate the library as an Angular service global to the app?
Thanks and thank you James for doing this library

7.0+ API difficult to reason about

Hi James,

This is not a bug; this is just the only avenue I could find to give feedback. Back in 2017 my team started using your angular2-websocket-service library at the point where it was still a service. This week I started upgrading from Angular 4 to 8, and needed to move up to rxjs-websockets because the version of RxJS included in angular2-websocket-service was not compatible with the version of RxJS I had rolled up to in my project. Looking at your latest readme.md, I've barely been able to understand the new API you've implemented in version 7.0 without a lot of head scratching. I've looked through the source code of the library and I find the internals equally difficult to reason about now, compared to what they were two years ago. I've looked through the commits to try to understand when the changes occurred and why they were made.

Specifically the change away from taking an input observable in the connect method, in the January 7th commit - eaf8fc4 - has a commit message to the effect of "simplify connection status handling", but I think it doesn't simplify the library as a whole. It was easier to reason about the system when one was passing in a stream of input. Now the input is somehow supplied through something we subscribe to the output. I get that we are receiving a factory as output when we subscribe, but it's still convoluted in my opinion, and the naming doesn't help sort it out. I'm sure I'll be able to figure it out. I'm just a bit frustrated and trying to see the virtue of the change, that's all. Is it that using switchMap allows the application to replace the input stream while keeping the websocket open?
Anyway, thanks for providing this library.

ngrx effects example

Thanks for this lib. I want to use it in an angular 4 app, with ngrx (effects). Do you have a small example on how to tackle that?

Catching onclose & error from server side (Websocket server error)

If I connect my webapp to the server through a websocket and then If I close the server, I am getting an error (as expected) on the client side application. How can I catch & handle this error?

weby-webpack.js:1625 ERROR Error
    at WebSocket.socket.onclose (weby-webpack.js:131362)
    at WebSocket.wrapFn [as __zone_symbol___onclose] (weby-webpack.js:54658)
    at ZoneDelegate.invokeTask (weby-webpack.js:54026)
    at Object.onInvokeTask (weby-webpack.js:4657)
    at ZoneDelegate.invokeTask (weby-webpack.js:54025)
    at Zone.runTask (weby-webpack.js:53793)
    at WebSocket.ZoneTask.invoke (weby-webpack.js:54088)

Not able to get binary messages

Binary messages have the right length, but the actual blob does not contain the binary message. It seems the messages observable is always typed as string (index.ts, line 8): "messages: Observable<string>"

Unsubscribe doesn't close the connection in every case

Here's an example.

On a Linux terminal, type this to open a TCP listener on port 5362 that accepts connections and doesn't send anything:

nc -l 5362

Then execute this code from a browser:

// example 1: browser "Firefox can’t establish a connection" error doesn't
// appear in 2 seconds, but in 15 (wrong behaviour - what if it managed to
// connect somewhere in between? I would end up with an unused websocket
// connection forever, which would burden the server)
console.log('starting example 1...')
let s = makeWebSocketObservable('ws://localhost:5362').subscribe()
setTimeout(() => s.unsubscribe(), 2e3)

// example 2: browser error "The connection to ws://localhost:5362/ was interrupted"
// appears in 2 seconds (which I think is the correct behaviour)
setTimeout(() => {
    console.log('starting example 2...')
    var ws = new WebSocket('ws://localhost:5362')
    setTimeout(() => ws.close(), 2e3)
}, 30e3)

Handling connection errors

I need to be able to handle connection errors and the onclose events - but I see the connectionStatus is number of connected not the status of the connection ?

How would I go about reporting errors ?

connect method issue

TypeError: this.socketFactory.connect(...).share is not a function
at WebService.connect (websocket.service.ts:21)
at new AppComponent (app.component.ts:13)
at createClass (core.js:12481)
at createDirectiveInstance (core.js:12326)
at createViewNodes (core.js:13784)
at createRootView (core.js:13673)
at callWithDebugContext (core.js:15098)
at Object.debugCreateRootView [as createRootView] (core.js:14381)
at ComponentFactory_.create (core.js:11278)
at ComponentFactoryBoundToModule.create (core.js:4030

Response & Timeout or any other pattern

This may or may not be within the scope of the library, but I could not find enough information on the internet, so I'm asking here.

When I use the send method of the websocket service, is there any way that I can follow up on the response? I know that I can send messages in one channel and listen to the responses in another, but what I don't understand is how to get them all in a meaningful order. Here is an example case:

When I click on a button on my webpage (join room), I send a JOIN_ROOM_REQUEST to the websocket on the server. Meanwhile I disable the button, because I do not want the user to click again, and I want them to be prompted when the join request is accepted and executed. If everything goes well, the server will send an event over the websocket so that I can let the user join the room. But what if the server does not respond at all? How should I handle this situation? The button stays disabled and the state cannot be reset to its initial value.
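
One common pattern for this is to tag each request with an id, remember it until a matching response arrives, and give up after a timeout so the UI can be reset. The following is a minimal sketch that is independent of rxjs-websockets. `RequestTracker`, its method names, and the idea that the server echoes the request id back are all assumptions for illustration:

```typescript
interface PendingRequest {
  resolve: (payload: string) => void
  timer: ReturnType<typeof setTimeout>
}

// Hypothetical helper that correlates responses with earlier requests.
class RequestTracker {
  private pending = new Map<string, PendingRequest>()

  get pendingCount(): number {
    return this.pending.size
  }

  // Registers a request; the promise rejects if no response arrives in time.
  expect(id: string, timeoutMs: number): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(id)
        reject(new Error(`request ${id} timed out after ${timeoutMs}ms`))
      }, timeoutMs)
      this.pending.set(id, { resolve, timer })
    })
  }

  // Call for each incoming message; returns false when the id is unknown
  // or the request has already timed out.
  handleResponse(id: string, payload: string): boolean {
    const entry = this.pending.get(id)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(id)
    entry.resolve(payload)
    return true
  }
}
```

In the join-room case, the button would be re-enabled in both the resolve and the reject path, so a silent server can no longer leave it disabled.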

Override socket.onmessage

Hi James, first of all thanks for this library!
I need to override socket.onmessage behaviour in order to prevent JSON.parse and handle some binary data instead of string messages: could you please give an example on how to do this? Perhaps passing a custom WebSocketFactory as third param of websocketConnect?
Tnx!

two websockets when using the reconnect example

Hi,

I have the following problem: when I add the reconnect code from the example, it always immediately opens a second websocket to the server.

number of connected websockets: 0
number of connected websockets: 1
number of connected websockets: 2
received message: some data

Catching Websocket Connection Errors in ConnectionStatus Observable

Currently it seems that neither the messages nor the connectionStatus observable handles errors from an invalid websocket connection. I am attempting to retry the connection if one of two things happens:

  1. I cannot establish a connection to the websocket at all
  2. The websocket is not replying to my messages

Now, I can rudimentarily handle both of these cases by doing this:

messages
            .race(Observable.never().timeout(this.initialMessageTimeout * 1000))
            .pipe(retryWhen(errors => errors.pipe(
                tap(val => {
                    this.connectionIssues = true;
                    this.toast.error('We are having issues communicating with our chat service.');
                }),
                delay(1000))))
            .subscribe((msg: string) => {});

Essentially, after n seconds of not receiving a message from the websocket, I show an error to the end user and attempt to retry the connection.

However, when testing with the websocket server down, I see errors in my console for the websocket connection, but neither messages nor connectionStatus emits any error, which I think it should, since the library is handling the websocket connection. I see in the code that you handle this here:

    socket.onerror = (error: ErrorEvent) => {
      closed()
      observer.error(error)
    }

however I cannot collect any error via:

messages
            .race(Observable.never().timeout(this.initialMessageTimeout * 1000))
            .pipe(retryWhen(errors => errors.pipe(
                tap(val => {
                    this.connectionIssues = true;
                    this.toast.error('We are having issues communicating with our chat service.');
                }),
                delay(1000))))
            .subscribe((msg: string) => {}, (err) => {console.log(err)});

which I think should be possible.

I'm proposing that the connectionStatus observable should also attempt to carry information regarding the state of the connection to the websocket so implementations can leverage the knowledge of the websocket connection.

Using proxy path

Hi

Is it possible to use a relative path with a proxy rewrite?

I got message:
DOMException: Failed to construct 'WebSocket': The URL '/apis/websockets/route?param=value' is invalid.

Thanks
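
As the error above shows, the `WebSocket` constructor in that browser rejects a relative URL. The following is a minimal sketch that resolves the path against a base URL and swaps the scheme before the URL is handed to the library. `toWebSocketUrl` is a hypothetical helper, and in a browser the base would typically be `window.location.href`:

```typescript
// Hypothetical helper: turns a relative path into an absolute ws/wss URL.
function toWebSocketUrl(path: string, base: string): string {
  const resolved = new URL(path, base)
  // http becomes ws and https becomes wss; ws/wss bases pass through unchanged.
  return resolved.href.replace(/^http/, 'ws')
}
```

The proxy rewrite itself still has to be configured to forward websocket upgrade requests, which this sketch does not cover.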

Reopen connection

Thanks for the lib, it's really useful.
Can you please tell me how I can try to reconnect to the server, for example every 5 seconds, while the server is restarting? At the moment, if I lose the connection, it does not reconnect.

The reconnect problem with input

If I have connected to the websocket server, the connection then breaks, and it reconnects again,
it seems that input does not call socket.send again:
inputSubscription = input.subscribe(data => { socket.send(data) })
Do I need to call input.next again?

Omitting protocol parameter to websocketConnect() results in bad header on Firefox

Omitting the protocols parameter to websocketConnect() causes Firefox to send an invalid protocol header to the server

Calling websocketConnect(url, this.input) results in this header in the HTTP Upgrade request:

Sec-WebSocket-Protocol: undefined

The WebSocket connection immediately closes and Firefox prints a "Firefox can’t establish a connection to the server at [URL]" message in the console.

But if I add an empty array parameter: websocketConnect(url, this.input, []), there is no Sec-WebSocket-Protocol header in the HTTP Upgrade request.

Chrome 70 doesn't create the Sec-WebSocket-Protocol header in either case.

I am proxying the WebSocket connection through nginx to a Windows server running a custom service that uses the WebSocket implementation in .NET.

Versions:

  • rxjs-websockets 6.0.2
  • Firefox 63 on Ubuntu 16.04
  • nginx 1.14.0-0ubuntu1.2
  • .NET Framework 4.7
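
One way to sidestep this in a custom websocket factory is to pass `protocols` to the constructor only when a value was actually given, so the constructor never receives an explicit `undefined`. The following is a minimal sketch. `SocketCtor` and `createSocket` are hypothetical names, and the constructor is passed in as a parameter so the sketch also runs outside a browser:

```typescript
// Any constructor with the same shape as the browser WebSocket constructor.
type SocketCtor<T> = new (url: string, protocols?: string | string[]) => T

// Hypothetical factory helper: omits the second argument entirely when no
// protocols were supplied instead of forwarding `undefined`.
function createSocket<T>(
  Ctor: SocketCtor<T>,
  url: string,
  protocols?: string | string[],
): T {
  return protocols === undefined ? new Ctor(url) : new Ctor(url, protocols)
}
```

In a browser this would be called as `createSocket(WebSocket, url, protocols)`.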

npm install -S?

I'm guessing -S is to save to your package.json, but that's the default now.

How to make use of this in angular 4 project?

I have a project where I need to use WebSocket, and your library seems a good fit as it uses observables and rxjs, but when I installed it I got this warning in the terminal.

npm WARN [email protected] requires a peer of rxjs@^6.1.0 but none is installed. You must install peer dependencies yourself.
npm WARN [email protected] requires a peer of rxjs@^6.1.0 but none is installed. You must install peer dependencies yourself.

I have rxjs:^5.1.0 in package.json, and when I tried to upgrade only the rxjs version it threw various errors.

ng-v

Angular CLI: 1.7.4
Node: 8.9.2
OS: darwin x64
Angular: 4.4.7
... animations, common, compiler, compiler-cli, core, forms
... http, language-service, platform-browser
... platform-browser-dynamic, router, tsc-wrapped

@angular/cli: 1.7.4
@angular-devkit/build-optimizer: 0.3.2
@angular-devkit/core: 0.3.2
@angular-devkit/schematics: 0.3.2
@ngtools/json-schema: 1.2.0
@ngtools/webpack: 1.10.2
@schematics/angular: 0.3.2
@schematics/package-update: 0.3.2
typescript: 2.3.4
webpack: 3.11.0

So I just want to know: can I use this in my project, and if not, what changes are required to use it?

Help with Reconnection

Hello,

I'm not very familiar with RxJS, and I need some help reconnecting the websocket so that the messages are linked back to the Observable.

I'm using this to initialize the observable
this.messages = websocketConnect( 'ws://127.0.0.1:8080/api', this.inputStream = new QueueingSubject<string>() ).messages.share().

How can I use this code:
this.connectionStatusSubscription = this.messages.retryWhen(errors => errors.delay(1000)).subscribe(message => { console.log(message); })

to put the messages into the observable (this.messages).

RxJS 7.x compat

https://www.npmjs.com/package/rxjs
The versions list shows that RxJS 7 has been out for 3 months. This library is currently limited to 6.6.x, which keeps me from using it. It's not a big issue, but if I ran into this, others probably will too when they do a fresh rxjs install using npm/yarn.
It would be lovely to see this wonderful implementation work with RxJS 7+. Given your past efforts, it is worth giving it a shot soon to keep this library fresh and state of the art. Keep up the good work 🚀

`takeUntil` on messages is not closing the connection

const { messages, connectionStatus } = websocketConnect(
  `${wsProtocol}://${endpoint}/api/public/web-socket/${token}`,
   new QueueingSubject<string>()
);

messages.pipe(takeUntil(timer(5000))).subscribe()

this should automatically close the connection, right?

Closing socket connection

I think the readme is misleading, because the code below only unsubscribes but does not close the socket connection.
messagesSubscription.unsubscribe()

hence, how can I close the socket connection ?

(I use Angular)

Property 'share' does not exist on type 'Observable<string>'

Hello,

I have a problem when following the example. I'm using the latest version 3. I cannot use the 'share' method; the error log is this:

Property 'share' does not exist on type 'Observable<string>'

Here is my code on the service part:

import { Injectable } from '@angular/core';
import { QueueingSubject } from 'queueing-subject';
import { Observable } from 'rxjs/Observable';
import  websocketConnect , { Connection } from 'rxjs-websockets';
 
@Injectable()
export class ServerSocketService {
  private inputStream: QueueingSubject<string>;
  public messages: Observable<string>;

  public connect() {
    if (this.messages){
      return;
    }
    
    // Using share() causes a single websocket to be created when the first
    // observer subscribes. This socket is shared with subsequent observers
    // and closed when the observer count falls to zero.
    this.messages = websocketConnect(
      'ws://192.168.0.23:81/ws',
      this.inputStream = new QueueingSubject<string>(),
      'Protocol1'
    ).messages.share();
  }

  public send(message: string):void {
    // If the websocket is not connected then the QueueingSubject will ensure
    // that messages are queued and delivered when the websocket reconnects.
    // A regular Subject can be used to discard messages sent when the websocket
    // is disconnected.
    this.inputStream.next(message)
  }
}

And this is my component part:

import { Component, OnInit } from '@angular/core';
import { Subscription } from 'rxjs/Subscription'
import { ServerSocketService } from '../server-socket.service'
@Component({
  selector: 'app-dashboard',
  templateUrl: './dashboard.component.html',
  styleUrls: ['./dashboard.component.css']
})
export class DashboardComponent implements OnInit {
  private socketSubscription: Subscription

  constructor(private socket: ServerSocketService) { }

  ngOnInit() {
    this.socket.connect()
    
    this.socketSubscription = this.socket.messages.subscribe((message: string) => {
      console.log('received message from server: ', message)
    })

    // send message to server, if the socket is not connected it will be sent
    // as soon as the connection becomes available thanks to QueueingSubject
    this.socket.send('hello')
  }
  ngOnDestroy() {
    this.socketSubscription.unsubscribe()
  }

}

My package.json looks like this:

{
  "name": "my-app",
  "version": "0.0.0",
  "license": "MIT",
  "scripts": {
    "ng": "ng",
    "start": "ng serve",
    "build": "ng build",
    "test": "ng test",
    "lint": "ng lint",
    "e2e": "ng e2e"
  },
  "private": true,
  "dependencies": {
    "@angular/animations": "^4.2.4",
    "@angular/common": "^4.2.4",
    "@angular/compiler": "^4.2.4",
    "@angular/core": "^4.2.4",
    "@angular/forms": "^4.2.4",
    "@angular/http": "^4.2.4",
    "@angular/platform-browser": "^4.2.4",
    "@angular/platform-browser-dynamic": "^4.2.4",
    "@angular/router": "^4.2.4",
    "core-js": "^2.4.1",
    "queueing-subject": "^0.1.1",
    "rxjs": "^5.4.2",
    "rxjs-websockets": "^3.0.1",
    "zone.js": "^0.8.14"
  },
  "devDependencies": {
    "@angular/cli": "1.4.1",
    "@angular/compiler-cli": "^4.2.4",
    "@angular/language-service": "^4.2.4",
    "@types/jasmine": "~2.5.53",
    "@types/jasminewd2": "~2.0.2",
    "@types/node": "~6.0.60",
    "codelyzer": "~3.1.1",
    "jasmine-core": "~2.6.2",
    "jasmine-spec-reporter": "~4.1.0",
    "karma": "~1.7.0",
    "karma-chrome-launcher": "~2.1.1",
    "karma-cli": "~1.0.1",
    "karma-coverage-istanbul-reporter": "^1.2.1",
    "karma-jasmine": "~1.1.0",
    "karma-jasmine-html-reporter": "^0.2.2",
    "protractor": "~5.1.2",
    "ts-node": "~3.2.0",
    "tslint": "~5.3.2",
    "typescript": "~2.3.3"
  }
}

Please give me a clue about what I'm doing wrong. Thank you.

Issue With retryWhen

The changes below will stop retryWhen from working, because "if (forcedClose) observer.complete();" makes the observer complete.

return () => {
  forcedClose = true;
  if (inputSubscription) inputSubscription.unsubscribe();

  if (socket) {
    closed();
    socket.close();
  }
};

Websocket closes itself after few minutes

core.js:1350 ERROR Error
    at WebSocket.socket.onclose [as __zone_symbol__ON_PROPERTYclose] (index.js:38)
    at WebSocket.wrapFn (zone.js:1166)
    at ZoneDelegate.invokeTask (zone.js:425)
    at Object.onInvokeTask (core.js:4617)
    at ZoneDelegate.invokeTask (zone.js:424)
    at Zone.runTask (zone.js:192)
    at ZoneTask.invokeTask [as invoke] (zone.js:499)
    at invokeTask (zone.js:1540)
    at WebSocket.globalZoneAwareCallback (zone.js:1566)

Question: Can I use this library for STOMP over websocket?

Hi, I am building a server which uses STOMP over websocket.

Since the API of rxjs-websockets is cleaner than other libraries, I would like to use this library for the client-side websocket connection.

  • Does this library support STOMP over websocket?
  • Or is there a workaround I can use?

Thanks!

Observable<any> is not assignable to type QueueingSubject<any>

I am talking about this example:

https://www.npmjs.com/package/angular2-websocket-service

1- Here the package is angular2-websocket-service but on github it is rxjs-websockets
2- In the example share is not defined; missing rxjs/share import
3- The second parameter of this.socketFactory.connect requires Observable. QueueingSubject is not accepted. Gives compile error (ng build)
4- When I pass an Observable and try to subscribe, gives "property subscribe of undefined not found" error.

Missing "var WebSocket = require("ws")" in "node_modules/rxjs-websockets/lib/index.js

Hi James,

I tried to integrate your lib into my Node project, which is plain JavaScript rather than TypeScript. After some fixes for TypeScript compilation, I ran into this issue:

number of connected websockets: 0
/Volumes/DATA/sources/node_server/node_modules/rxjs/Observable.js:165
throw sink.syncErrorValue;
^

ReferenceError: WebSocket is not defined
at defaultWebsocketFactory (/Volumes/DATA/sources/node_server/node_modules/rxjs-websockets/lib/index.js:5:58)
at Observable._subscribe (/Volumes/DATA/sources/node_server/node_modules/rxjs-websockets/lib/index.js:10:22)
.....

I fixed this issue by adding the line
"var WebSocket = require("ws")" to "node_modules/rxjs-websockets/lib/index.js".

Could you please review the fix and release a new version of your rxjs-websockets library?

Thanks,
Toan Pham
Developer
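
An alternative to patching files under node_modules is to inject the socket constructor from the calling code. The type error reported further down mentions a WebSocketFactory parameter, which suggests the library accepts such a factory, but the exact call signature is not shown here. The sketch below therefore uses hypothetical names (`SocketLike`, `SocketFactory`, `resolveSocketFactory`) to show the general pattern only: use an injected factory when one is given, and fall back to a global WebSocket otherwise.

```typescript
// Hypothetical names throughout; this is not the rxjs-websockets API.
interface SocketLike {
  close(): void
}

type SocketFactory = (url: string, protocols?: string | string[]) => SocketLike

function resolveSocketFactory(injected?: SocketFactory): SocketFactory {
  // Prefer the factory supplied by the caller (for example one built on "ws").
  if (injected) return injected

  // Otherwise fall back to a global WebSocket, which browsers provide.
  const GlobalWebSocket = (globalThis as any).WebSocket
  if (typeof GlobalWebSocket !== 'function') {
    throw new Error('No global WebSocket found; pass a socket factory instead')
  }
  return (url, protocols) => new GlobalWebSocket(url, protocols)
}

// Possible usage in Node, assuming the "ws" package is installed:
//   import WebSocket from 'ws'
//   const factory = resolveSocketFactory((url, protocols) => new WebSocket(url, protocols))
```

This keeps the Node-specific dependency in application code, so the same library build can serve both the browser and Node.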

Type errors on WebSocket Factory

index.ts:39:2 - error TS2345: Argument of type '(x: string, y: string | string[] | undefined) => WebSocket' is not assignable to parameter of type 'WebSocketFactory | undefined'.
  Type '(x: string, y: string | string[] | undefined) => WebSocket' is not assignable to type 'WebSocketFactory'.
    Type 'WebSocket' is not assignable to type 'IWebSocket'.
      Types of property 'onopen' are incompatible.
        Type '(event: { target: WebSocket; }) => void' is not assignable to type '((event: Event) => any) | undefined'.
          Type '(event: { target: WebSocket; }) => void' is not assignable to type '(event: Event) => any'.
            Types of parameters 'event' and 'event' are incompatible.
              Type 'Event' is not assignable to type '{ target: WebSocket; }'.
                Types of property 'target' are incompatible.
                  Type 'EventTarget | null' is not assignable to type 'WebSocket'.
                    Type 'null' is not assignable to type 'WebSocket'.

39  (x,y) => new WebSocket(x,y)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

The reason is that if you use ws, it's typed with @types/ws like this,

onopen: (event: { target: WebSocket }) => void;
onerror: (event: {error: any, message: string, type: string, target: WebSocket }) => void;
onclose: (event: { wasClean: boolean; code: number; reason: string; target: WebSocket }) => void;
onmessage: (event: { data: WebSocket.Data; type: string; target: WebSocket }) => void;

While currently the factory is typed with an interface like this,

onopen?: (event: Event) => any
onclose?: (event: CloseEvent) => any
onmessage?: (event: MessageEvent) => any
onerror?: (event: ErrorEvent) => any

Moreover, ErrorEvent and the like are typed like this,

export class ErrorEvent {
    constructor(title?: string, message?: string, data?: Object);

    // properties
    data: Object;
    message: string;
    title: string;
}

I think we should mirror the definitions of ws rather than define our own. I assume ws mirrors the definitions of the browser's native types (when provided).

Update to support RxJS 6

This package should be updated to support RxJS 6.

Otherwise, this breaks compatibility with Angular 6 because the peer dependencies are incompatible:

$ ng update @angular/core --force
Package "rxjs-websockets" has an incompatible peer dependency to "rxjs" (requires "^5.0.1", would install "6.1.0").

Maybe it is possible to integrate rxjs-compat to help support both 5.x and 6.x.
