Implementation of the BlockingQueue Interface That Works Properly When Accessed via Multiple Threads

A BlockingQueue is typically used to have on thread produce objects, which another thread consumes. Here is a diagram that illustrates this principle:

blocking-queue
A BlockingQueue with one thread putting into it, and another thread taking from it.

The producing thread will keep producing new objects and insert them into the queue, until the queue reaches some upper bound on what it can contain. It’s limit, in other words. If the blocking queue reaches its upper limit, the producing thread is blocked while trying to insert the new object. It remains blocked until a consuming thread takes an object out of the queue.

The consuming thread keeps taking objects out of the blocking queue, and processes them. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.

BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

The code below defines an implementation of the BlockingQueue interface that works properly when accessed via multiple threads since it’s synchronized properly

class SimpleBlockingQueue<E> implements BlockingQueue<E> {
    /**
     * The queue consists of a List of E's.
     */
    final private List<E> mList;

Add a new E to the end of the queue, blocking if necessary for space to become available

 public void put(E e) throws InterruptedException {
        synchronized(this) {
            if (e == null)
                throw new NullPointerException();

            // Wait until the queue is not full.
            while (isFull()) {
                // System.out.println("BLOCKING ON PUT()");
                wait();
            }

            // Add e to the ArrayList.
            mList.add(e);
            
            // Notify that the queue may have changed state, e.g., "no
            // longer empty".
            notifyAll();
        }
    }

Remove the E at the front of the queue, blocking until there’s something in the queue

public E take() throws InterruptedException {
        synchronized(this) {
            // Wait until the queue is not empty.
            while (mList.isEmpty()) {
                // System.out.println("BLOCKING ON TAKE()");
                wait();
            }

            final E e = mList.remove(0);
        
            // Notify that the queue may have changed state, e.g., "no
            // longer full".
            notifyAll();
            return e;
        }
    }

 

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s