Sunday 26 July 2015

Blocking Queue Implementation in java

Blocking Queue is specific form of Queue. As Name say "Blocking", means it blocks  Some operation.
As we all know, Queue works on "FIFO" Principle. Where FIFO stands for "First in First Out".
You write the object at one end and retrieves the objects at other end.

What happens, When you try to get the Element from empty queue. It will return null.
Suppose we have a requirement where we have to insert fixed number of Element not more than that. In this case Once Queue is full, will not allow us to insert another element.

BlockingQueue  works on same principle, Only fixed number of Element can be inserted. If we try to take the element from empty Queue then operation would be blocked until there is size of the queue becomes greater than 0 i.e it contains at least one Element.
When you user try to insert an Element if the size is full then blocks the enqueue Operation until at size becomes lesser than fixed number.

Producer and consumer problem can easily be achieved using Blocking Queue.






Code Snippet:How to write our own Blocking Queue.


import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

public class MyBlockingQueue<T> {

    private final LinkedList<T> list;
    AtomicInteger count=null;
    final int upperbound;
   
    public MyBlockingQueue(int upperBound) {
        count=new AtomicInteger(0);
        list=new LinkedList<T>();
        this.upperbound=upperBound;
    }
    public synchronized void offer(T t){
        while(count.get()==upperbound){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        notifyAll();
        list.addFirst(t);
        count.incrementAndGet();
       
    }
    public synchronized T take(){
        while(count.get()==0){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
         notifyAll();
        T t=list.pollLast();
        count.decrementAndGet();
        return t;
    }
   
}

 






Test Class

public class MyBlockingQueueTest {
    public static void main(String[] args) {
        MyBlockingQueue<String> myBlockingQueue=new MyBlockingQueue<String>(7);
        Producer<String> producer=new Producer<String>(myBlockingQueue);
        Consumer<String> consumer=new Consumer<String>(myBlockingQueue);
        consumer.start();
        producer.start();
      
    }
   
    static class Producer<T> extends Thread{
        MyBlockingQueue<T> queue;
        public Producer(MyBlockingQueue<T> queue) {
            this.queue=queue;
        }
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                queue.offer((T)Integer.toString(i));
            }
        }
    }
    static class Consumer<T> extends Thread{
        MyBlockingQueue<T> queue;
        public Consumer(MyBlockingQueue<T> queue) {
            this.queue=queue;
        }
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                System.out.println(queue.take());
            }
        }
    }
}



Note: Blocking Queue can also be implemented using Semaphore

No comments:

Post a Comment