Minimal working example -- a client that sends 'Hello world'
to echo server word-by-word using an RxPY (v.1.2.4) Observable
.
Client:
from rx.concurrency.mainloopscheduler import TwistedScheduler
from rx.observable import Observable
from twisted.internet import reactor
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
class RxProtocol(LineReceiver):
def connectionMade(self):
Observable.from_list('Hello world'.split(' '), TwistedScheduler(reactor)).subscribe(self.sendLine)
def lineReceived(self, line):
print line
if __name__ == '__main__':
f = ClientFactory()
f.protocol = RxProtocol
reactor.connectTCP("localhost", 8000, f)
reactor.run()
Server: simple echo server from twisted examples -- echoserv.py
With the server running, starting the client gives:
C:\Program Files\Python\2.7\python.exe C:/client.py
Unhandled Error
Traceback (most recent call last):
File "C:/client.py", line 16, in <module>
reactor.run()
File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 1194, in run
self.mainLoop()
File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 1203, in mainLoop
self.runUntilCurrent()
--- <exception caught here> ---
File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 825, in runUntilCurrent
call.func(*call.args, **call.kw)
File "C:\Program Files\Python\2.7\lib\site-packages\rx\concurrency\mainloopscheduler\twistedscheduler.py", line 36, in interval
disposable.disposable = action(scheduler, state)
File "C:\Program Files\Python\2.7\lib\site-packages\rx\concurrency\scheduler.py", line 72, in schedule_work
group.remove(d)
File "C:\Program Files\Python\2.7\lib\site-packages\rx\disposables\compositedisposable.py", line 46, in remove
item.dispose()
File "C:\Program Files\Python\2.7\lib\site-packages\rx\disposables\compositedisposable.py", line 63, in dispose
disposable.dispose()
File "C:\Program Files\Python\2.7\lib\site-packages\rx\disposables\disposable.py", line 34, in dispose
self.action()
File "C:\Program Files\Python\2.7\lib\site-packages\rx\concurrency\mainloopscheduler\twistedscheduler.py", line 42, in dispose
handle[0].cancel()
File "C:\Program Files\Python\2.7\lib\site-packages\twisted\internet\base.py", line 89, in cancel
raise error.AlreadyCalled
twisted.internet.error.AlreadyCalled: Tried to cancel an already-called event.
What am I doing wrong?