I created a github-repository with the two rxpy (version 3) examples from this and another previous blog-post. You can find it at rxpy3-examples on github.
My previous example of the rxpy3 buffer-operator demonstrated the basic usage of the buffer-operator by using two interval
-observables. In today’s example I will create a custom observable. While this particular observable is very simple, it demonstrates how to create an observable from a non-reactive data-source.
The relevant part you would need to adapt are these:
next_item += 1 <-- replace with your own code that get's a
next item from a queue, database, file whatever
observer.on_next(next_item) <-- keep
sleep(0.3) <-- remove
The example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| from time import sleep
from typing import Optional
from rx import operators, interval, create
from rx.core.typing import Observer, Scheduler, Disposable
def multiple_observers_example():
def dequeue(observer: Observer, scheduler: Optional[Scheduler]) -> Disposable:
next_item = 0
while True:
try:
next_item += 1
observer.on_next(next_item)
sleep(0.3)
except Exception as e:
observer.on_error(e)
return Disposable()
# This is the "main" observable. Note that we use "pipe(operators.publish())" here
observable = create(dequeue).pipe(operators.publish())
# A second observable that receives its input from the main one
skip4 = observable.pipe(operators.filter(lambda x: x % 3 == 0))
skip4.subscribe(lambda x: print(f"skipped to {x}"))
# A third observable that also receives its input from the main one. In addition, this uses the second
# observable to trigger the buffering
buffer = observable.pipe(operators.buffer(skip4))
buffer.subscribe(lambda x: print(f"processing buffered events {x}"))
# Because the main observable uses the publish-operator, we need to call connect
print("Connecting")
observable.connect()
while True:
sleep(10)
if __name__ == "__main__":
multiple_observers_example()
|
The output
Connecting
skipped to 3
processing buffered events [1, 2, 3]
skipped to 6
processing buffered events [4, 5, 6]
skipped to 9
processing buffered events [7, 8, 9]
skipped to 12
processing buffered events [10, 11, 12]
skipped to 15
processing buffered events [13, 14, 15]
...
See also
If you liked this post, please, do share it:
Thanks, for reading (and sharing)! 🥳