👉 I created a GitHub repository with this and other RxPY (version 3) examples. You can find it at rxpy3-examples on GitHub.
When I first looked at the different buffer operators, I was very surprised to find that none of them seemed to provide a simple rolling (aka sliding) buffer. This seemed so basic that it should be part of the core library.
It turns out I was just not reading the documentation closely enough:
buffer_with_count(count=buffer_size, skip=1) will give you a rolling buffer of size buffer_size.
Note that the source must first emit buffer_size items (three in this example) before the first buffer is emitted, i.e. only “full” buffers are emitted:
Subscribing to source
action before buffering 0
action before buffering 1
action before buffering 2
action on buffer [0, 1, 2]   <--- first buffer
action before buffering 3
action on buffer [1, 2, 3]
action before buffering 4
action on buffer [2, 3, 4]
action before buffering 5
action on buffer [3, 4, 5]
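To make the "only full buffers" behaviour in the trace above concrete, here is a minimal sketch that emulates it with nothing but the standard library. It is not RxPY's implementation, and the function name rolling_buffer is made up for this illustration. It only mirrors what the trace shows while the source is still emitting, and makes no claim about what buffer_with_count does once the source completes.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rolling_buffer(source: Iterable[T], buffer_size: int) -> Iterator[List[T]]:
    """Yield a sliding window over `source`, emitting only full buffers.

    Hypothetical helper for illustration; not part of RxPY.
    """
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    # With maxlen set, appending to a full deque drops the oldest item.
    window: Deque[T] = deque(maxlen=buffer_size)
    for item in source:
        print(f"action before buffering {item}")
        window.append(item)
        # Nothing is emitted until the window has filled up once.
        if len(window) == buffer_size:
            yield list(window)


buffers = []
for buffer in rolling_buffer(range(6), buffer_size=3):
    print(f"action on buffer {buffer}")
    buffers.append(buffer)
```

Because the generator is lazy, the "before buffering" and "on buffer" lines interleave in the same order as in the trace: three items go in before the first buffer comes out, and every later item produces exactly one new buffer.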