Producer Consumer Problem using Blocking Queue in Java
Upasana | August 31, 2019 | Multithreading and Concurrency
Producer Consumer Problem
The producer-consumer problem is a classic example of a multi-threading synchronization problem. It is a must-know problem if you want to delve into Java concurrency and multi-threading concepts.
Problem Description
The problem describes two entities, the producer and the consumer, which share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer removes data from the buffer one piece at a time and processes it. The challenge is to make sure that the producer never adds data to a full buffer and the consumer never removes data from an empty one, while keeping all access to the buffer thread-safe.
Steps to Solve the Problem
- The solution for the producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, which starts to fill the buffer again.
- In the same way, the consumer can go to sleep if it finds the buffer empty. The next time the producer puts data into the buffer, it wakes up the sleeping consumer.
- While doing all of this, ensure thread-safety.
Solving Producer Consumer Problem in Core Java
There are multiple ways to solve this problem:
- Using semaphores to coordinate between producers and consumers.
- Using synchronization (wait and notify).
- Using non-blocking algorithms.
- Using a BlockingQueue.
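To make the first option concrete, here is a minimal sketch of a bounded buffer coordinated by semaphores. It is not code from this article: the class names SemaphoreBuffer and SemaphoreBufferDemo are hypothetical, and the design (two counting semaphores plus a binary one used as a mutex) is just one common way to arrange it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical class: a bounded buffer coordinated purely by semaphores.
class SemaphoreBuffer<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                      // permits = free slots
    private final Semaphore filledSlots = new Semaphore(0);  // permits = items available
    private final Semaphore mutex = new Semaphore(1);        // guards the queue itself

    SemaphoreBuffer(int capacity) {
        emptySlots = new Semaphore(capacity);
    }

    void put(E item) throws InterruptedException {
        emptySlots.acquire();           // blocks while the buffer is full
        mutex.acquireUninterruptibly(); // short critical section, so no interruption here
        try {
            items.add(item);
        } finally {
            mutex.release();
        }
        filledSlots.release();          // lets one waiting consumer proceed
    }

    E take() throws InterruptedException {
        filledSlots.acquire();          // blocks while the buffer is empty
        mutex.acquireUninterruptibly();
        E item;
        try {
            item = items.remove();
        } finally {
            mutex.release();
        }
        emptySlots.release();           // lets one waiting producer proceed
        return item;
    }
}

class SemaphoreBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        SemaphoreBuffer<Integer> buffer = new SemaphoreBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) sum += buffer.take();
        producer.join();
        System.out.println(sum); // prints 15
    }
}
```

The two counting semaphores do the waiting and waking described in the steps above, so no explicit wait() or notify() call is needed.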
In this article we will focus only on the blocking queue approach.
Interviewers mostly want to see the producer-consumer problem solved from scratch, since that is how they evaluate your multi-threading skills, so we will implement a simple version of a blocking queue ourselves. The solution has three parts:
- A blocking queue
- Producer thread(s)
- Consumer thread(s)
We can implement our own simple thread-safe version of BlockingQueue using synchronization, as shown in the code below:
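package com.shunya;

// A minimal bounded blocking queue guarded by the object's intrinsic lock.
// The type parameter E is needed because Producer and Consumer use BlockingQueue<SquareTask>.
class BlockingQueue<E> {
    private final Object[] items = new Object[100]; // fixed-size circular buffer
    private int putptr, takeptr, count;
    private boolean closed = false;

    // Blocks while the buffer is full, then stores x and wakes up waiting threads.
    public synchronized void put(E x) throws InterruptedException {
        while (count == items.length)
            wait();
        items[putptr] = x;
        if (++putptr == items.length) putptr = 0; // wrap around
        ++count;
        notifyAll();
    }

    // Blocks while the buffer is empty, then removes and returns the oldest item.
    @SuppressWarnings("unchecked")
    public synchronized E take() throws InterruptedException {
        while (count == 0)
            wait();
        E x = (E) items[takeptr];
        items[takeptr] = null; // release the reference for garbage collection
        if (++takeptr == items.length) takeptr = 0; // wrap around
        --count;
        notifyAll();
        return x;
    }

    // The closed flag is only polled by callers; threads already waiting
    // inside put() or take() are not woken up when it changes.
    public synchronized boolean isClosed() {
        return closed;
    }

    public synchronized void setClosed(boolean closed) {
        this.closed = closed;
    }
}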
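The synchronized queue above calls notifyAll() after every put and take, which wakes producers and consumers alike. One possible refinement, sketched here under assumptions rather than taken from this article, is to use a ReentrantLock with two Condition objects so that a put signals only waiting consumers and a take signals only waiting producers. The class name ConditionBuffer is hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class: the same circular buffer, but with separate wait sets.
class ConditionBuffer<E> {
    private final Object[] items;
    private int putptr, takeptr, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    ConditionBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // wake one consumer only
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            E x = (E) items[takeptr];
            items[takeptr] = null;
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // wake one producer only
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

Whether this is worth the extra code depends on the workload: with one producer and a couple of consumers the difference is likely negligible, while with many threads the targeted signals avoid needless wake-ups.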
A producer is simply a thread that keeps putting tasks into the BlockingQueue, blocking whenever the queue is full.
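import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {
    private final BlockingQueue<SquareTask> queue;

    Producer(BlockingQueue<SquareTask> q) {
        queue = q;
    }

    public void run() {
        try {
            // Keep producing until the queue is marked closed; put() blocks while it is full.
            while (!queue.isClosed()) {
                queue.put(produce());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }

    // Creates a task for a random number in the range [1, 200).
    private SquareTask produce() {
        return new SquareTask(ThreadLocalRandom.current().nextInt(1, 200));
    }
}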
A consumer listens on the BlockingQueue and keeps consuming tasks, waiting whenever the queue is empty.
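public class Consumer implements Runnable {
    private final BlockingQueue<SquareTask> queue;

    Consumer(BlockingQueue<SquareTask> q) {
        queue = q;
    }

    public void run() {
        try {
            // Keep consuming until the queue is marked closed; take() blocks while it is empty.
            while (!queue.isClosed()) {
                consume(queue.take());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }

    // Runs the task and prints its result.
    private void consume(SquareTask x) {
        System.out.println(x.execute());
    }
}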
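Both Producer and Consumer rely on a SquareTask type that this article does not show. The only things known from the code are that it has a constructor taking an int and an execute() method whose result is printed. A minimal sketch consistent with that usage, assuming (as the name suggests) that the task squares its input, could look like this:

```java
// Hypothetical reconstruction: the real SquareTask is not shown in the article.
// Assumed to live in the same package as Producer and Consumer.
class SquareTask {
    private final int number;

    SquareTask(int number) {
        this.number = number;
    }

    // Returns the square of the number; long leaves headroom for larger inputs.
    long execute() {
        return (long) number * number;
    }
}
```

For example, new SquareTask(12).execute() returns 144.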
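The driver class starts one producer and two consumers on a shared queue, lets them run for 100 ms, and then shuts them down.
public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<SquareTask> boundedBuffer = new BlockingQueue<>();
        Thread producer = new Thread(new Producer(boundedBuffer));
        Thread consumer1 = new Thread(new Consumer(boundedBuffer));
        Thread consumer2 = new Thread(new Consumer(boundedBuffer));
        producer.start();
        consumer1.start();
        consumer2.start();
        Thread.sleep(100); // let the threads work for a short while
        boundedBuffer.setClosed(true);
        // A thread blocked inside put() or take() never re-checks the closed flag,
        // so interrupt all three threads to make sure the program can exit.
        producer.interrupt();
        consumer1.interrupt();
        consumer2.interrupt();
    }
}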
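Outside an interview, the JDK already ships this data structure as java.util.concurrent.ArrayBlockingQueue. The sketch below is an illustration under assumptions, not part of the original solution: it replaces the closed flag with a "poison pill", a sentinel value that tells each consumer to stop. The class name PoisonPillDemo, the run method, and the choice of -1 as the sentinel are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class PoisonPillDemo {
    // Sentinel telling a consumer to stop; assumes real tasks are never negative.
    static final int POISON = -1;

    // Squares the numbers 1..taskCount on consumerCount threads and returns the sum.
    static long run(int taskCount, int consumerCount) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicLong total = new AtomicLong();
        Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        int n = queue.take();   // blocks while the queue is empty
                        if (n == POISON) break; // shutdown signal
                        total.addAndGet((long) n * n);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }
        // The calling thread acts as the producer; put() blocks while the queue is full.
        for (int n = 1; n <= taskCount; n++) queue.put(n);
        // One pill per consumer, queued after all real tasks.
        for (int i = 0; i < consumerCount; i++) queue.put(POISON);
        for (Thread c : consumers) c.join();
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100, 2)); // prints 338350
    }
}
```

Because the pills are queued behind the real tasks, every task is processed before the consumers exit, and no thread is left blocked on an empty queue.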
Why is the producer-consumer problem important?
- It can be used to distribute work among multiple workers, which can be scaled up or down according to load.
- It decouples producers from consumers, which are connected only through a shared pipeline. The producer does not need to know about the consumer, so the two concerns stay separate, which leads to a better object-oriented design.
- The producer and consumer do not need to be available at the same time. A consumer can pick up tasks that a producer created earlier.