Wednesday 12 January 2011

Listen to the Words I'm Saying With My Mouth

Multithreaded code is a staple of any server developer's toolbox, and a lot of the time the producer/consumer pattern is used. I've seen many implementations of it, and typically the developer uses a shared queue for communication between the two sides.

As an example, suppose we want a process that reads lines from a file, reverses each line and writes it to another file. We might end up with something like this:

  
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueBasedReverser {
    private static final int QUEUE_SIZE = 100;

    public static void main(String[] args) throws IOException {
        FileReader reader = new FileReader("inputfile");
        FileWriter writer = new FileWriter("outputfile");

        // The bounded queue is created here and handed to both sides.
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(QUEUE_SIZE);

        LineProducer producer = new LineProducer(queue, reader);
        LineConsumer consumer = new LineConsumer(queue, writer);

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.BlockingQueue;

public class LineProducer implements Runnable {
    // Sentinel placed on the queue to tell the consumer there is no more input.
    public static final String POISON_PILL = "_THIS_IS_THE_POISON_PILL_MESSAGE";
    private final BlockingQueue<String> queue;
    private final BufferedReader reader;

    public LineProducer(BlockingQueue<String> queue, FileReader reader) {
        this.queue = queue;
        this.reader = new BufferedReader(reader);
    }

    @Override
    public void run() {
        String line;
        try {
            // put() blocks while the bounded queue is full.
            while ((line = reader.readLine()) != null) {
                queue.put(line);
            }
            queue.put(POISON_PILL);
            reader.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

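import java.io.BufferedWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class LineConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final BufferedWriter writer;

    public LineConsumer(BlockingQueue<String> queue, Writer writer) {
        this.queue = queue;
        this.writer = new BufferedWriter(writer);
    }

    @Override
    public void run() {
        String line = null;
        try {
            // Reference comparison: only the producer's sentinel object ends the loop,
            // not a file line that happens to contain the same text.
            while ((line = queue.poll(1, TimeUnit.MINUTES)) != LineProducer.POISON_PILL) {
                if (line == null) {
                    // poll() returns null on timeout; fail clearly instead of with a NullPointerException.
                    throw new IllegalStateException("No line received within one minute");
                }
                writer.write(reversed(line));
                writer.newLine();
            }
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String reversed(String line) {
        // StringBuilder is enough here: the buffer never leaves this method.
        return new StringBuilder(line).reverse().toString();
    }
}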

This is a fairly typical implementation, making use of a shared bounded queue for communication and a poison pill to signify the end of processing. The problem is that the queue and the poison pill both leak implementation details between the producer and the consumer: both classes must be handed the same queue, and the consumer has to reference LineProducer.POISON_PILL. That ties the two together quite tightly.

Those who have some experience with UI programming should be well versed in the notion of adding listeners that are notified when events happen, and this is a pattern we can apply on the server as well as the client. The idea here is that the producer raises an "event" when it wants to tell the world something, in this case that it has read a line from the file. The consumer can register itself as a listener for this event and respond by writing the reversed string to the output file.

 
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class ListenerBasedReverser {
    public static void main(String[] args) throws IOException {
        FileReader reader = new FileReader("inputfile");
        FileWriter writer = new FileWriter("outputfile");

        LineProducer producer = new LineProducer(reader);
        LineConsumer consumer = new LineConsumer(writer);
        // The only wiring between the two: the consumer subscribes to the producer's events.
        producer.addListener(consumer);
        producer.readFile();
    }
}

public interface LineReadListener {
    void lineRead(String line);
    void allLinesRead();
}

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

public class LineProducer {
    private final BufferedReader reader;
    private final List<LineReadListener> listeners = new ArrayList<LineReadListener>();

    public LineProducer(FileReader reader) {
        this.reader = new BufferedReader(reader);
    }

    public void readFile() {
        String line;
        try {
            // Raise a lineRead event for every line, on the calling thread.
            while ((line = reader.readLine()) != null) {
                for (LineReadListener listener : listeners) {
                    listener.lineRead(line);
                }
            }
            reader.close();
            // Tell every listener that the input is exhausted.
            for (LineReadListener listener : listeners) {
                listener.allLinesRead();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void addListener(LineReadListener consumer) {
        listeners.add(consumer);
    }
}

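import java.io.BufferedWriter;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class LineConsumer implements Runnable, LineReadListener {
    // The sentinel and the queue are now private details of the consumer.
    private static final String POISON_PILL = "_THIS_IS_THE_INTERNAL_POISON_PILL_";
    private final BlockingQueue<String> queue;
    private final BufferedWriter writer;

    public LineConsumer(Writer writer) {
        this.queue = new ArrayBlockingQueue<String>(100);
        this.writer = new BufferedWriter(writer);
        // Started last, after both final fields have been assigned.
        new Thread(this).start();
    }

    @Override
    public void run() {
        String line = null;
        try {
            // Reference comparison: only the internal sentinel object ends the loop.
            while ((line = queue.poll(1, TimeUnit.MINUTES)) != POISON_PILL) {
                if (line == null) {
                    // poll() returns null on timeout; fail clearly instead of with a NullPointerException.
                    throw new IllegalStateException("No line received within one minute");
                }
                writer.write(reversed(line));
                writer.newLine();
            }
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String reversed(String line) {
        return new StringBuilder(line).reverse().toString();
    }

    @Override
    public void lineRead(String line) {
        try {
            // Called on the producer's thread; blocks while the queue is full.
            queue.put(line);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void allLinesRead() {
        try {
            queue.put(POISON_PILL);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}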

In this implementation we're still using a bounded queue and a poison pill, but both are now completely internal to the consumer, and we have the option to change that implementation without touching any other part of the system. The only thing the producer and consumer share is the LineReadListener interface.
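To illustrate the point about swapping the consumer's internals, here is a minimal sketch, assuming Java 8 or later, in which a single-threaded ExecutorService replaces the queue and poison pill. ExecutorLineConsumer, awaitDone and ExecutorConsumerDemo are names made up for this example, and LineReadListener is repeated only so the sketch compiles on its own. One trade-off to be aware of: Executors.newSingleThreadExecutor() uses an unbounded work queue, so this version gives up the back-pressure that the bounded ArrayBlockingQueue provided.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same listener contract as in the post, repeated so the sketch is self-contained.
interface LineReadListener {
    void lineRead(String line);
    void allLinesRead();
}

// Hypothetical alternative consumer: an executor instead of a queue and poison pill.
class ExecutorLineConsumer implements LineReadListener {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final BufferedWriter writer;

    ExecutorLineConsumer(Writer writer) {
        this.writer = new BufferedWriter(writer);
    }

    @Override
    public void lineRead(String line) {
        // A single worker thread runs tasks in submission order, so line order is kept.
        executor.execute(() -> {
            try {
                writer.write(new StringBuilder(line).reverse().toString());
                writer.newLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    @Override
    public void allLinesRead() {
        // Queue the close behind all pending writes, then let the worker thread exit.
        executor.execute(() -> {
            try {
                writer.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        executor.shutdown();
    }

    // Assumed helper for callers that want to wait until everything is written.
    boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
        return executor.awaitTermination(timeout, unit);
    }
}

class ExecutorConsumerDemo {
    // Feeds the given lines through the consumer and returns what it wrote.
    static String reverseAll(String... lines) {
        StringWriter out = new StringWriter();
        ExecutorLineConsumer consumer = new ExecutorLineConsumer(out);
        for (String line : lines) {
            consumer.lineRead(line);
        }
        consumer.allLinesRead();
        try {
            if (!consumer.awaitDone(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Consumer did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(reverseAll("abc", "hello"));
    }
}
```

Nothing in LineProducer or ListenerBasedReverser would need to change to use this class beyond the line that constructs the consumer.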

The keen-eyed among you may have noticed that the actions of the producer and consumer are now also decoupled: the consumer has no knowledge that its input is being read from a file, so it could come from anywhere. Likewise, the LineProducer can be used to read any file, with different listeners attached to do the processing.
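As a rough illustration of that decoupling, the sketch below raises the same two events from an in-memory list instead of a file. InMemoryLineProducer, CollectingReverser and InMemoryProducerDemo are hypothetical names invented for this example, and the listener here reverses lines synchronously to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Same listener contract as in the post, repeated so the sketch is self-contained.
interface LineReadListener {
    void lineRead(String line);
    void allLinesRead();
}

// Hypothetical producer: raises the same events as LineProducer, but from memory.
class InMemoryLineProducer {
    private final List<String> lines;
    private final List<LineReadListener> listeners = new ArrayList<LineReadListener>();

    InMemoryLineProducer(List<String> lines) {
        this.lines = lines;
    }

    void addListener(LineReadListener listener) {
        listeners.add(listener);
    }

    void publish() {
        for (String line : lines) {
            for (LineReadListener listener : listeners) {
                listener.lineRead(line);
            }
        }
        for (LineReadListener listener : listeners) {
            listener.allLinesRead();
        }
    }
}

// Hypothetical listener: it cannot tell whether the lines came from a file or a list.
class CollectingReverser implements LineReadListener {
    final List<String> reversed = new ArrayList<String>();
    boolean finished = false;

    @Override
    public void lineRead(String line) {
        reversed.add(new StringBuilder(line).reverse().toString());
    }

    @Override
    public void allLinesRead() {
        finished = true;
    }
}

class InMemoryProducerDemo {
    // Publishes the given lines and returns what the listener collected.
    static List<String> run(String... input) {
        InMemoryLineProducer producer = new InMemoryLineProducer(Arrays.asList(input));
        CollectingReverser listener = new CollectingReverser();
        producer.addListener(listener);
        producer.publish();
        if (!listener.finished) {
            throw new IllegalStateException("allLinesRead was never raised");
        }
        return listener.reversed;
    }

    public static void main(String[] args) {
        System.out.println(run("abc", "listener")); // prints [cba, renetsil]
    }
}
```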

Another, less obvious, benefit of this approach is that we can add multiple listeners. If we want to track how many lines per second we are processing, we can just implement a new LineReadListener to handle this logic and add it as a second listener without affecting the line reversing. This gives us a really nice separation of concerns and makes it a lot easier to adhere to the Single Responsibility Principle.
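A second listener of that kind might look something like the sketch below. LineRateListener, linesSeen, linesPerSecond and LineRateDemo are made-up names for illustration, and the rate it reports is the rate at which lineRead events are raised, which is not necessarily the rate at which the reversing consumer writes its output.

```java
import java.util.concurrent.atomic.AtomicLong;

// Same listener contract as in the post, repeated so the sketch is self-contained.
interface LineReadListener {
    void lineRead(String line);
    void allLinesRead();
}

// Hypothetical listener that only counts lines and measures elapsed time.
class LineRateListener implements LineReadListener {
    private final AtomicLong count = new AtomicLong();
    private final long startNanos = System.nanoTime();
    private volatile long endNanos = -1; // -1 means "still running"

    @Override
    public void lineRead(String line) {
        count.incrementAndGet();
    }

    @Override
    public void allLinesRead() {
        endNanos = System.nanoTime();
    }

    long linesSeen() {
        return count.get();
    }

    double linesPerSecond() {
        long end = endNanos; // read once so the value cannot change mid-calculation
        if (end < 0) {
            end = System.nanoTime();
        }
        double seconds = (end - startNanos) / 1e9;
        return seconds > 0 ? count.get() / seconds : 0.0;
    }
}

class LineRateDemo {
    // Raises the events by hand, the way a producer would, and returns the count.
    static long countLines(String... lines) {
        LineRateListener rate = new LineRateListener();
        for (String line : lines) {
            rate.lineRead(line);
        }
        rate.allLinesRead();
        return rate.linesSeen();
    }

    public static void main(String[] args) {
        System.out.println(countLines("a", "b", "c")); // prints 3
    }
}
```

With the classes from the post, it would be attached alongside the reversing consumer with a second call to producer.addListener(...).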

1 comment:

  1. Hi,
    I came across your post while researching ArrayBlockingQueue. Essentially, I need a way to continuously log all the incoming requests and outgoing responses of a servlet. I don't want to use any of Tomcat's classes, as I want to be able to deploy the app on any J2EE server. Right now, I have a filter class that has access to the request and response, and I place this info in an ArrayBlockingQueue. I also have a context listener class that starts a thread and passes it the same ArrayBlockingQueue via the constructor. In the run method I use queue.take(), but if I do that in a while(true) loop, is that bad for performance? I really don't want to lose any incoming requests, and I also want to make sure performance is not affected. I'd really appreciate any help.
