# RxPY: How to use the buffer-operator

I created a GitHub repository with the two RxPY (version 3) examples from this and a previous blog post. You can find it at rxpy3-examples on GitHub.

When I was trying to understand and use the buffer operator of RxPY 3, I noticed that there are not a lot of code samples out there. The few examples are usually for version 2 of RxPY, and that does not help if you are a complete RxPY 3 n00b, like me.

The official documentation was no help to me with its one-line example: `>>> res = buffer(rx.interval(1.0))`. I am not that smart; I need a working example with some context. So I created one.

## What does the buffer-operator do?

Quoting the docs: the buffer-operator "projects each element of an observable sequence into zero or more buffers." Taking a look at the `boundaries` parameter makes it clearer: “boundaries (Observable) – Observable sequence whose elements denote the creation and completion of buffers.”

OK. I would phrase it like this: using the buffer-operator, you can delay the output of one stream (let’s call it source) until a second stream emits something. Let’s call the second stream the buffer-marker. Whenever the buffer-marker emits something, all items that the source stream has produced since the previous buffer-marker event are emitted together as one list.

## An example

The example uses two different `interval()` observables to simulate the two streams of events/data. The first one, `buffer_marker`, emits something every second, while the second one, `source`, is a little faster and emits something every 0.3 seconds. (The something that is emitted is a number in this case, because that’s what `interval` does, but it could be anything.)

## The sample script

Note that `buffer_marker.subscribe()` is never explicitly called in this example. It happens implicitly, because we are passing it to `operators.buffer(buffer_marker)`.

```python
# file: rx3_buffer_example.py
from time import sleep

from rx import operators, interval


def buffer_example():
    # Emits 0, 1, 2, ... once per second; each item closes the current buffer.
    buffer_marker = interval(1.0).pipe(
        operators.do_action(
            on_next=lambda x: print(f"buffer marker {x}"),
        ),
    )

    # Emits 0, 1, 2, ... every 0.3 seconds; these are the items being buffered.
    source = interval(0.3).pipe(
        operators.do_action(
            on_next=lambda single_item: print(f"action before buffering {single_item}"),
        ),
        operators.buffer(buffer_marker),
        operators.do_action(
            on_next=lambda buffered_items: print(
                f"action after buffering {buffered_items}"
            ),
        ),
    )

    print("Subscribing to source")
    source.subscribe()

    # Keep the main thread alive; the intervals fire in the background.
    while True:
        sleep(5)


if __name__ == "__main__":
    buffer_example()
```

## Output

```
$ python3 -m rx3_buffer_example
Subscribing to source
action before buffering 0
action before buffering 1
action before buffering 2
buffer marker 0
action after buffering [0, 1, 2]
action before buffering 3
action before buffering 4
action before buffering 5
buffer marker 1
action after buffering [3, 4, 5]
action before buffering 6
action before buffering 7
...
```