RxPY: How to use the buffer-operator


I created a github-repository with the two rxpy (version 3) examples from this and another previous blog-post. You can find it at rxpy3-examples on github.

When I was trying to understand and use the buffer operator of RxPy3, I noticed that there are not a lot of code samples out there. The few examples are usually for version 2 of RxPy, and that does not help if you are a complete RxPy3 n00b, like me.

The official documentation was no help to me with their one line example: >>> res = buffer(rx.interval(1.0)). I am not that smart, I need a working example with some context. So I created one.

What does the buffer-operator do?

Quoting the docs: The buffer-operator projects each element of an observable sequence into zero or more buffers. Taking a look at the parameters bounderies makes it more clear: “boundaries (Observable) – Observable sequence whose elements denote the creation and completion of buffers.”

Ok. It would phrase it like this: Using the buffer-operator, you can delay the output of one stream (let’s call it source) until a second stream emits something. Let’s call the second stream the buffer-marker. Whenever the buffer-marker emits something, all items from the source-stream that have occurred until the last buffer-marker event are now emitted as a list of items.

An example

The example uses two different inverval() observables to simulate the two streams of events/data. The first one, buffer_marker emits something every second, while the second one, source, is a little faster and emits something every 0.3 seconds. (The something that is emitted is a number in this case, because that’s what interval does, but it could be anything.)

The sample script

Note that buffer_marker.subscribe() is never explicitly called in this example. It happens implicitly, because we are using it with operator.buffer(buffer_marker).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# file: rx3_buffer_example.py
from time import sleep

from rx import operators, interval


def buffer_example():
    buffer_marker = interval(1.0).pipe(
        operators.do_action(
            on_next=lambda x: print(f"buffer marker {x}"),
        ),
    )

    source = interval(0.3).pipe(
        operators.do_action(
            on_next=lambda single_item: print(f"action before buffering {single_item}"),
        ),
        operators.buffer(buffer_marker),
        operators.do_action(
            on_next=lambda buffered_items: print(
                f"action after buffering {buffered_items}"
            ),
        ),
    )
    print("Subscribing to source")
    source.subscribe()
    while True:
        sleep(5)


if __name__ == "__main__":
    buffer_example()

Output

$ python3 -m rx3_buffer_example
Subscribing to source
action before buffering 0
action before buffering 1
action before buffering 2
buffer marker 0
action after buffering [0, 1, 2]
action before buffering 3
action before buffering 4
action before buffering 5
buffer marker 1
action after buffering [3, 4, 5]
action before buffering 6
action before buffering 7
...

See also