As I was adding a test for the
examples/rx3_buffer_example.py I noticed that the endless while-loop was actually never called, because
observable.connect() seems to be a blocking operation, until the stream finishes with
To make this example testable, I changed the
dequeue() from using an endless while-loop to emitting a list of items. The changes can bee seen in this commit.
In addition, I made a small change to the
capture_stdout_as_list context-manager: I renamed the
__str__(), so that the context-manager’s result can be used in the test assertions using
str(result) instead of
result.to_string(). It does look a little nicer, and I guess that this could be called “more pythonic”.