Extending Java’s Semaphore to Allow Dynamic Resizing

Genius SemaphoreA semaphore is a very useful utility when constructing concurrent systems. If you’re not familiar with their usage, the basic idea is semaphores allow a certain number of “permits” to be released, and anyone who wants a permit when there are none left will be blocked until someone else returns one.

Acquiring and releasing permits with Semaphore

Java’s concurrency library contains an implementation named, unsurprisingly, Semaphore. A Semaphore is initialized with the number of permits that it should allow. As an example, if I create an instance with new Semaphore(2), then the first two acquire() calls will return instantly, but the third acquire() will block until another thread calls release().

The typical usage is that threads will call acquire() before they interact with some shared resource and call release() afterwards. This means that using a semaphore for concurrency control relies on programmers knowing when to release and acquire locks, as opposed to simply using a synchronized method or block (in which case the Java runtime will take care of locking for you).

If you’re calling code that can throw exceptions after acquiring a permit, remember to put the call to release() in a finally block so that the permit is released even if an exception is thrown. If you don’t ensure that release() is called every time you acquire() even in the case of exceptions, the semaphore will eventually run out of permits and your app will deadlock. If you call release() more than once in some circumstances, you’ll have a different (and similarly difficult to debug) problem: your semaphore will be allowing more concurrent access than you intended.

// Bad -- semaphore not released if the exception is thrown
try {
    sem.acquire();
    sharedResource.doStuff();
    sem.release();
} catch (IOException e) {
    logger.warn("The resource is broken", e);
}
// Good
try {
    sem.acquire();
    sharedResource.doStuff();
} catch (IOException e) {
    logger.warn("The resource is broken", e);
} finally {
    sem.release();
}

This is all fine and dandy, and as long as this is the behavior you want, you’re good to go! (Unless you maybe want a latch or barrier, but that’s another story.)

Dynamically Increase Permits

However, what if you want to be able to reconfigure the number of permits a Semaphore has after it’s been initialized? This is often applicable to long-running programs that may need to have their parameters adjusted without restarting. Well, one half of the problem is easy: note the method signatures on acquire() and release(). Calling acquire() doesn’t return anything, so release() doesn’t take a parameter. This means that if we want to increase the number of permits available, we can simply call release() as many times as we want to increase the number of permits, since we aren’t actually releasing anything tangible.

Semaphore sem = new Semaphore(3);
sem.release();
sem.release();
 
for (int i = 0; i < 5; i++) {
    sem.acquire();
    System.out.println("Acquired a permit");
}

So, increasing permits is easy. Reducing them is a little more tricky, though. Let’s say you have a Semaphore instance configured to allow 10 permits (whether by being created with 10 permits or by calling release() — now we know they’re equivalent!), but now you want to allow fewer threads to access the shared resource.

Dynamically Decrease Permits

To be specific, if 10 threads currently have permits and 7 is the new limit, we won’t try to communicate to any threads that they are now in violation of the new limit. What we can do is make sure that the next call to acquire() will block until enough threads (4, to be precise) have called release() to bring the number of outstanding permits to below the new limit. It’s not infeasible to interrupt threads that are holding permits beyond the new upper limit, but that’s beyond the scope of a simple semaphore.

If you wanted to reconfigure it to only allow 7 permits, you have several options:

  • You call acquire() three times. This might work, or it might block forever — if more than 7 permits are currently in use, one (or more) of your acquire() calls could block for an arbitrarily long amount of time. Checking availablePermits() isn’t going to help, either, since using that to gauge whether or not an acquire() call will block is a classic check-then-act race condition. Even if the race condition could be avoided, it doesn’t help: instead of blocking, you’ll just keep getting 0 back from availablePermits().
  • You could call drainPermits() repeatedly until at least 3 permits had been drained (and then release() any extra that were drained beyond the 3 you need). This could take an arbitrarily long time.
  • You could call tryAcquire() repeatedly until you’ve gotten 3 permits. This also could take an arbitrarily long time.

These approaches all share two flaws:

  1. Most importantly, they block until as many permits as you are trying to remove have become available
  2. They don’t really achieve the goal directly

To be precise, the goal is that the semaphore should release fewer permits to the threads that are using the semaphore to control access to the shared resource. Acquiring (and presumably not releasing) permits is one way to achieve the goal, but it is not necessarily the only way. Put another way, the thread that is executing the reconfiguration does not technically need to acquire permits (whether it be by acquire() or drainPermits() or tryAcquire(), etc) to fulfill this requirement, since it doesn’t need to access the shared resource that the semaphore is guarding.

Extending Semaphore to use reducePermits()

Fortunately, there is a method that does exactly what we want: reducePermits(). (For a look at the origin of this method, see here.) However, it’s protected, so you’ll need to create a subclass of Semaphore to expose it. Once we have such a class, we can adjust the number of permits up (by calling release()) and down (by calling reducePermits()) without blocking. This is great, and if all you want is to be able to increase or decrease the number of permits at will, then you’ve got what you need. Sometimes, though, it would be easier to be able to specify the new number of permits regardless of what it was before. We can do this by having a wrapper that keeps track of the last requested maximum number of permits, compares that with the new maximum, and either adds or removes permits as needed.

A simple implementation that extends Semaphore to expose reducePermits() is provided below. It includes a wrapper (AdjustableSemaphore) that only exposes the two most commonly used methods (release() and acquire()). It includes a @ThreadSafe annotation defined in a jar available at jcip.net. (JCIP is Java Concurrency In Practice, a thoroughly excellent book.) If you aren’t using the JCIP annotations in your project and don’t wish to add another jar, feel free to remove the @ThreadSafe and corresponding import. I encourage you to use the annotations, though. They are strictly informational (they do not modify any runtime or compile-time behavior), but are very useful when looking at a class to quickly see what thread safety guarantees the author intends to provide. This is especially handy when modifying someone else’s code: you certainly wouldn’t want to unknowingly make a thread-safe class not thread-safe! Also, FindBugs is smart enough to inform you if you make a class, say, @Immutable but have non-final fields in it.

package com.genius.concurrency;
 
import java.util.concurrent.Semaphore;
import net.jcip.annotations.ThreadSafe;
 
/**
 * A simple implementation of an adjustable semaphore.
 */
@ThreadSafe
final public class AdjustableSemaphore {
 
    /**
     * semaphore starts at 0 capacity; must be set by setMaxPermits before use
     */
    private final ResizeableSemaphore semaphore = new ResizeableSemaphore();
 
    /**
     * how many permits are allowed as governed by this semaphore.
     * Access must be synchronized on this object.
     */
    private int maxPermits = 0;
 
    /**
     * New instances should be configured with setMaxPermits().
     */
    public AdjustableSemaphore() {
        // no op
    }
 
    /*
     * Must be synchronized because the underlying int is not thread safe
     */
    /**
     * Set the max number of permits. Must be greater than zero.
     *
     * Note that if there are more than the new max number of permits currently
     * outstanding, any currently blocking threads or any new threads that start
     * to block after the call will wait until enough permits have been released to
     * have the number of outstanding permits fall below the new maximum. In
     * other words, it does what you probably think it should.
     *
     * @param newMax
     */
    synchronized void setMaxPermits(int newMax) {
        if (newMax < 1) {
            throw new IllegalArgumentException("Semaphore size must be at least 1,"
                + " was " + newMax);
        }
 
        int delta = newMax - this.maxPermits;
 
        if (delta == 0) {
            return;
        } else if (delta > 0) {
            // new max is higher, so release that many permits
            this.semaphore.release(delta);
        } else {
            delta *= -1;
            // delta < 0.
            // reducePermits needs a positive #, though.
            this.semaphore.reducePermits(delta);
        }
 
        this.maxPermits = newMax;
    }
 
    /**
     * Release a permit back to the semaphore. Make sure not to double-release.
     *
     */
    void release() {
        this.semaphore.release();
    }
 
    /**
     * Get a permit, blocking if necessary.
     *
     * @throws InterruptedException
     *             if interrupted while waiting for a permit
     */
    void acquire() throws InterruptedException {
        this.semaphore.acquire();
    }
 
    /**
     * A trivial subclass of <code>Semaphore</code> that exposes the reducePermits
     * call to the parent class. Doug Lea says it's ok...
     * http://osdir.com/ml/java.jsr.166-concurrency/2003-10/msg00042.html
     */
    private static final class ResizeableSemaphore extends Semaphore {
        /**
         *
         */
        private static final long serialVersionUID = 1L;
 
        /**
         * Create a new semaphore with 0 permits.
         */
        ResizeableSemaphore() {
            super(0);
        }
 
        @Override
        protected void reducePermits(int reduction) {
            super.reducePermits(reduction);
        }
    }
}

This is just a basic implementation. Based on your requirements, you may wish to further constrain the number of permits to be within a certain window, or to provide a constructor that accepts a number of permits to pre-configure with, or to allow a negative number of permits, or to expose more of Semaphore’s methods, or any number of other modifications.

  • Digg
  • StumbleUpon
  • del.icio.us
  • Facebook
  • Twitter
  • Google Bookmarks
  • DZone
  • HackerNews
  • LinkedIn
  • Reddit
  • Aiox

    I don’t understand why there is NO incrementPermits() mthod.
    As there is use for reducePermits there is also need for incrementing permits as/if more resources are available.

  • Marshall Pierce

    Aiox: you can call public void release(int permits) (part of the standard Semaphore interface) to increase the permits in the semaphore.
    Code that’s actually constraining access to a resource would typically use public void release() (note the lack of method arguments) since such tasks are typically done one permit at a time. For administrative tasks, though, you would probably want to increase the number of permits by some user-provided value, so it’s tidier to use the version that takes an int parameter to increase the number of permits as much as you want.

  • Tigran Mkrtchyan

    Is it legal to use this code in our application? Are we allowed to modify it? Thanks.

    • Marshall Pierce

      Feel free to use it as you see fit. It’s in the public domain.

  • Magic

    class DynamicPermitsSemaphore extends Semaphore {

    int numberOfPermits = 10;

    public DynamicPermitsSemaphore() {
    super(10, true);
    }

    synchronized void setNumberOfPermits(int newNumberOfPermits) {
    if (newNumberOfPermits > numberOfPermits) {
    release(newNumberOfPermits – numberOfPermits);
    } else if (newNumberOfPermits < numberOfPermits) {
    reducePermits(numberOfPermits – newNumberOfPermits);
    }
    numberOfPermits = newNumberOfPermits;
    }
    };