Wie konvertiert man Callback-basierte API auf Observable?

Die Bibliothek, die ich verwende, sendet eine Reihe von Message Objekten mit Callback-Objekt.

 interface MessageCallback { onMessage(Message message); } 

Der callback wird mithilfe eines libraryObject.setCallback(MessageCallback) hinzugefügt, und der process wird mit dem nicht blockierenden libraryObject.start() gestartet.

Was ist der beste Weg, eine beobachtbare Observable erstellen, die diese Objekte ausstrahlt?

Was libraryObject.start() wenn das libraryObject.start() blockiert?

Ich denke du brauchst so etwas (Beispiel in Scala)

 import rx.lang.scala.{Observable, Subscriber} case class Message(message: String) trait MessageCallback { def onMessage(message: Message) } object LibraryObject { def setCallback(callback: MessageCallback): Unit = { ??? } def removeCallback(callback: MessageCallback): Unit = { ??? } def start(): Unit = { ??? } } def messagesSource: Observable[Message] = Observable((subscriber: Subscriber[Message]) ⇒ { val callback = new MessageCallback { def onMessage(message: Message) { subscriber.onNext(message) } } LibraryObject.setCallback(callback) subscriber.add { LibraryObject.removeCallback(callback) } }) 

Wie beim blocking / non-blocking start() : In der Regel trennt Callback-basierte Architektur das Callback-Abonnement und den processstart. In diesem Fall können Sie beliebig viele messageSource s erstellen, unabhängig davon, wann Sie den process start() . Auch die Entscheidung, ob Sie es abzweigen oder nicht, liegt ganz bei Ihnen. Unterscheidet sich Ihre Architektur davon?

Sie sollten auch den process irgendwie beenden. Am besten fügen onCompleted der MessageCallback-Schnittstelle einen onCompleted Handler hinzu. Wenn Sie Fehler behandeln möchten, fügen Sie auch einen onError Handler hinzu. Nun, siehe da, du hast gerade den Grundbaustein von RxJava, einem Beobachter, erklärt 🙂

1. Callback wird unendlich oft aufgerufen

Wir können es wie folgt in Observable konvertieren (Beispiel für RxJava 2):

 Observable source = Observable.create(emitter -> { MessageCallback callback = message -> emitter.onNext(message); libraryObject.setCallback(callback); Schedulers.io().scheduleDirect(libraryObject::start); emitter.setCancellable(() -> libraryObject.removeCallback(callback)); }) .share(); // make it hot 

share macht diese Observable heiß , dh mehrere Subscriber teilen sich ein einzelnes Subscribe, dh bei libraryObject wird höchstens ein Callback registriert.

Ich benutzte io Scheduler, um zu planen, dass der Anfangsanruf vom Hintergrundthread gemacht wird, also verzögert es nicht die erste Subskription.

2. Einzelner Nachrichtenrückruf

Es ist ein ziemlich häufiges Szenario. Nehmen wir an, wir haben die folgende asynchrone Callback-Methode:

 libraryObject.requestDataAsync(Some parameters, MessageCallback callback); 

Dann können wir es wie folgt in Observable konvertieren (Beispiel für RxJava 2):

 Observable makeRequest(parameters) { return Observable.create(emitter -> { libraryObject.requestDataAsync(parameters, message -> { emitter.onNext(message); emitter.onComplete(); }); }); }