pyppin.threading.semaphore
A smarter semaphore that can do things like wait for tasks to finish.
A semaphore is a shared reservoir of (integer) capacity.
This is a more sophisticated version of threading.Semaphore and threading.BoundedSemaphore. Unlike those classes, this class has a stop() method, which (irreversibly) prevents further resource acquisition and blocks until all resources have been released. This lets you do things like wait until all tasks have completed! It also generalizes threading.Barrier: unlike a barrier, you don't have to know ahead of time how many tasks need to be waited for.
Basic Usage
Think of a semaphore as a reservoir of “resource,” with some total amount of resource (optionally infinite) available. You acquire() some resource at the start of an operation, blocking until that much of the resource is available; when you’re done, you release() it. This guarantees that you never use more than the available capacity at once, which makes this an effective throttling tool. You can also change the total available capacity with set_capacity(), and shut down the entire semaphore with stop().
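To make the reservoir model concrete, here is a minimal sketch of acquire(), try_acquire(), release() and set_capacity() built on the stdlib threading.Condition. It is an illustration under stated assumptions, not pyppin's implementation: the class name ReservoirSketch is hypothetical, acquire() returns a plain bool instead of an AcquireResult, and stop() is left out.

```python
import threading
from typing import Optional


class ReservoirSketch:
    """Hypothetical minimal reservoir; NOT pyppin's actual implementation.

    capacity=None means unlimited. stop() is deliberately omitted.
    """

    def __init__(self, capacity: Optional[int] = None) -> None:
        self._capacity = capacity
        self._used = 0
        self._cond = threading.Condition()

    def _has_room(self, amount: int) -> bool:
        # Unlimited capacity always has room; otherwise check the headroom left.
        return self._capacity is None or self._used + amount <= self._capacity

    def acquire(self, amount: int = 1, timeout: Optional[float] = None) -> bool:
        if amount < 0:
            raise ValueError("amount must be >= 0")
        with self._cond:
            # wait_for returns the predicate's final value; timeout=None waits forever
            # and a zero or negative timeout does not block.
            if not self._cond.wait_for(lambda: self._has_room(amount), timeout):
                return False
            self._used += amount
            return True

    def try_acquire(self, amount: int = 1) -> bool:
        # Equivalent to acquiring with a zero timeout: never blocks.
        return self.acquire(amount, timeout=0)

    def release(self, amount: int = 1) -> None:
        with self._cond:
            self._used -= amount
            self._cond.notify_all()  # room may have opened up for waiters

    def set_capacity(self, capacity: Optional[int]) -> None:
        with self._cond:
            # Holdings are not revoked, so usage may exceed a lowered capacity for a while.
            self._capacity = capacity
            self._cond.notify_all()  # a raised capacity may unblock waiters

    @property
    def used(self) -> int:
        return self._used
```

Because set_capacity() in this sketch never revokes capacity that is already held, lowering it below current usage only makes new acquisitions wait until enough has been released, which mirrors the transient over-capacity described under set_capacity().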
Because releasing what you’ve acquired is so important, Semaphore provides two context manager APIs:
with semaphore:
    ...  # do something

with semaphore.get(amount=N, timeout=X, check=False) as resource:
    if resource:
        ...  # do something
    else:
        ...  # you couldn't get the capacity!
The first syntax grabs one unit of resource, blocking indefinitely, and raises BrokenPipeError if the semaphore is stopped before you can acquire it. The second syntax lets you control each of these behaviors (the amount, the timeout, and whether failure raises); with check=False, you need to check whether the resource was successfully acquired before using it.
This class is thread-safe.
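The second context-manager form can be pictured as an object that acquires up front, reports success through its truth value, and releases on exit only if it actually acquired. The sketch below is a hypothetical stand-in (ResourceSketch is not part of pyppin) built on the stdlib threading.Semaphore, so it handles one unit at a time and cannot model a stopped semaphore.

```python
import threading
from typing import Optional


class ResourceSketch:
    """Hypothetical stand-in for the object Semaphore.get() returns.

    Built on the stdlib threading.Semaphore, so it handles one unit at a
    time and has no notion of a stopped semaphore (no BrokenPipeError).
    """

    def __init__(self, sem: threading.Semaphore,
                 timeout: Optional[float] = None, check: bool = False) -> None:
        self._sem = sem
        # Acquire eagerly, before the context manager is handed back.
        # threading.Semaphore.acquire returns False if the timeout expires.
        self._acquired = sem.acquire(timeout=timeout)
        if check and not self._acquired:
            raise TimeoutError("could not acquire capacity in time")

    def __bool__(self) -> bool:
        # Truthiness reports whether the capacity is actually held.
        return self._acquired

    def __enter__(self) -> "ResourceSketch":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Release only what was actually acquired, even if the block raised.
        if self._acquired:
            self._acquired = False
            self._sem.release()
```

Releasing in __exit__ is what makes the pattern safe: an exception inside the with block still returns the capacity, and a failed acquisition never releases capacity it does not own.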
Useful Ways to Use It
One common use is to limit concurrent use of a resource, such as an RPC server. In this case, you can create a semaphore with some finite capacity and have each use grab some capacity; for example, the program might set throttle = Semaphore(5) and then have each worker thread call:
with throttle:
    ...  # do the expensive operation
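The throttling guarantee can be checked directly. This sketch uses the stdlib threading.BoundedSemaphore(5) as a stand-in for Semaphore(5), since for one unit at a time it offers the same acquire/release contract; the counters active and peak are illustrative additions that record how many workers are inside the throttled block at once.

```python
import threading
import time

throttle = threading.BoundedSemaphore(5)  # stdlib stand-in for Semaphore(5)
counter_lock = threading.Lock()
active = 0  # workers currently inside the throttled block
peak = 0    # highest value `active` ever reached


def expensive_operation() -> None:
    global active, peak
    with throttle:  # blocks while five other workers are inside
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # pretend to do the expensive work
        with counter_lock:
            active -= 1


workers = [threading.Thread(target=expensive_operation) for _ in range(20)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(f"peak concurrency: {peak}")
```

However the twenty threads are scheduled, peak never exceeds five, because a sixth worker cannot get past the with statement until one of the others has left its block.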
Another common use is to make sure all tasks have finished when you don’t know ahead of time how many tasks there will be. For example, an RPC server might want to enter a “lame-duck” mode, where it stops accepting new requests and waits for pending ones to finish before shutting down; or a batch job might issue many asynchronous requests and want to wait for them to finish (and yield their respective outputs) before ending the job. A semaphore with no concurrency limit (capacity=None) does the trick nicely: simply have every task acquire() when it starts and release() when it finishes. When you want to end the job as a whole (stop allowing new requests and wait for existing ones to finish), the thread that wants to wait simply calls Semaphore.stop().
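The drain pattern needs only a counter, a stopped flag, and a condition variable. The following is a minimal sketch of that idea, not pyppin's code: DrainSketch is a hypothetical name, it supports only unit-sized, unlimited-capacity acquisitions, and its acquire() returns a bool rather than an AcquireResult.

```python
import threading
import time
from typing import Optional


class DrainSketch:
    """Hypothetical unbounded semaphore with stop(); not pyppin's implementation."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._in_flight = 0
        self._stopped = False

    def acquire(self) -> bool:
        with self._cond:
            if self._stopped:
                return False  # stop() has been called: refuse new work
            self._in_flight += 1
            return True

    def release(self) -> None:
        with self._cond:
            self._in_flight -= 1
            self._cond.notify_all()  # wake stop() if this was the last task

    def stop(self, timeout: Optional[float] = None) -> bool:
        with self._cond:
            self._stopped = True  # irreversible: there is no way to restart
            # True once every in-flight task has released; False if time runs out.
            return self._cond.wait_for(lambda: self._in_flight == 0, timeout)


tracker = DrainSketch()
results = []


def task(n: int) -> None:
    try:
        time.sleep(0.05)  # pretend to wait on an asynchronous request
        results.append(n * n)
    finally:
        tracker.release()  # release when the task finishes, even on error


threads = []
for n in range(5):
    if tracker.acquire():  # acquire when the task starts
        t = threading.Thread(target=task, args=(n,))
        t.start()
        threads.append(t)

drained = tracker.stop()  # blocks until all five tasks have released
```

Note that the acquisition happens in the submitting thread and the release in the worker thread; a semaphore tracks amounts of capacity, not which thread holds them.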
Warning
If a code unit (e.g. a thread, or a worker object that can move between threads) acquires some units of resource, it must not acquire again until it has first released what it holds. Otherwise you will get a thread-starvation deadlock,¹ which is notoriously hard to catch via static analysis or unit tests, and only manifests in production under high load, when your program suddenly comes to a halt. If a code flow might need more capacity later, acquire the maximum amount of capacity you might need up front.
¹ The exact mechanism: Say a worker holds one unit of capacity, and wants to grab one more unit of capacity, do something, then release both. Now imagine that many identical workers are active at once, so that the semaphore is at capacity. Every one of the workers could release some capacity, allowing things to continue, if it could just get one unit of capacity – but there is none, and nobody can release any! The program deadlocks. This is especially insidious because the most common reason someone might accidentally do this is if workers only sometimes need that second unit of capacity. In that case, normally things work fine: even if one worker needs an extra unit, some other worker will soon finish its task (without needing that) and release capacity. Things only go wrong once there are enough workers simultaneously in that special state, which tends to happen in unpredictable circumstances but generally at peak traffic, in the middle of the night, or during a highly-visible public event. Don’t let this happen to you. Release before you acquire.
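The footnote's scenario can be reproduced deterministically with two workers and two units of capacity. This sketch uses the stdlib threading.Semaphore plus a timeout on the second acquisition so that it terminates instead of hanging; the two barriers are illustrative scaffolding that force every worker to hold one unit before asking for another, and to finish asking before anyone releases.

```python
import threading

sem = threading.Semaphore(2)         # total capacity: two units
both_holding = threading.Barrier(2)  # every worker holds one unit before asking for more
both_tried = threading.Barrier(2)    # nobody releases until everyone has asked
second_unit = []                     # did each worker get its second unit?


def greedy_worker() -> None:
    sem.acquire()                   # hold one unit...
    both_holding.wait()             # ...until the semaphore is completely full
    got = sem.acquire(timeout=0.2)  # ...then ask for one more: nobody can release
    second_unit.append(got)
    both_tried.wait()
    if got:
        sem.release()
    sem.release()


workers = [threading.Thread(target=greedy_worker) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(second_unit)  # [False, False]
```

Without the timeout, both workers would block forever on their second acquire(). Asking for both units in a single acquisition at the start (acquire(amount=2) in this class's API) removes the hold-and-wait step that causes the deadlock.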
Classes
- class pyppin.threading.semaphore.Semaphore(capacity: Optional[int] = None)
Bases: object
- class AcquireResult(value)
Bases: Enum
The result of an acquire() operation on a semaphore.
- acquire(amount: int = 1, timeout: Optional[float] = None) → AcquireResult
Acquire (take ownership of) some capacity within the semaphore.
If this function returns SUCCESS, the capacity was successfully acquired; the caller now owns amount of resource and must release it, by calling release(), when they are done using it. If the function returns any other value, the capacity was not acquired, and the caller owns nothing and should release nothing.
It is guaranteed that if the timeout is zero or negative, this function will not block.
- Parameters
amount – The amount of capacity to acquire. Must be >= 0.
timeout – How long we should block, in seconds. None (the default) means to wait forever. Zero means that we should never block; if we can’t instantly get the capacity, return immediately.
- Returns
SUCCESS if the capacity was successfully acquired. TIMEOUT if the acquisition failed because time ran out. STOPPED if the acquisition failed because the semaphore has stopped. (This is a non-transient error!)
- acquire_checked(amount: int = 1, timeout: Optional[float] = None) → None
Acquire, raising an exception on failure.
- Parameters
amount – The amount of capacity to get.
timeout – How long we should block, in seconds. None (the default) means to wait forever. Zero means that we should never block; if we can’t instantly get the capacity, return immediately.
- Raises
TimeoutError – If timeout is not None and the request timed out. This is a retriable error.
BrokenPipeError – If the semaphore was stopped before the acquisition could complete. This is a non-retriable error.
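The relationship between acquire()'s three outcomes and acquire_checked()'s exceptions, and why the retriable/non-retriable distinction matters to callers, can be sketched as below. The enum here only mirrors the member names documented for acquire(); its values, and the helpers checked() and acquire_with_retries(), are hypothetical and not part of pyppin.

```python
import enum
from typing import Callable


class AcquireResult(enum.Enum):
    # Member names follow the acquire() docs; the values are placeholders.
    SUCCESS = enum.auto()
    TIMEOUT = enum.auto()
    STOPPED = enum.auto()


def checked(result: AcquireResult) -> None:
    """Hypothetical helper: turn an acquire() outcome into acquire_checked()-style errors."""
    if result is AcquireResult.TIMEOUT:
        raise TimeoutError("timed out waiting for capacity")  # retriable
    if result is AcquireResult.STOPPED:
        raise BrokenPipeError("semaphore has been stopped")  # non-retriable


def acquire_with_retries(attempt: Callable[[], AcquireResult], retries: int) -> bool:
    """Hypothetical retry loop: retry timeouts, but never retry a stopped semaphore."""
    for _ in range(retries):
        try:
            checked(attempt())
            return True
        except TimeoutError:
            continue  # transient: capacity may free up on the next attempt
        # BrokenPipeError is not caught, so it propagates: retrying cannot succeed.
    return False
```

Retrying only on TimeoutError follows from the documentation above: a timeout is transient, while a stopped semaphore never accepts new acquisitions again.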
- try_acquire(amount: int = 1) → bool
Try to acquire without blocking.
- Parameters
amount – The amount of capacity to acquire.
- Returns
True if the capacity was acquired, False otherwise.
- stop(timeout: Optional[float] = None) → bool
Shut down the semaphore.
All pending and future attempts to acquire capacity will fail immediately. This function then blocks until either the timeout elapses or all active resources have been released by their holders.
- Parameters
timeout – How long to wait for a full shutdown, or None to wait forever.
- Returns
True if the semaphore has been shut down, False for a timeout.
- set_capacity(capacity: Optional[int]) → None
Modify the capacity of the semaphore.
Note that the current usage of the semaphore may be transiently greater than its capacity, if you reduce the capacity with this mechanism!
- class Resource(sem: Semaphore, amount: int = 1, timeout: Optional[float] = None, check: bool = False)
Bases: AbstractContextManager
- get(amount: int = 1, timeout: Optional[float] = None, check: bool = False) → Resource
Context manager API to acquire capacity from the semaphore.
This function works just like acquire() or acquire_checked(), but (if it doesn’t raise) returns a context manager which releases the resources on exit.
If you call this with check=False, note that you must check whether the acquisition succeeded (with resource.status) before using it!
- Parameters
amount – The amount of capacity to acquire, which must be >= 0.
timeout – How long we should block, in seconds, or None (the default) to wait forever. If the argument is zero, it is guaranteed that this function will not block.
check – If True, this function will raise an exception on failure. If False, returns a resource with a nonzero status.
- Returns
- A context manager whose bool value indicates whether the resource was successfully acquired. If check is True, this value is guaranteed to always be True.
- Raises
TimeoutError – If check=True, timeout != None, and the request timed out. This is a retriable error.
BrokenPipeError – If check=True and the semaphore was stopped before the acquisition could complete. This is a non-retriable error.