In ThreadPoolExecutor, socket.bind() is blocking without "Address already in use"

27 views Asked by At

I'm writing a Python program using socket and tested listening the same port twice, using ThreadPoolExecutor. This is a minimal program that reproduces my problem:

import socket
import time
import concurrent.futures

def listen6666():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("set socket")
    server_socket.bind(("localhost", 6666))
    server_socket.listen()
    print("done")
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", 6666))
server_socket.listen()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    fn = executor.submit(listen6666)
    fn2 = executor.submit(listen6666)
    # listen6666() - "Address already in use"
#fn.result() - "Address already in use"
#fn2.result() - "Address already in use"
time.sleep(10)

I expect "Address already in use" occurs in server_socket.bind in listen6666, but it just don't respond. So the complete output is:

$ python test.py 
set socket
set socket
$

If I call listen6666() without executor or call fn.result() before sleeping, "Address already in use" does occur. I want "Address already in use" also happen in listen6666().

How can I solve this?

1

There are 1 answers

1
SIGHUP On BEST ANSWER

You need to manage exceptions in the thread.

Here's a revised version of your code that shows that an "Address already in use" exception is raised.

import socket
import concurrent.futures

ADDRESS = "localhost", 6666

def listen6666(keep=False):
    server_socket = None
    try:
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        print("bind")
        server_socket.bind(ADDRESS)
        print("listen")
        server_socket.listen()
        print("done")
    except Exception as e:
        print(e)
    finally:
        if keep:
            return server_socket
        if server_socket is not None:
            server_socket.close()

server_socket = None
try:
    server_socket = listen6666(True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.submit(listen6666)
        executor.submit(listen6666)
except Exception as e:
    print(e)
finally:
    if server_socket is not None:
        server_socket.close()

Output:

bind
listen
done
bind
bind
[Errno 48] Address already in use
[Errno 48] Address already in use