# RxPY3: Using multiple observers with the buffer-operator

I created a github-repository with the two rxpy (version 3) examples from this and another previous blog-post. You can find it at rxpy3-examples on github.

My previous example of the rxpy3 buffer-operator demonstrated the basic usage of the buffer-operator by using two interval-observables. In today’s example I will create a custom observable. While this particular observable is very simple, it demonstrates how to create an observable from a non-reactive data-source.

The relevant part you would need to adapt are these:

    next_item += 1                  <-- replace with your own code that get's a
next item from a queue, database, file whatever
observer.on_next(next_item)     <-- keep
sleep(0.3)                      <-- remove


## The example

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41  from time import sleep from typing import Optional from rx import operators, interval, create from rx.core.typing import Observer, Scheduler, Disposable def multiple_observers_example(): def dequeue(observer: Observer, scheduler: Optional[Scheduler]) -> Disposable: next_item = 0 while True: try: next_item += 1 observer.on_next(next_item) sleep(0.3) except Exception as e: observer.on_error(e) return Disposable() # This is the "main" observable. Note that we use "pipe(operators.publish())" here observable = create(dequeue).pipe(operators.publish()) # A second observable that receives its input from the main one skip4 = observable.pipe(operators.filter(lambda x: x % 3 == 0)) skip4.subscribe(lambda x: print(f"skipped to {x}")) # A third observable that also receives its input from the main one. In addition, this uses the second # observable to trigger the buffering buffer = observable.pipe(operators.buffer(skip4)) buffer.subscribe(lambda x: print(f"processing buffered events {x}")) # Because the main observable uses the publish-operator, we need to call connect print("Connecting") observable.connect() while True: sleep(10) if __name__ == "__main__": multiple_observers_example()

## The output

Connecting
skipped to 3
processing buffered events [1, 2, 3]
skipped to 6
processing buffered events [4, 5, 6]
skipped to 9
processing buffered events [7, 8, 9]
skipped to 12
processing buffered events [10, 11, 12]
skipped to 15
processing buffered events [13, 14, 15]
...