Implementation of the BlockingQueue Interface That Works Properly When Accessed via Multiple Threads

A BlockingQueue is typically used to have on thread produce objects, which another thread consumes. Here is a diagram that illustrates this principle:

blocking-queue
A BlockingQueue with one thread putting into it, and another thread taking from it.

The producing thread will keep producing new objects and insert them into the queue, until the queue reaches some upper bound on what it can contain. It’s limit, in other words. If the blocking queue reaches its upper limit, the producing thread is blocked while trying to insert the new object. It remains blocked until a consuming thread takes an object out of the queue.

The consuming thread keeps taking objects out of the blocking queue, and processes them. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.

BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

The code below defines an implementation of the BlockingQueue interface that works properly when accessed via multiple threads since it’s synchronized properly

class SimpleBlockingQueue<E> implements BlockingQueue<E> {
    /**
     * The queue consists of a List of E's.
     */
    final private List<E> mList;

Add a new E to the end of the queue, blocking if necessary for space to become available

 public void put(E e) throws InterruptedException {
        synchronized(this) {
            if (e == null)
                throw new NullPointerException();

            // Wait until the queue is not full.
            while (isFull()) {
                // System.out.println("BLOCKING ON PUT()");
                wait();
            }

            // Add e to the ArrayList.
            mList.add(e);
            
            // Notify that the queue may have changed state, e.g., "no
            // longer empty".
            notifyAll();
        }
    }

Remove the E at the front of the queue, blocking until there’s something in the queue

public E take() throws InterruptedException {
        synchronized(this) {
            // Wait until the queue is not empty.
            while (mList.isEmpty()) {
                // System.out.println("BLOCKING ON TAKE()");
                wait();
            }

            final E e = mList.remove(0);
        
            // Notify that the queue may have changed state, e.g., "no
            // longer full".
            notifyAll();
            return e;
        }
    }

 

Using Java Thread.join() as a simple barrier synchronizer [Android]

Starts a Thread for each element in the List of input Strings and uses Thread.join() to wait for all the Threads to finish. This implementation doesn’t require any Java synchronization mechanisms other than what’s provided by Thread

 private volatile List<String> mInput = null;

/**
* The array of words to find.
*/
final String[] mWordsToFind;

/**
* The List of worker Threads that were created.
*/
private List mWorkerThreads;

Launch process for each and every string

// This List holds Threads so they can be joined when their processing is done.
mWorkerThreads = new LinkedList();

// Create and start a Thread for each element in the
// mInput.
for (int i = 0; i < mInput.size(); ++i) {
    // Each Thread performs the processing designated by
    // the processInput() method of the worker Runnable.
    Thread t = new Thread(makeTask(i));

    // Add to the List of Threads to join.
    mWorkerThreads.add(t);

    // Start the Thread to process its input in the background.
    t.start();
}

Barrier synchronization: using thread.join() to wait for the completion of all the other threads

// Barrier synchronization.
for (Thread thread : mWorkerThreads)
   try 
   {
      thread.join();
   } 
      catch (InterruptedException e) 
   {
      printDebugging("join() interrupted");
   }

If t is a Thread object whose thread is currently executing,

t.join();

causes the current thread to pause execution until t‘s thread terminates.

Overloads of join allow the programmer to specify a waiting period.

Like sleep, join responds to an interrupt by exiting with an InterruptedException.

Based on https://github.com/douglascraigschmidt/LiveLessons/blob/master/ThreadJoinTest/src/ThreadJoinTest.java