Communicating Between Threads with Queue().put() and Queue().get()


EDIT: I made some changes to the code that answered some of my earlier questions, but now I have new ones.

I am very new to threads and parallel programming, so forgive me if I have done things unconventionally.

I have two threads, and I am trying to get them to communicate with one another. One of them finishes sooner than the other, so what I want is that when the shorter one ends, the longer one also ends. I had partial success by passing and checking for a particular object (here I use sentinel) to mark that either thread has completed, but I have several questions about this. Here is the sample code I am running:

from threading import Thread
import time
from queue import Queue

sentinel = object()
q = Queue()
q.put(0)

def say_hello(subject, q):
    print("starting hello")
    for i in range(5):
        time.sleep(2)
        print(f"\nhello {subject}! iter:{i}")

        data = q.get()
        if data is sentinel:
            break
        else:
            q.put(data)
            print(f"from hello: {q.queue}")    
        
    if i == 4:
        print("hello finished because of iter")
        q.put(sentinel)
    else:
        print("hello finished because of sentinel from foo")
        q.put(sentinel)


def foo(q):
    print("starting foo")
    for j in range(2):
        time.sleep(1)
        print(f"\nfoo iter:{j}")

        data = q.get()
        if data is sentinel:
            break
        else: 
            q.put(data)
            print(f"from foo: {q.queue}")

    if j == 1:
        print("foo finished because of iter") 
        q.put(sentinel)   
    else:
        print("foo finished because of sentinel from hello")
        q.put(sentinel)


def t():
    print("t:0\n")
    for k in range(1,6):
        time.sleep(1)
        print(f"\nt:{k}")

def run():
    time_thread = Thread(target = t)
    time_thread.start()

    hello_thread = Thread(target = say_hello, args = ["lem", q])
    hello_thread.start()

    foo_thread = Thread(target = foo, args = [q])
    foo_thread.start()

    time_thread.join()
    hello_thread.join()
    foo_thread.join()

    print("Done")

run()

In this case, foo should end first and say_hello should end right after it, which is what the output shows.

Here is the output:

t:0
starting hello

starting foo

t:1

foo iter:0
from foo: deque([0])

hello lem! iter:0
t:2

foo iter:1
from foo: deque([0])
foo finished because of iter

from hello: deque([<object object at 0x000001B9CF908EA0>, 0])

t:3

t:4
hello lem! iter:1
hello finished because of sentinel from foo


t:5
Done

Now my questions are:

  1. Is this the right way of doing this? Is there perhaps an easier, cleaner, more conventional way of performing the same thing?
  2. The output seems a little erratic such that they change ever so slightly every time I rerun it
  3. Are local variables of identical names shared between threads? I was running all my loops with i before and it almost seems like it affects all the threads
  4. It seems like say_hello doesn't quite run at the timestamps I want it to be. At t:1 there shouldn't be any output from say_hello, but there is. This is even more peculiar given that "from hello: deque..." is only printed after foo's output. I expected them to print in pairs, the way foo's lines always do.
  5. Instead of including a 0 at the beginning in the queue, is there a way I could just let the two threads look into the queue using .get_nowait(), see if there is anything, and if not, just skip it without an error being raised?

Any and all input is greatly appreciated. Thanks!

Solomon Slow (accepted answer)

Is this the right way of doing this?

I would ask instead, "Is this a right thing to do?" You are forcing the threads to take turns. The 0 that your main thread puts into the queue effectively is a talking stick. The "hello" thread and the "foo" thread pass it back and forth, and neither of them is allowed to do anything interesting unless they are holding it. Meanwhile, your main thread is not allowed to do anything until both of the other two threads have finished.

Your program never allows more than one thread to do anything interesting at the same time. You might just as well do everything in one thread. The code would be simpler (fewer things that can go wrong) and, if efficiency mattered, more efficient.
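To make that concrete, here is a minimal single-threaded sketch of the same lock-step behaviour. It assumes the only goal is to alternate one step of foo with one step of hello and to stop both as soon as the shorter loop runs out; the sleeps are left out, and the function name run_lock_step is hypothetical.

```python
def run_lock_step(subject: str = "lem", hello_iters: int = 5, foo_iters: int = 2) -> list:
    """Alternate foo and hello steps in one thread; stop when either runs out.

    Hypothetical single-threaded equivalent of the question's program: no
    Queue and no sentinel are needed, because only one step runs at a time.
    """
    log = []
    for step in range(min(hello_iters, foo_iters)):
        log.append(f"foo iter:{step}")
        log.append(f"hello {subject}! iter:{step}")
    shorter = "foo" if foo_iters <= hello_iters else "hello"
    log.append(f"{shorter} finished because of iter, so the other loop stops too")
    return log


if __name__ == "__main__":
    for line in run_lock_step():
        print(line)
```

Because nothing runs concurrently, the order of the lines is the same on every run.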

One reason why you might want to use that "talking stick" architecture is if the two threads were behaving as coroutines, where the progress of one is tied to the other, but their internal states progress differently from each other. Coroutines can be easier to read than some other alternative representations because the state of any one coroutine is implicit in its execution context instead of being explicitly encoded in data objects. It's more like how we were taught to understand single-threaded programs when we were beginners.
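In Python, generators are one common way to get that coroutine style without any threads. The sketch below illustrates that idea under stated assumptions; it is not what the question's code does, and the names hello_steps, foo_steps and take_turns are hypothetical. Each yield ends a coroutine's turn (the equivalent of handing back the talking stick), each generator keeps its own loop counter in its own suspended frame, and the driver stops everything as soon as any one of them finishes.

```python
from typing import Iterator, List


def hello_steps(subject: str, n: int) -> Iterator[str]:
    for i in range(n):
        # Each yield ends this coroutine's turn; i survives in its suspended frame.
        yield f"hello {subject}! iter:{i}"


def foo_steps(n: int) -> Iterator[str]:
    for j in range(n):
        yield f"foo iter:{j}"


def take_turns(*coroutines: Iterator[str]) -> List[str]:
    """Resume each coroutine in round-robin order until any one of them finishes."""
    log: List[str] = []
    if not coroutines:
        return log
    while True:
        for co in coroutines:
            try:
                log.append(next(co))
            except StopIteration:
                # One coroutine ran out, so every coroutine stops (no sentinel needed).
                return log


print(take_turns(foo_steps(2), hello_steps("lem", 5)))
# ['foo iter:0', 'hello lem! iter:0', 'foo iter:1', 'hello lem! iter:1']
```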

That said, I don't see that kind of "unsynchronized" dependency in your code. Your two routines simply march along in lock-step with each other until one of them tells the other, "I'm done!"
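If all the threads need to tell each other is "I'm done!", threading.Event is a more conventional tool for that one-way signal than a token in a Queue. This is an alternative suggestion, not part of the answer above, and the sketch makes some assumptions: the names worker and run_with_event are hypothetical, and the sleep intervals are shortened. Each worker calls Event.wait(timeout) in place of time.sleep(); wait() returns True as soon as the event is set, so a sleeping thread notices the other thread finishing immediately instead of at its next wake-up.

```python
import threading


def worker(name: str, iterations: int, interval: float,
           done: threading.Event, log: list) -> None:
    """Run up to `iterations` steps, stopping early if another worker sets `done`."""
    for i in range(iterations):
        # wait() returns True once the event is set and False on timeout,
        # so it doubles as an interruptible sleep.
        if done.wait(timeout=interval):
            log.append(f"{name} stopped early at iter:{i}")
            return
        log.append(f"{name} iter:{i}")
    log.append(f"{name} finished because of iter")
    done.set()  # wake up and stop every other worker


def run_with_event() -> list:
    done = threading.Event()
    log: list = []  # list.append from several threads is safe in CPython
    threads = [
        threading.Thread(target=worker, args=("hello", 5, 0.3, done, log)),
        threading.Thread(target=worker, args=("foo", 2, 0.1, done, log)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    print(run_with_event())
```

Unlike the talking-stick design, the two workers here really do run independently; they only coordinate at the moment one of them finishes.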

The output seems a little erratic such that they change ever so slightly every time I rerun it

When two threads are allowed to run concurrently with each other (which yours do, a little bit), there is no guarantee about the precise order in which they perform their various actions.

Are local variables of identical names shared between threads? I was running all my loops with i before and it almost seems like it affects all the threads ... I did this in Jupyter Notebook.

The local variables for any given call to a Python function effectively are different variables from the local variables in any other call to the same function.
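A quick check of that claim, as a hedged sketch (the function count_up and the results dict are hypothetical): two threads run the same function at the same time, each with its own loop variable i and running total, and neither disturbs the other.

```python
import threading
import time

results = {}  # shared on purpose, so each thread can report its own locals


def count_up(name: str, n: int, delay: float) -> None:
    total = 0
    for i in range(n):      # i and total are local to this particular call
        time.sleep(delay)   # let the other thread run between our steps
        total += i
    results[name] = (i, total)


threads = [
    threading.Thread(target=count_up, args=("a", 3, 0.01)),
    threading.Thread(target=count_up, args=("b", 5, 0.005)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # each thread saw only its own i and total
```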

But, are you so sure that they really are local variables? I don't know anything about Jupyter Notebook, but it sounds like you're using a REPL. If you create a top-level variable named i in the REPL environment, and then you call a function that refers to i as a free variable, then i is not local in that function call. The function will be using the top-level variable.

Python 3.9.6 (default, May  7 2023, 23:32:44) 
...
>>> i = 5
>>> def foo():
...     print(i)
... 
>>> foo()
5
>>> 

If your function assigns i before it uses i, then that should make i a local for each function call, but like I said, I don't know Jupyter, so I am just asking the question, "are you so sure that they really are local?"
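Strictly, Python's rule is a little broader than "assigns before it uses": a name is local for the whole function body if the function assigns to it anywhere in that body, and a for loop target counts as an assignment. The sketch below (all function names are hypothetical) shows the three cases: a function that only reads i sees the global, a function with for i in ... gets its own i, and a function that reads i before assigning it raises UnboundLocalError instead of falling back to the global.

```python
i = 5  # a top-level (global) variable, like one created in a REPL or notebook cell


def reads_global():
    return i  # no assignment to i anywhere in this body, so this is the global i


def loops_locally():
    for i in range(3):  # the for target is an assignment, so i is local here
        pass
    return i


def reads_before_assigning():
    try:
        value = i  # i is local (see the assignment below) but not bound yet
    except UnboundLocalError:
        return "UnboundLocalError"
    i = value + 1
    return i


print(reads_global())            # 5
print(loops_locally())           # 2
print(i)                         # 5: loops_locally() did not touch the global
print(reads_before_assigning())  # UnboundLocalError
```

In the question's code, say_hello and foo both contain a for loop over their counter, so that counter is local to each call.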

It seems like say_hello doesn't quite run at the timestamps I want it to be.

I'm not going to look any deeper into this thing, but up above, I lied a little. Your "hello" thread and your "foo" thread both actually do a few things after giving the "talking stick" back to the other thread. That means they are running concurrently for part of the time, and like I said above, there are no guarantees about the order in which concurrent threads do things.
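If strict pairing is what you want, one option (a sketch under assumptions, not the question's design) is to do all of a turn's work, prints included, while holding the stick, and to hand the stick to a specific thread instead of back into one shared queue. The names player, run_pair and STOP are hypothetical, the sleeps are omitted, and each thread gets its own inbox Queue so that a thread can never immediately take back the stick it just put down. The thread that runs out of iterations passes STOP instead of the stick.

```python
import threading
from queue import Queue

STOP = object()  # sentinel meaning "I'm done, so you stop too"


def player(name: str, turns: int, inbox: Queue, outbox: Queue, log: list) -> None:
    """Take `turns` turns (assumed >= 1), passing the stick to the other player."""
    for i in range(turns):
        token = inbox.get()  # block until the other thread hands us the stick
        if token is STOP:
            log.append(f"{name} stopped by the other thread")
            return
        log.append(f"{name} iter:{i}")  # all work happens while holding the stick
        if i == turns - 1:
            log.append(f"{name} finished because of iter")
            outbox.put(STOP)  # hand over STOP instead of the stick
            return
        outbox.put(token)  # only now does the other thread get its turn


def run_pair() -> list:
    foo_inbox, hello_inbox = Queue(), Queue()
    log: list = []
    foo = threading.Thread(target=player, args=("foo", 2, foo_inbox, hello_inbox, log))
    hello = threading.Thread(target=player, args=("hello", 5, hello_inbox, foo_inbox, log))
    foo.start()
    hello.start()
    foo_inbox.put("stick")  # foo takes the first turn
    foo.join()
    hello.join()
    return log


print(run_pair())
```

Since every log entry is written by whichever thread currently holds the stick, the order is the same on every run, though the program still does only one thing at a time.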

Instead of including a 0 at the beginning in the queue, is there a way I could just let the two threads look into the queue using .get_nowait()...?

That's called polling: continually checking for some external event instead of making a system call that "sleeps" until the event happens. It's a useful technique in some situations, but it is usually frowned upon in application code that shares a desktop, server, or mobile computing platform with other applications, because either it burns CPU cycles and uses excess electric power (if you don't pad the polling loop with blind sleep calls), or else it is insufficiently responsive (if you do use sleep calls).
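As for the mechanics the question asks about: Queue.get_nowait() raises queue.Empty when nothing is waiting, so catching that exception is how a thread can look into the queue and skip ahead without an error escaping. A minimal sketch, with hypothetical helper names poll_once and get_or_none; the second helper uses get(timeout=...), which blocks for a bounded time and then raises the same exception, a compromise between pure polling and blocking indefinitely.

```python
import queue
from typing import Any, Optional


def poll_once(q: queue.Queue) -> Optional[Any]:
    """Return the next item if one is ready right now, otherwise None."""
    try:
        return q.get_nowait()  # same as q.get(block=False)
    except queue.Empty:        # raised instead of blocking when nothing is queued
        return None


def get_or_none(q: queue.Queue, timeout: float) -> Optional[Any]:
    """Block for at most `timeout` seconds, then give up and return None."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


q = queue.Queue()
print(poll_once(q))          # None: the queue is empty, and no exception escapes
q.put("hello")
print(poll_once(q))          # hello
print(get_or_none(q, 0.05))  # None, after waiting about 50 ms
```

Returning None is a simplification; if None could be a legitimate item, return a dedicated sentinel object instead. And calling poll_once in a tight loop still has the CPU-versus-responsiveness trade-off described above.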