Concurrency In Python
An Introduction To Concurrency
When a program can interleave statements from multiple workflows we say the program is concurrent. This can be done via processes and threads. The main difference between the two is that a process has its own address space and cannot access the memory of other processes directly. A thread, on the other hand, runs within a process and can access the entire address space of the process it belongs to.
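To make that address-space point concrete, here is a minimal sketch of my own (the names are mine, not from any library) showing that a thread directly sees and mutates data owned by its parent process:

import threading

shared = []  # lives in the process's address space

def worker():
    # The thread sees and mutates the very same list object as the main thread.
    shared.append("written by worker thread")

t = threading.Thread(target=worker)
t.start()
t.join()
print(shared)  # ['written by worker thread']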
Here I will mostly talk about threads...
Correctness Properties
Safety. We want our programs to be safe: having multiple threads access shared resources must not cause errors, and all our threads must be able to keep running. The hazards are:
- Mutual exclusion - critical sections must not overlap.
- Deadlock - occurs when no process can make useful progress.
- Deadly embrace - progress is mutually dependent, so for example, process A is waiting on a resource held by process B, but process B is waiting on a different resource held by process A.
- Livelock - every process is computing without progressing meaningfully.
Liveness. We want our programs to produce something useful! That is, each process is allocated the resources it needs to do its job:
- Starvation - occurs when some requests never get dealt with.
- Fairness - contention for a resource is resolved fairly.
What Is A "Critical Region"?
A critical region is a section of code which must only be executed by one thread at any one time. Let's take, for example, a shared counter. Let's say that there are N threads, each of which increments the counter when some event occurs. The pseudo code that increments the counter is:
var = counter value
var = var + 1
counter value = var
Without any protection it is quite possible to get this interleaving:
Counter value is 5
Thread A: var = 5
Thread B: var = 5
Thread B: var = var + 1 = 6
Thread B: counter value = 6
Thread A: var = var + 1 = 6
Thread A: counter value = 6
Counter value is 6 (oops!)
The value of the counter at the end of this sequence is wrong. It should have been incremented twice, once by thread A and once by thread B, giving it a new value of 7. As it stands it ends up being 6!
Clearly from the point just before a thread reads the counter to just after it stores the incremented value, a thread must not be interrupted by another thread that is also trying to increment the counter. If we make our pseudo code this:
ENTER CRITICAL REGION
var = counter value
var = var + 1
counter value = var
LEAVE CRITICAL REGION
Then the malfunctioning interleaving we saw above cannot happen because we have placed this "guard" around the access to the counter. This guard stops more than one thread being in this section of code at once. Thus, our previous sequence would now look like this:
Counter value is 5
Thread A: Enters critical region
Thread B: Tries to enter critical region but cannot. Blocks!
Thread A: var = 5
Thread A: var = var + 1 = 6
Thread A: counter value = 6
Thread A: Leaves critical region
Thread B: Unblocks and enters critical region
Thread B: var = 6
Thread B: var = var + 1 = 7
Thread B: counter value = 7
Thread B: Leaves critical region
Counter value is 7 (yay!)
So, how do we define critical regions in Python? We use Locks, Recursive Locks, Semaphores, and Condition Objects, which we will come to later...
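As a quick preview, here is a minimal sketch of my own (not from the docs) of the counter scenario: each thread increments the shared counter many times, and a threading.Lock marks the critical region so the read-modify-write cannot be interleaved.

import threading

counter = 0
counter_lock = threading.Lock()

def incrementer(n):
    global counter
    for _ in range(n):
        with counter_lock:     # enter the critical region
            var = counter      # read
            counter = var + 1  # modify and write back; leaving the
                               # with block exits the region

threads = [threading.Thread(target=incrementer, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 200000 every time; without the lock it can come out lower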
The Advantages Of Threads
So, why use threads? Because sometimes programs are waiting, doing nothing, so by having more threads, whilst one thread is waiting on, say, an internet query, another thread can be getting on with useful work. Essentially you can get more work done, and it can be more efficient to have multiple threads in one process rather than many processes.
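To illustrate, here is a toy sketch of mine with time.sleep() standing in for a blocking internet query: two one-second "queries" complete in roughly one second overall, because the waits overlap.

import threading
import time

def fake_query():
    time.sleep(1)  # stand-in for waiting on the network

start = time.time()
threads = [threading.Thread(target=fake_query) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Took {:.1f}s".format(time.time() - start))  # ~1.0s, not ~2.0s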
So... to give myself some structure writing this and to revise a little, I looked back at my uni notes, which were for Java and made almost 15 years ago now! The principles haven't changed, and rather than always looking up the docs and coding as I go, I thought writing this would cement it in my head for Python. So here goes...
Create A Thread
The Python Standard Library provides the class Thread, which can either have a callable passed into the constructor, or the class itself can be subclassed (only override __init__() and run()).
Passing A Callable To The Thread Constructor
Create a thread by passing a callable (usually a function) to the Thread constructor as demonstrated below.
from threading import Thread

def my_thread(arg1):
    print("Have arg1 = {}".format(arg1))
    for i in range(10):
        print(i)

threads = []
for i in range(5):
    thread = Thread(
        target=my_thread,
        name="my_thread_{}".format(i),
        args=(5 - i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    print(
        "Waiting for thread {} with name '{}'".format(
            thread.ident, thread.name))
    thread.join()  # Block until this thread has finished
The above example starts 5 threads, which don't do anything particularly interesting except print out the argument they were passed and then a sequence of 10 numbers. The output should, however, be enough to convince you that the separate flows of execution are running concurrently.
To Thread we can pass an args option, which has to be a tuple. Each member of that tuple is passed as an argument to the callable my_thread. The Thread is told to run the function my_thread through the use of the target keyword.
The thread's name keyword argument isn't used by the thread, but it can be retrieved using the thread.name property. You can set it to anything you like and it does not have to be unique.
Once each thread is created it is immediately started using thread.start(). As soon as this call completes the thread is active and can be scheduled at any time.
Once a thread is active it is given a unique identifier, which we print out in the "Waiting for thread..." message using the property thread.ident. Note that although the identifier is unique, as soon as a thread dies its ident may be reused by new threads.
Subclassing Thread
If you subclass Thread you must override the run() method and optionally the constructor.
from threading import Thread

class MyThread(Thread):
    def __init__(self, arg1):
        super(MyThread, self).__init__()
        self._arg1 = arg1

    def run(self):
        print("Have arg1 = {}".format(self._arg1))
        for i in range(10):
            print(i)

threads = []
for i in range(5):
    thread = MyThread(5 - i)
    threads.append(thread)
    thread.start()

for thread in threads:
    print(
        "Waiting for thread {} with name '{}'".format(
            thread.ident, thread.name))
    thread.join()  # Block until this thread has finished
I don't think there is much difference between giving Thread a target and just subclassing it. The latter just has a more object-oriented feel about it, I guess.
Waiting For A Thread To End
In the above examples you will have seen the command thread.join(). The join() method is part of the Thread object and causes the calling thread of execution to block until the thread has terminated.
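join() also accepts an optional timeout in seconds. It always returns None, so after a timed join() you check thread.is_alive() to see whether the thread actually finished. A small sketch of my own:

import threading
import time

slow = threading.Thread(target=time.sleep, args=(5,))
slow.start()
slow.join(timeout=1.0)  # give up waiting after 1 second
if slow.is_alive():
    print("Thread still running; join() timed out")
slow.join()             # now wait for it to really finish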
Create A Semaphore
The following snippet creates a semaphore with an initial value of 2. This means that up to 2 threads can be in the critical section that this semaphore establishes at any one time. If a 3rd thread were to try to enter, it would block until one or more of the existing threads had left the critical region.
import threading

num_resources = 2
my_sem = threading.Semaphore(num_resources)
To use the semaphore to define a critical region you can do this:
with my_sem:
    ...  # do something critical
# get to here and you're outside the with block and therefore
# no longer in the critical region
This is a nice and safe way of using a semaphore. At the start of the with block, the semaphore will have its acquire() function called automatically for you. Then the code in the with block is executed. When the with block is exited, for any reason, be it exception or otherwise, the semaphore will have its release() method called.
You can, of course, call the acquire() and release() methods yourself. You might do it like this:
my_sem.acquire()
try:
    ...  # do some work
finally:
    my_sem.release()
Notice that we have used a try block to ensure that no matter what happens the semaphore will be released, whether you return from within the try block, throw an exception, etc.
Sometimes you will write things like this:
my_sem.acquire()
if some condition:
    my_sem.release()
    return some error
# do critical work
my_sem.release()
This is quite error prone because you have to remember to release the semaphore wherever you end the normal flow of execution. You are much, much better off using a with block, or failing that, the try/finally structure shown above.
Create A (Recursive) Lock
A lock is like a semaphore with an initial value of 1. It is effectively a mutex: it allows one and only one thread access to the resource or critical section that it defines.
You create a lock as follows:
my_lock = threading.Lock() #< Created unlocked
Just like the semaphore, the lock has acquire() and release() methods, and it can also be used with a with block in the same way.
A thread cannot re-take a lock that it has already acquired. If it tries to, it will block:
my_lock.acquire()
my_lock.acquire()  #< This WILL block!
If you want a thread to be able to take a lock and then take it again before releasing it - this is called recursive locking - then you need to use an RLock:
my_rlock = threading.RLock()
my_rlock.acquire()
my_rlock.acquire()  #< Will NOT block
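A common reason for wanting this is a class whose public methods all take the same lock but also call each other (or a shared helper). A small sketch of my own (the class and method names are made up for illustration):

import threading

class Account(object):
    def __init__(self):
        self._lock = threading.RLock()
        self._balance = 0

    def deposit(self, amount):
        with self._lock:
            self._balance += amount

    def transfer_in(self, amount, fee):
        with self._lock:                # lock taken here...
            self.deposit(amount - fee)  # ...and taken again inside deposit()

acct = Account()
acct.transfer_in(100, fee=1)  # with a plain Lock this would deadlock
print(acct._balance)          # 99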
The Bounded Buffer Problem: Asynchronous Message Passing
In this scenario there is a thread that creates a load of data. It could be a thread on your server, accepting client connections, reading data from the client and then passing the received data on to another thread in your application that will process it.
The thread sending the data that is getting pumped into your program's analysis algorithm is called the producer. The algorithm thread is said to be the consumer because it is consuming the data.
If we didn't have a queue in between these two threads they would have to operate in lock-step. The producer would get data over the network. It would then have to wait for the analysis (consumer) thread to be available so it could pass the data through to it. If the analysis thread was busy it would have to wait, and so it could miss some incoming data. The same goes for the analysis thread: it has to wait until data is available. If there is a sudden load on the network and there is a large delay in receiving data, the analysis thread will sit idle until suddenly a glut of data is received.
What we want to do is decouple these two threads so that if the producer is busy, there is still enough data available to occupy the consumer, and if the consumer is busy, the producer has somewhere to stash the data quickly so it can get back to its task. This is done by placing a thread-safe buffer between the two threads.
We need to make sure of two things: the consumer must not read from an empty buffer (underflow) and the producer must not write to a full buffer (overflow).
Now, in Python, the standard library already gives us such a thread-safe buffer! It is found in collections.deque: "Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction."
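For example (a quick sketch of my own), two threads can share a deque directly. Note, though, that unlike the bounded buffer we build below, popleft() does not block when the deque is empty; it raises IndexError:

import collections
import threading

q = collections.deque()

def producer():
    for i in range(5):
        q.append(i)             # thread-safe append on the right

def consumer():
    got = 0
    while got < 5:
        try:
            print(q.popleft())  # thread-safe pop from the left
            got += 1
        except IndexError:
            pass                # empty: busy-wait (fine for a demo only)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()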
Although, therefore, we would never implement this ourselves, it is a nice exercise to try so that we can learn about and practice using the threading library.
This can be implemented using two semaphores. See the following pseudo code:
put_data:
    spaces.acquire()    # Decrements the semaphore. If there are no spaces,
                        # will block until one becomes available.
    write value into queue
    elements.release()  # Increments the semaphore. May unblock any process
                        # waiting for elements.

get_data:
    elements.acquire()  # Decrements the semaphore. If there are no elements,
                        # will block until one becomes available.
    read value from queue
    spaces.release()    # Increments the semaphore. May unblock any process
                        # waiting for a space to become available.
An example implementation is shown below.
import threading

class ExampleThreadSafeBoundedBuffer(object):
    def __init__(self, size):
        self._size = size
        self._q = [None] * self._size
        self._start = 0
        self._end = 0
        self._elements = threading.Semaphore(0)
        self._spaces = threading.Semaphore(self._size)
        self._cr = threading.Lock()

    def enqueue(self, item):
        # Wait for a space to become available
        self._spaces.acquire()
        # Enter a critical region. We require this because if there are
        # multiple writers we could have multiple threads executing this logic
        # so we must enforce mutual exclusion.
        with self._cr:
            self._q[self._end] = item
            self._end = (self._end + 1) % self._size
        # Signal anyone waiting for an element to become available...
        self._elements.release()

    def dequeue(self):
        item = None
        # Wait for an element to be available in the buffer
        self._elements.acquire()
        # Enter a critical region. We require this because if there are
        # multiple readers we could have multiple threads executing this logic
        # so we must enforce mutual exclusion.
        with self._cr:
            item = self._q[self._start]
            self._start = (self._start + 1) % self._size
        # Signal anyone waiting for a space to become available...
        self._spaces.release()
        return item
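To exercise the class, here is a short driver of my own: the producer pushes more items than the buffer can hold, so it must block on the spaces semaphore until the consumer drains some.

buf = ExampleThreadSafeBoundedBuffer(size=3)

def producer():
    for i in range(10):
        buf.enqueue(i)            # blocks when the 3-slot buffer is full

def consumer():
    for _ in range(10):
        print(buf.dequeue())      # blocks when the buffer is empty

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()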
But, in Python, we can use condition variables. Here I replicate, with added comments, the example from the Python docs. We can see that only one condition variable is required, rather than a pair of semaphores, which makes the implementation a little cleaner.
import threading

# Somewhere create a condition variable
cv = threading.Condition()

# Consume one item in one thread
cv.acquire()                       #< Lock is acquired
while not an_item_is_available():  #< Therefore this executes in the CR
    cv.wait()                      #< Lock is released and thread sleeps
                                   #  until cv.notify[All]() is called.
# When cv.wait() unblocks it re-acquires the lock so at this point
# we are back inside the CR
get_an_available_item()
cv.release()

# Produce one item in another thread
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
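To turn that pseudo code into something runnable, here is a small sketch of my own using a plain list as the shared store and with blocks instead of explicit acquire()/release():

import threading

items = []
cv = threading.Condition()

def consumer():
    with cv:              # acquire the condition's lock
        while not items:  # loop guards against spurious wakeups
            cv.wait()     # releases the lock while sleeping
        print("Got", items.pop(0))

def producer():
    with cv:
        items.append("hello")
        cv.notify()       # wake one waiting consumer

c = threading.Thread(target=consumer)
c.start()
producer()
c.join()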
The Dining Philosophers Problem: Sharing Resources
N philosophers sit around a table with N forks available, one between each pair of neighbours. To eat, each philosopher must pick up two forks: the one to their left and the one to their right. They pick up both forks to eat and once finished put down both forks and think for a while before eating again.
We must make sure that no philosopher starves! Dead philosophers are bad! We'd also like to make things fair... one fat philosopher who gobbles up 99% of the food will upset the others!
This problem demonstrates the concepts of deadlock and possible livelock (safety and fairness).
For example, deadlock would occur if every philosopher picked up their left fork before anyone picked up their right: each holds one fork and waits forever for a neighbour to release the other.
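Here is a sketch of mine of that deadlock-prone version, together with the classic fix of imposing a global order on fork acquisition (always take the lower-numbered fork first), so a cycle of waiting can never form:

import threading

N = 5
forks = [threading.Lock() for _ in range(N)]

def philosopher(i):
    left, right = i, (i + 1) % N
    # Deadlock-prone: if everyone takes 'left' first, all N can each hold
    # one fork while waiting forever for the other. The fix: acquire the
    # lower-numbered fork first, breaking the circular wait.
    first, second = min(left, right), max(left, right)
    with forks[first]:
        with forks[second]:
            print("Philosopher {} eats".format(i))

threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()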
The Readers And Writers Problem: Conditional Forms Of Mutual Exclusion
The Handshaking Problem: Synchronous Message Passing
Cigarette Smokers Problem
Sleeping Barbers Problem
Catching Unhandled Exceptions In Threads
Example (requires Python >= 3.8):
import threading
import traceback

_orig_excepthook = threading.excepthook

def tf():
    raise Exception("OOPS")

def myhook(args):
    # Do something better than just print values out!
    print("OVERRIDDEN")
    print(f"ARGS={args}")
    print(f"EXC_VALUE={args.exc_value}")
    print(f"THREAD={args.thread.name}")
    print("\n".join(traceback.extract_tb(args.exc_traceback).format()))
    _orig_excepthook(args)

threading.excepthook = myhook
threading.Thread(target=tf, name="jehtech").start()