Comments (8)
Thanks for this @felixfbecker, - this is awesome!
How does this look with the update to Comlink version 4?;
I had to update proxyValue to proxy, but I can't seem to find what happened to ProxyResult?
Any pointers?
from comlink.
I've also run into this question and implemented a couple of custom transferHandler
s to transfer/handle Observalbe
s and their returned Subscription
s. Seems to work quite well, so far. :)
import { Remote, proxy, releaseProxy, transferHandlers } from 'comlink'; // v4.4.1
import { Observable, Observer, Subscribable, Subscription } from 'rxjs'; // v7.8.0
transferHandlers.set('observable', {
canHandle: (value: unknown): value is Observable<unknown> => {
return value instanceof Observable;
},
deserialize: (value: MessagePort) => {
return new Observable<unknown>((observer) => {
const remote = transferHandlers.get('proxy')!
.deserialize(value) as Remote<Subscribable<unknown>>;
void remote.subscribe(proxy({
next: (next: unknown) => observer.next(next),
error: (error: unknown) => observer.error(error),
complete: () => observer.complete()
})).then((subscription) => observer.add(() => {
subscription.unsubscribe();
remote[releaseProxy]();
}));
});
},
serialize: (value: Observable<unknown>) => {
return transferHandlers.get('proxy')!.serialize({
subscribe: (observer: Remote<Observer<unknown>>) => value.subscribe({
next: (next: unknown) => void observer.next(next).then(),
error: (error: unknown) => void observer.error(error).then(),
complete: () => void observer.complete().then()
})
});
}
});
transferHandlers.set('subscription', {
canHandle: (value: unknown): value is Subscription => {
return value instanceof Subscription;
},
deserialize: (value: MessagePort) => {
return new Subscription(() => {
const remote = transferHandlers.get('proxy')!
.deserialize(value) as Remote<Subscription>;
void remote.unsubscribe().then(() => {
remote[releaseProxy]();
});
});
},
serialize: (value: Subscription) => {
return transferHandlers.get('proxy')!.serialize({
unsubscribe: () => value.unsubscribe()
});
}
});
from comlink.
That can’t be solved generically from Comlink’s side. A remote function invocation is inherently asynchronous and needs to return a promise.
My approach would be to create a subclass RemoteObservable
or something that hides this from you.
from comlink.
I think this should work:
import {
Observable,
Subscription,
PartialObserver,
from,
observable,
Subscribable
} from "rxjs";
import * as comlink from "./comlink";
interface SubscribableNoOverloads<T> {
subscribe(
...observer:
| [PartialObserver<T> | undefined]
| [
((value: T) => void) | undefined | null,
((error: any) => void) | undefined | null,
(() => void) | undefined | null
]
): Subscription;
}
// Worker
const apiWorkerSide = {
foo(): SubscribableNoOverloads<number> & comlink.ProxyValue {
return comlink.proxyValue(new Observable<number>());
}
};
comlink.expose(apiWorkerSide, self);
// Main
const proxyObservable = <T>(
proxy: comlink.ProxyResult<SubscribableNoOverloads<T>>
): Observable<T> =>
from(({
[observable]() {
return this;
},
subscribe(observer: PartialObserver<T>): Subscription {
const subscription = new Subscription();
proxy.subscribe(comlink.proxyValue(observer)).then(s => {
subscription.add(s);
});
return subscription;
}
} as any) as Subscribable<T>);
const proxy = comlink.proxy<typeof apiWorkerSide>(new Worker("worker.js"));
const api = {
foo(): SubscribableNoOverloads<number> {
return proxyObservable(proxy.foo());
}
};
from comlink.
@mikeonline Did you manage to find a solution to proxy observables using comlink 4+?
from comlink.
@felixfbecker @surma How can we do this in v4?
from comlink.
To me it seems that this can't be done without some intermediary buffering because RxJS assumes that the Observable signaling methods (next,error,complete) are synchronous.
// service-worker-api.ts
import { Observable, Subscription, Subscriber } from 'rxjs';
import * as Comlink4 from '$lib/comlink4/comlink';
export interface MinimallySubscribable<T> {
subscribe(
next: ((value: T) => void) | undefined | null,
error: ((error: any) => void) | undefined | null,
complete: (() => void) | undefined | null,
): Subscription;
}
export class ServiceWorkerAPI {
static myObservable() {
return Comlink4.proxy(
new Observable<number>((subscriber: Subscriber<number>) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
}) as MinimallySubscribable<number>,
);
}
// service-worker.ts
import * as Comlink4 from '$lib/comlink4/comlink';
import { ServiceWorkerAPI } from './service-worker-api';
// ...
worker.addEventListener('message', ({ data }) => {
if (data.port1 instanceof MessagePort) {
Comlink4.expose( ServiceWorkerAPI, data.port1);
data.port1.start();
}
});
// "page.ts"
import * as Comlink4 from '$lib/comlink4/comlink';
import type { MinimallySubscribable, ServiceWorkerAPI } from '../service-worker-api';
import {
from,
observable,
Subscription,
Observable,
type Subscribable,
type ObservableInput,
type Observer,
type Unsubscribable,
type PartialObserver,
type InteropObservable,
Subscriber,
} from 'rxjs';
// ...
const { port1, port2 } = new MessageChannel();
navigator.serviceWorker.controller.postMessage(
{ port1 },
[ port1 ],
);
const serviceWorkerAPI4 = Comlink4.wrap<typeof ServiceWorkerAPI>(port2);
const proxy = await serviceWorkerAPI4.myObservable();
const fromRemote = <T>(proxy: Comlink4.Remote<MinimallySubscribable<T>>) =>
from({
[Symbol.observable](): Subscribable<T> {
return this;
},
[observable](): Subscribable<T> {
return this;
},
subscribe(observer: Partial<Observer<T>>) {
const subscription = new Subscription();
proxy
.subscribe(
Comlink4.proxy(observer.next.bind(observer)),
Comlink4.proxy(observer.error.bind(observer)),
Comlink4.proxy(observer.complete.bind(observer)),
)
.then(s => {
// If I understand the problem correctly the Subscription gets torn down before
// Comlink has performed all the next() calls that were given to it to pass forward.
// If this check is removed, Error(s?) will arise.
if (!subscription.closed) {
subscription.add(s);
}
});
return subscription;
},
});
fromRemote(proxy).subscribe(
(value: number) => {
console.log('Received value:', value);
},
error => console.log(error),
() => {
console.log('complete');
},
);
from comlink.
我也遇到了这个问题,并实现了几个自定义
transferHandler
来传输/处理Observalbe
及其返回Subscription
的。到目前为止,似乎工作得很好。:)import { Remote, proxy, releaseProxy, transferHandlers } from 'comlink'; // v4.4.1 import { Observable, Observer, Subscribable, Subscription } from 'rxjs'; // v7.8.0 transferHandlers.set('observable', { canHandle: (value: unknown): value is Observable<unknown> => { return value instanceof Observable; }, deserialize: (value: MessagePort) => { return new Observable<unknown>((observer) => { const remote = transferHandlers.get('proxy')! .deserialize(value) as Remote<Subscribable<unknown>>; void remote.subscribe(proxy({ next: (next: unknown) => observer.next(next), error: (error: unknown) => observer.error(error), complete: () => observer.complete() })).then((subscription) => observer.add(() => { subscription.unsubscribe(); remote[releaseProxy](); })); }); }, serialize: (value: Observable<unknown>) => { return transferHandlers.get('proxy')!.serialize({ subscribe: (observer: Remote<Observer<unknown>>) => value.subscribe({ next: (next: unknown) => void observer.next(next).then(), error: (error: unknown) => void observer.error(error).then(), complete: () => void observer.complete().then() }) }); } }); transferHandlers.set('subscription', { canHandle: (value: unknown): value is Subscription => { return value instanceof Subscription; }, deserialize: (value: MessagePort) => { return new Subscription(() => { const remote = transferHandlers.get('proxy')! .deserialize(value) as Remote<Subscription>; void remote.unsubscribe().then(() => { remote[releaseProxy](); }); }); }, serialize: (value: Subscription) => { return transferHandlers.get('proxy')!.serialize({ unsubscribe: () => value.unsubscribe() }); } });
How should I use it? I don’t use rxjs, I just implemented the observer pattern simply by myself. Thank you.
main.js
async function query() {
(await db.messageDao.queryMessages()).subscribe(
Comlink.proxy({
next(value) {
console.log(value);
}
})
);
}
worker.js
observable.next && observable.next(result);
from comlink.
Related Issues (20)
- Web Worker Hangs if Imported File Contains Top-Level `Await` HOT 6
- Service worker example: stops working when SW is suspended HOT 9
- Set operation is not awaitable HOT 2
- Possible to communicate between web workers? HOT 1
- How to transfer result buffers HOT 2
- [feat] DeasyncEndpoint HOT 2
- Move from Karma to Playwright? HOT 1
- Do I need to use transfer inside a proxy? HOT 1
- Add support for async transferHandle serializer/deserializer HOT 2
- Significant performance optimizations possible in `requestResponseMessage` HOT 4
- Push notifications from web worker HOT 5
- Memory leak when terminating worker with ongoing call HOT 2
- Worker Pool for similar tasks HOT 2
- Sharing constructed objects within a remote context HOT 1
- How to implement comlink in Next.js 14 App Route HOT 2
- Feature request: Automatic proxy of arguments HOT 3
- Having issues when using comlink with react (storing in react state) HOT 3
- Exposed function deadlocks if called too early
- Returning a collection - transferHandlers to be called
- How to prevent auto `releaseProxy`?
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from comlink.