RxPY3: Using multiple observers with the buffer-operator


I created a GitHub repository with the two RxPY (version 3) examples from this and the previous blog post. You can find it at rxpy3-examples on GitHub.

My previous example demonstrated the basic usage of the RxPY3 buffer-operator with two interval-observables. In today's example I will create a custom observable and attach multiple observers to it. While this particular observable is very simple, it demonstrates how to create an observable from a non-reactive data-source.

The relevant lines you would need to adapt are these:

    next_item += 1                  <-- replace with your own code that gets the
                                        next item from a queue, database, file, whatever
    observer.on_next(next_item)     <-- keep
    sleep(0.3)                      <-- remove
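
The adaptation described above could look roughly like the following sketch, which assumes the non-reactive data-source is a standard-library `queue.Queue`. The `make_dequeue` helper, the `STOP` sentinel and the `RecordingObserver` stand-in are hypothetical names chosen for illustration; only the `on_next`, `on_error` and `on_completed` calls belong to the observer interface.

```python
import queue
from typing import Any, List

STOP = object()  # hypothetical end-of-stream marker, not part of RxPY


def make_dequeue(source: "queue.Queue[Any]"):
    """Build a subscribe function that forwards queue items to an observer."""
    def dequeue(observer, scheduler=None):
        try:
            while True:
                item = source.get()  # blocks until an item is available
                if item is STOP:
                    observer.on_completed()
                    return
                observer.on_next(item)
        except Exception as e:
            observer.on_error(e)
    return dequeue


class RecordingObserver:
    """Stand-in observer so the sketch runs without RxPY installed."""
    def __init__(self) -> None:
        self.events: List[Any] = []

    def on_next(self, value: Any) -> None:
        self.events.append(value)

    def on_error(self, error: Exception) -> None:
        self.events.append(("error", error))

    def on_completed(self) -> None:
        self.events.append("completed")


work = queue.Queue()
for entry in ("a", "b", "c", STOP):
    work.put(entry)

recorder = RecordingObserver()
make_dequeue(work)(recorder)
print(recorder.events)  # ['a', 'b', 'c', 'completed']
```

With RxPY installed, `create(make_dequeue(work))` should be usable in place of `create(dequeue)`. Note that `source.get()` blocks the subscribing thread, much like `sleep` does in the original loop.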

The example

from time import sleep
from typing import Optional

from rx import operators, interval, create
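from rx.core.typing import Observer, Scheduler
# rx.core.typing.Disposable is an abstract class; the concrete one lives in rx.disposable
from rx.disposable import Disposable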


def multiple_observers_example():
    def dequeue(observer: Observer, scheduler: Optional[Scheduler]) -> Disposable:
        next_item = 0
        while True:
            try:
                next_item += 1
                observer.on_next(next_item)
                sleep(0.3)
            except Exception as e:
                observer.on_error(e)
                return Disposable()
    
    # This is the "main" observable. Note that we use "pipe(operators.publish())" here 
    observable = create(dequeue).pipe(operators.publish())

    # A second observable that receives its input from the main one. Despite its name,
    # skip4 lets every third item through (x % 3 == 0)
    skip4 = observable.pipe(operators.filter(lambda x: x % 3 == 0))
    skip4.subscribe(lambda x: print(f"skipped to {x}"))

    # A third observable that also receives its input from the main one. In addition, this uses the second
    # observable to trigger the buffering
    buffer = observable.pipe(operators.buffer(skip4))
    buffer.subscribe(lambda x: print(f"processing buffered events {x}"))

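    # Because the main observable uses the publish-operator, nothing is emitted until we call connect
    print("Connecting")
    observable.connect()

    # Keep the main thread alive. As written, dequeue loops on the calling thread, so
    # connect() is not expected to return and this loop only acts as a safeguard
    while True:
        sleep(10)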


if __name__ == "__main__":
    multiple_observers_example()

The output

Connecting
skipped to 3
processing buffered events [1, 2, 3]
skipped to 6
processing buffered events [4, 5, 6]
skipped to 9
processing buffered events [7, 8, 9]
skipped to 12
processing buffered events [10, 11, 12]
skipped to 15
processing buffered events [13, 14, 15]
...
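
The interleaving in this output can be reproduced with a plain-Python model of the three subscriptions. This is only a sketch of the observed behaviour, not how RxPY implements the buffer-operator, and the `simulate` function is a hypothetical helper that does not appear in the example above.

```python
from typing import Callable, Iterable, List


def simulate(items: Iterable[int], is_boundary: Callable[[int], bool]) -> List[str]:
    """Model the order in which the subscribers see each published item."""
    log: List[str] = []
    pending: List[int] = []
    for x in items:
        # First subscriber: the filtered observable prints matching items
        if is_boundary(x):
            log.append(f"skipped to {x}")
        # Second subscriber: the buffer collects every item
        pending.append(x)
        # The boundary signal then flushes the collected items
        if is_boundary(x):
            log.append(f"processing buffered events {pending}")
            pending = []
    return log


for line in simulate(range(1, 7), lambda x: x % 3 == 0):
    print(line)
```

For the items 1 to 6 this prints the first four lines of the output shown above, without the "Connecting" line.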
