Design Bounded Blocking Queue

October 14, 2020

Implement a thread-safe bounded blocking queue.

Introduction

Java's java.util.concurrent package provides the BlockingQueue interface, a construct for solving the concurrent producer-consumer problem.

BlockingQueue Types

We can distinguish two types of BlockingQueue:

  • unbounded queue – can grow almost indefinitely
  • bounded queue – with maximal capacity defined

1. Unbounded Queue

Creating unbounded queues is simple:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

The capacity of blockingQueue is set to Integer.MAX_VALUE. Operations that add an element to an unbounded queue never block, so the queue can grow to a very large size.

The most important thing when designing a producer-consumer program around an unbounded BlockingQueue is that consumers must be able to consume messages as quickly as producers add them. Otherwise, memory can fill up and the JVM will throw an OutOfMemoryError.
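The non-blocking behaviour of an unbounded queue can be checked with a small sketch. The class name UnboundedQueueDemo and the helper fill are hypothetical names chosen for illustration; only LinkedBlockingDeque, offer(), size(), and remainingCapacity() come from the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class UnboundedQueueDemo {
    // Hypothetical helper: adds `count` elements from one thread and returns the new size.
    static int fill(BlockingQueue<Integer> queue, int count) {
        for (int i = 0; i < count; i++) {
            // With no capacity argument the limit is Integer.MAX_VALUE,
            // so offer() is not expected to be rejected here.
            if (!queue.offer(i)) {
                throw new IllegalStateException("unexpected rejection at " + i);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
        System.out.println(queue.remainingCapacity() == Integer.MAX_VALUE); // true
        System.out.println(fill(queue, 100_000)); // 100000
    }
}
```

Nothing in this sketch ever waits for a consumer, which is exactly why a slow consumer lets the queue keep growing.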

2. Bounded Queue

The second type is the bounded queue. We can create one by passing the capacity as an argument to the constructor:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

Here blockingQueue has a capacity of 10. When a producer tries to add an element to an already full queue, the outcome depends on the method used: put() blocks until space becomes available, offer() returns false immediately, and add() throws an IllegalStateException.

A bounded queue is a good basis for concurrent programs: when a producer calls put() on a full queue, the call waits until consumers catch up and free some space. This gives us throttling without any extra effort on our part.
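As a rough illustration of how the insertion methods differ on a full queue, the following sketch fills a bounded queue and then tries offer() and add(). The class BoundedQueueDemo and the method describeFullQueue are hypothetical names; put() is deliberately not called, because on a single thread it would block forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class BoundedQueueDemo {
    // Hypothetical helper: fills a queue to capacity, then reports how offer() and add() react.
    static String describeFullQueue(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("item-" + i); // succeeds while there is room
        }

        // offer() does not wait: it reports failure by returning false.
        boolean offered = queue.offer("extra");

        // add() does not wait either: it signals failure with an exception.
        String addResult;
        try {
            queue.add("extra");
            addResult = "added";
        } catch (IllegalStateException e) {
            addResult = "IllegalStateException";
        }
        return "offer=" + offered + ", add=" + addResult + ", size=" + queue.size();
    }

    public static void main(String[] args) {
        // prints: offer=false, add=IllegalStateException, size=10
        System.out.println(describeFullQueue(10));
    }
}
```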

Blocking Queue API

  • put() – inserts the specified element into the queue, waiting for a free slot if necessary
  • offer() – inserts the element and returns true if space is available; returns false immediately if the queue is full

Other commonly used methods:

  • add() – inserts the element and returns true; throws an IllegalStateException if the queue is full
  • offer(E e, long timeout, TimeUnit unit) – tries to insert the element, waiting up to the specified timeout for a slot to become available; returns false if the timeout elapses first
  • take() – retrieves and removes the head of the queue; if the queue is empty, it blocks until an element becomes available
  • poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified time for an element to become available; returns null if the timeout elapses first
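The timed variants listed above can be tried out in a short sketch. TimedOperationsDemo and its three methods are hypothetical names for illustration, and the 100 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class TimedOperationsDemo {
    // Timed offer() on a full queue: waits up to 100 ms, then gives up.
    static boolean timedOfferOnFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(1);
        queue.put("first"); // the queue is now full
        return queue.offer("second", 100, TimeUnit.MILLISECONDS);
    }

    // Timed poll() on an empty queue: waits up to 100 ms, then returns null.
    static String timedPollOnEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(1);
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    // put() followed by take(): neither call blocks because a slot and an element are available.
    static String putThenTake() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(1);
        queue.put("hello");
        return queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timedOfferOnFullQueue()); // false
        System.out.println(timedPollOnEmptyQueue()); // null
        System.out.println(putThenTake());           // hello
    }
}
```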

Java Default Implementation
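
One way to solve the problem with the JDK's own classes is to let a producer call put() and a consumer call take() on a bounded LinkedBlockingDeque. The following is a minimal sketch under that assumption; the class ProducerConsumerDemo, the method run, and the chosen capacity and item count are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class ProducerConsumerDemo {
    // Runs one producer and one consumer over a bounded queue and
    // returns the items in the order the consumer received them.
    static List<Integer> run(int capacity, int itemCount) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(capacity);
        List<Integer> consumed = new ArrayList<>(); // touched only by the consumer until join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes are visible to this thread
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With a single producer and a single consumer, the FIFO order of the queue carries through to the consumer, and the queue never holds more than three elements at a time.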


Custom Implementation

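import java.util.Deque;
import java.util.LinkedList;

public class JavaBoundedBlockingQueue {
    private final Deque<Integer> deQueue = new LinkedList<>();
    private final int capacity;
    private final Object lock = new Object();

    public JavaBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public static void main(String[] args) throws InterruptedException {
        JavaBoundedBlockingQueue boundedBlockingQueue = new JavaBoundedBlockingQueue(3);

        // The consumer runs on its own thread: on a single thread, a dequeue()
        // on an empty queue would block forever because nobody could enqueue.
        Thread consumer = new Thread(() -> {
            try {
                boundedBlockingQueue.dequeue();
                boundedBlockingQueue.dequeue();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The main thread acts as the producer.
        boundedBlockingQueue.enqueue(1);
        boundedBlockingQueue.enqueue(0);
        boundedBlockingQueue.enqueue(2);
        boundedBlockingQueue.enqueue(3);
        boundedBlockingQueue.enqueue(4);
        consumer.join();

        // Five elements enqueued, two dequeued: three remain.
        System.out.println(boundedBlockingQueue.size());
    }

    // When the queue is full, block the enqueuing thread
    // until a consumer frees a slot.
    public void enqueue(int element) throws InterruptedException {
        synchronized (lock) {
            while (deQueue.size() == capacity) {
                lock.wait();
            }
            deQueue.addLast(element);
            // Producers and consumers wait on the same monitor, so wake all of them;
            // a single notify() could wake another producer and leave a consumer asleep.
            lock.notifyAll();
        }
    }

    // When the queue is empty, block the dequeuing thread
    // until a producer adds an element.
    public int dequeue() throws InterruptedException {
        synchronized (lock) {
            while (deQueue.isEmpty()) {
                lock.wait();
            }
            int val = deQueue.removeFirst();
            lock.notifyAll();
            return val;
        }
    }

    // Read the size under the same lock so the value is consistent.
    public int size() {
        synchronized (lock) {
            return deQueue.size();
        }
    }
}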
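
The implementation above parks producers and consumers in the wait set of one shared monitor. A variant that keeps separate waiting lists for "queue full" and "queue empty" can be sketched with ReentrantLock and two Condition objects. This is only one possible design, not part of the original solution; ConditionBoundedQueue, notFull, and notEmpty are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBoundedQueue {
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    public ConditionBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    public void enqueue(int element) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(element);
            notEmpty.signal(); // one new element, so wake one consumer
        } finally {
            lock.unlock();
        }
    }

    public int dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            int value = items.removeFirst();
            notFull.signal(); // one free slot, so wake one producer
            return value;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBoundedQueue queue = new ConditionBoundedQueue(3);
        Thread consumer = new Thread(() -> {
            try {
                queue.dequeue();
                queue.dequeue();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int value : new int[] {1, 0, 2, 3, 4}) {
            queue.enqueue(value);
        }
        consumer.join();
        System.out.println(queue.size()); // 3
    }
}
```

Because each condition has only one kind of waiter, signal() is enough here, whereas a single shared monitor generally needs notifyAll() to avoid waking the wrong kind of thread.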

Source: LeetCode – 1188, Baeldung.com