class SimpleBlockingQueue {
    final Object[] items = new Object[100]; // (1)
    int putptr, takeptr, count;

    public synchronized void put(Object x) throws InterruptedException {
        while (count == items.length)
            wait(); // (2)
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notifyAll(); // (3)
    }

    public synchronized Object take() throws InterruptedException {
        while (count == 0)
            wait(); // (4)
        Object x = items[takeptr];
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notifyAll(); // (5)
        return x;
    }
}
Blocking Queue implementation in Java
Upasana | July 24, 2020 | 3 min read | 4,755 views
A blocking queue lets multiple threads communicate and pass data to one another. For example, a producer thread can put items into the queue while a consumer thread takes items out of it.
BlockingQueue implementations are designed to be used primarily for producer-consumer queues.
A blocking queue has the following characteristics:
- It is always thread-safe
- It can hold arbitrary data
- A producer has to wait if the queue is already full; similarly, a consumer has to wait if the queue is empty.
Java implementation
A trivial implementation of a blocking queue that uses intrinsic locking (the synchronized keyword) is the SimpleBlockingQueue class listed at the top of this article. Its numbered callouts are explained below:
(1) The maximum capacity of the blocking queue is 100.
(2) The producer waits inside a while loop as long as the queue is full. The loop re-checks the condition after every wakeup, which guards against spurious wakeups.
(3) All waiting threads are notified as soon as a new item is added to the queue.
(4) The consumer thread waits inside a while loop as long as the queue is empty. Here too, the while loop guards against spurious wakeups.
(5) All waiting threads are notified as soon as an item is removed from the queue. notifyAll() cannot target producers only, so waiting consumers are woken as well and simply re-check their condition.
A realistic implementation would offer many more methods (peek(), remove(), offer(), isFull(), etc.), but for brevity we have implemented only two.
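To give an idea of what some of those extra methods could look like, here is a minimal sketch that adds a non-blocking offer(), a peek() and an isFull() to the intrinsic-lock queue. The class name SimpleBlockingQueueWithOffer, the constructor taking a capacity and the private enqueue() helper are assumptions made for this illustration; the method semantics are loosely modelled on java.util.concurrent.BlockingQueue.

```java
// Sketch only: names and signatures below are illustrative assumptions,
// not part of the original two-method listing.
class SimpleBlockingQueueWithOffer {
    private final Object[] items;
    private int putptr, takeptr, count;

    SimpleBlockingQueueWithOffer(int capacity) {
        items = new Object[capacity];
    }

    // Blocking insert: waits while the queue is full.
    public synchronized void put(Object x) throws InterruptedException {
        while (count == items.length)
            wait();
        enqueue(x);
    }

    // Non-blocking insert: returns false immediately if the queue is full.
    public synchronized boolean offer(Object x) {
        if (count == items.length)
            return false;
        enqueue(x);
        return true;
    }

    // Blocking removal: waits while the queue is empty.
    public synchronized Object take() throws InterruptedException {
        while (count == 0)
            wait();
        Object x = items[takeptr];
        items[takeptr] = null; // drop the reference so it can be garbage collected
        if (++takeptr == items.length) takeptr = 0;
        --count;
        notifyAll();
        return x;
    }

    // Returns the head of the queue without removing it, or null if the queue is empty.
    public synchronized Object peek() {
        return count == 0 ? null : items[takeptr];
    }

    public synchronized boolean isFull() {
        return count == items.length;
    }

    // Caller must already hold the monitor of this object.
    private void enqueue(Object x) {
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0;
        ++count;
        notifyAll();
    }
}
```

With a capacity of 2, two calls to offer() succeed and a third returns false instead of blocking, while peek() keeps returning the first item until take() removes it.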
A slightly improved version
An improved version of the queue uses explicit locking to improve multi-threaded performance. The Lock and Condition interfaces provide much more flexibility than intrinsic locking, but this flexibility brings more responsibility: we have to call lock() and unlock() ourselves. Because one lock can be associated with multiple conditions (notFull and notEmpty in this case), put() can signal only a waiting consumer and take() can signal only a waiting producer, instead of waking every waiting thread with notifyAll(). This results in better throughput, thanks to fewer unnecessary wakeups and less thread contention.
Lock is analogous to the synchronized keyword, and Condition is analogous to wait/notify.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SimpleBlockingQueue {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // Wait until there is free space in the queue
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            // Wake up one consumer waiting for an item
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // Wait until at least one item is available
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            // Wake up one producer waiting for free space
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
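Here we use a ReentrantLock together with two Condition objects instead of the object's intrinsic lock. Under heavy load this reduces thread contention, because signal() wakes a single thread waiting on the relevant condition rather than every waiting thread.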
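One example of the extra flexibility that Condition offers is waiting with a timeout through Condition.awaitNanos(). The following is a minimal sketch under stated assumptions: the class name TimedBlockingQueueSketch, the capacity constructor and the timed offer()/poll() methods are illustrative additions and not part of the listing above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: class and method names are assumptions made for this example.
class TimedBlockingQueueSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int putptr, takeptr, count;

    TimedBlockingQueueSketch(int capacity) {
        items = new Object[capacity];
    }

    // Waits up to the given timeout for free space; returns false if the time runs out.
    public boolean offer(Object x, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (count == items.length) {
                if (nanos <= 0L)
                    return false;
                // awaitNanos returns an estimate of the remaining waiting time
                nanos = notFull.awaitNanos(nanos);
            }
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Waits up to the given timeout for an item; returns null if the time runs out.
    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (count == 0) {
                if (nanos <= 0L)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            Object x = items[takeptr];
            items[takeptr] = null; // drop the reference so it can be garbage collected
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

The loop keeps the remaining time in nanos, so a spurious wakeup does not restart the full timeout. A comparable timed wait is harder to express with wait/notify, where the caller has to track the deadline manually.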
We can make a few further improvements to the above version. For example, we can use a LinkedList instead of an array and make the class generic so that it is type-safe for any item type.
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            // Wait until there is free space in the queue
            while (queue.size() == capacity) {
                notFull.await();
            }
            queue.add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            // Wait until at least one item is available
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T item = queue.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}
Please note that all of the above implementations are for learning purposes only and are not meant for production use. For production, prefer LinkedBlockingQueue or an equivalent implementation from the java.util.concurrent package.
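As a rough illustration of the production route, the sketch below runs one producer and one consumer over a bounded java.util.concurrent.LinkedBlockingQueue. The class ProducerConsumerSketch and its produceAndConsume() helper are hypothetical names used only for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: ProducerConsumerSketch and produceAndConsume are hypothetical names.
class ProducerConsumerSketch {

    // One producer puts 1..n into a bounded queue, one consumer takes n items.
    // Returns the sum of all consumed items.
    static long produceAndConsume(int n, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        long[] sum = new long[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    queue.put(i); // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++)
                    sum[0] += queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes to sum are visible here
        return sum[0];
    }
}
```

Calling ProducerConsumerSketch.produceAndConsume(100, 10) returns 5050, the sum of 1 to 100, even though the queue never holds more than 10 items at a time.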
That’s all.