Why does RxPY's TwistedScheduler raise AlreadyCalled error?

296 views Asked by At

Minimal working example -- a client that sends 'Hello world' to echo server word-by-word using an RxPY (v.1.2.4) Observable.

Client:

from rx.concurrency.mainloopscheduler import TwistedScheduler
from rx.observable import Observable
from twisted.internet import reactor
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver

class RxProtocol(LineReceiver):
    def connectionMade(self):
        Observable.from_list('Hello world'.split(' '), TwistedScheduler(reactor)).subscribe(self.sendLine)

    def lineReceived(self, line):
        print line

if __name__ == '__main__':
    f = ClientFactory()
    f.protocol = RxProtocol
    reactor.connectTCP("localhost", 8000, f)
    reactor.run()

Server: simple echo server from twisted examples -- echoserv.py

With the server running, starting the client gives:

C:\Program Files\Python\2.7\python.exe C:/client.py
Unhandled Error
Traceback (most recent call last):
  File "C:/client.py", line 16, in <module>
    reactor.run()
  File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 1194, in run
    self.mainLoop()
  File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 1203, in mainLoop
    self.runUntilCurrent()
--- <exception caught here> ---
  File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 825, in runUntilCurrent
    call.func(*call.args, **call.kw)
  File "C:\Program Files\Python\2.7\lib\site-packages\rx\concurrency\mainloopscheduler\twistedscheduler.py", line 36, in interval
    disposable.disposable = action(scheduler, state)
  File "C:\Program Files\Python\2.7\lib\site-packages\rx\concurrency\scheduler.py", line 72, in schedule_work
    group.remove(d)
  File "C:\Program Files\Python\2.7\lib\site-packages\rx\disposables\compositedisposable.py", line 46, in remove
    item.dispose()
  File "C:\Program Files\Python\2.7\lib\site-packages\rx\disposables\compositedisposable.py", line 63, in dispose
    disposable.dispose()
  File "C:\Program Files\Python\2.7\lib\site-packages\rx\disposables\disposable.py", line 34, in dispose
    self.action()
  File "C:\Program Files\Python\2.7\lib\site-packages\rx\concurrency\mainloopscheduler\twistedscheduler.py", line 42, in dispose
    handle[0].cancel()
  File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 89, in cancel
    raise error.AlreadyCalled
twisted.internet.error.AlreadyCalled: Tried to cancel an already-called event.

What am I doing wrong?

0

There are 0 answers