RxPY3: Creating a rolling buffer using buffer_with_count


👉 I created a GitHub repository with this and other RxPY (version 3) examples. You can find it at rxpy3-examples on GitHub.

When I first looked at the different buffer operators, I was very surprised to find that none of them seemed to provide a simple rolling (aka sliding) buffer. This seemed so basic that it should be part of the core library.

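It turns out I was just not reading the documentation closely enough: buffer_with_count(count=buffer_size, skip=1) will give you a rolling buffer of size buffer_size. The skip argument controls how many items lie between the starts of two consecutive buffers, so skip=1 starts a new buffer with every item.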
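To make the count/skip semantics concrete, here is a minimal plain-Python sketch that works on an ordinary iterable instead of an observable. Note that rolling_windows is a hypothetical helper written for illustration, not part of RxPY, and it only models the full buffers you see while the source keeps emitting.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rolling_windows(items: Iterable[T], count: int, skip: int = 1) -> Iterator[List[T]]:
    """Yield full windows of `count` items, starting a new window every `skip` items.

    Hypothetical helper for illustration only; it is not an RxPY function.
    """
    if count <= 0 or skip <= 0:
        raise ValueError("count and skip must be positive")
    # The deque keeps only the most recent `count` items.
    window: deque = deque(maxlen=count)
    for index, item in enumerate(items):
        window.append(item)
        # A window that starts at position `start` is full at index start + count - 1.
        # Windows start at positions 0, skip, 2 * skip, ...
        start = index - count + 1
        if start >= 0 and start % skip == 0:
            yield list(window)


# skip=1 gives the rolling buffer: each window is shifted by one item.
print(list(rolling_windows(range(6), count=3, skip=1)))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]]

# skip=count gives back-to-back chunks without overlap.
print(list(rolling_windows(range(6), count=3, skip=3)))
# [[0, 1, 2], [3, 4, 5]]
```

In buffer_with_count, leaving out skip makes it default to count, which is why the operator looks like a plain chunking operator at first glance.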

Example using buffer_with_count

from time import sleep

from rx import operators, interval


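def sliding_buffer_example(buffer_size: int):
    # interval() emits 0, 1, 2, ... every 0.3 seconds on a background timer.
    source = interval(0.3).pipe(
        operators.do_action(
            on_next=lambda single_item: print(f"action before buffering {single_item}"),
        ),
        # count=buffer_size, skip=1: start a new buffer with every item.
        operators.buffer_with_count(buffer_size, 1),
        operators.do_action(
            on_next=lambda buffered_items: print(f"action on buffer {buffered_items}"),
        ),
    )
    print("Subscribing to source")
    subscription = source.subscribe()
    # Keep the main thread alive long enough to see a few buffers, then stop.
    sleep(2)
    subscription.dispose()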


if __name__ == "__main__":
    sliding_buffer_example(3)

Output

Note that the source must first emit buffer_size items (three in this example) before the first buffer is emitted, i.e. only “full” buffers are emitted:

Subscribing to source
action before buffering 0
action before buffering 1
action before buffering 2
action on buffer [0, 1, 2]  <--- first buffer
action before buffering 3
action on buffer [1, 2, 3]
action before buffering 4
action on buffer [2, 3, 4]
action before buffering 5
action on buffer [3, 4, 5]

See also