Multithreaded code is a staple of any server developer's toolbox, and a lot of the time the producer/consumer pattern is used. I've seen many implementations of this, and typically the developer uses a shared queue for communication.
To take an example, if we were to write a process that reads lines from a file, reverses each line and writes it to another file, we might have something like this:
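// QueueBasedReverser.java
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueBasedReverser {
    private static final int QUEUE_SIZE = 100;

    public static void main(String[] args) throws IOException {
        FileReader reader = new FileReader("inputfile");
        FileWriter writer = new FileWriter("outputfile");
        // The bounded queue is shared by both sides and blocks the producer when full.
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(QUEUE_SIZE);
        LineProducer producer = new LineProducer(queue, reader);
        LineConsumer consumer = new LineConsumer(queue, writer);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}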
// LineProducer.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.BlockingQueue;

public class LineProducer implements Runnable {
    public static final String POISON_PILL = "_THIS_IS_THE_POISON_PILL_MESSAGE";
    private final BlockingQueue<String> queue;
    private final BufferedReader reader;

    public LineProducer(BlockingQueue<String> queue, FileReader reader) {
        this.queue = queue;
        this.reader = new BufferedReader(reader);
    }

    @Override
    public void run() {
        String line;
        try {
            while ((line = reader.readLine()) != null) {
                queue.put(line); // blocks while the queue is full
            }
            queue.put(POISON_PILL); // tell the consumer there is nothing more to read
            reader.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
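// LineConsumer.java
import java.io.BufferedWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class LineConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final BufferedWriter writer;

    public LineConsumer(BlockingQueue<String> queue, Writer writer) {
        this.queue = queue;
        this.writer = new BufferedWriter(writer);
    }

    @Override
    public void run() {
        String line;
        try {
            // The pill is matched by identity (!=), so a line in the file that happens
            // to contain the same text is still treated as data.
            while ((line = queue.poll(1, TimeUnit.MINUTES)) != LineProducer.POISON_PILL) {
                if (line == null) {
                    // poll() returns null on timeout; fail explicitly rather than
                    // with a NullPointerException inside reversed().
                    throw new IllegalStateException("No line received within 1 minute");
                }
                writer.write(reversed(line));
                writer.newLine();
            }
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String reversed(String line) {
        return new StringBuilder(line).reverse().toString();
    }
}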
This is a fairly typical implementation, making use of a shared bounded queue for communication and a poison pill to signify the end of processing. This is problematic because the queue and the poison pill both leak implementation details of the producer and the consumer, and tie the two together quite tightly.
Those who have some experience with UI programming should be well versed in the notion of adding listeners to be notified when events happen, and this is a pattern we can apply on the server as well as the client. The idea here is that the producer raises an "event" when it wants to tell the world something, in this case that it has read a line from the file. The consumer can register itself as a listener for this event and respond by writing the reversed string to the output file.
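// ListenerBasedReverser.java
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class ListenerBasedReverser {
    public static void main(String[] args) throws IOException {
        FileReader reader = new FileReader("inputfile");
        FileWriter writer = new FileWriter("outputfile");
        LineProducer producer = new LineProducer(reader);
        LineConsumer consumer = new LineConsumer(writer); // starts its own worker thread
        producer.addListener(consumer);
        producer.readFile(); // reads on the calling thread and notifies the listeners
    }
}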
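// LineReadListener.java
public interface LineReadListener {
    // Called once for every line the producer reads.
    void lineRead(String line);
    // Called once after the last line has been delivered.
    void allLinesRead();
}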
// LineProducer.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

public class LineProducer {
    private final BufferedReader reader;
    private final List<LineReadListener> listeners = new ArrayList<LineReadListener>();

    public LineProducer(FileReader reader) {
        this.reader = new BufferedReader(reader);
    }

    public void readFile() {
        String line;
        try {
            while ((line = reader.readLine()) != null) {
                // Raise the "line read" event with every registered listener.
                for (LineReadListener listener : listeners) {
                    listener.lineRead(line);
                }
            }
            reader.close();
            for (LineReadListener listener : listeners) {
                listener.allLinesRead();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void addListener(LineReadListener consumer) {
        listeners.add(consumer);
    }
}
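// LineConsumer.java
import java.io.BufferedWriter;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class LineConsumer implements Runnable, LineReadListener {
    private static final String POISON_PILL = "_THIS_IS_THE_INTERNAL_POISON_PILL_";
    private final BlockingQueue<String> queue;
    private final BufferedWriter writer;

    public LineConsumer(Writer writer) {
        this.queue = new ArrayBlockingQueue<String>(100);
        this.writer = new BufferedWriter(writer);
        // Both fields are assigned before the worker starts, so this works, but note
        // that it lets `this` escape before construction has finished.
        new Thread(this).start();
    }

    @Override
    public void run() {
        String line;
        try {
            // The pill is matched by identity (!=), so a line in the file that happens
            // to contain the same text is still treated as data.
            while ((line = queue.poll(1, TimeUnit.MINUTES)) != POISON_PILL) {
                if (line == null) {
                    // poll() returns null on timeout; fail explicitly rather than
                    // with a NullPointerException inside reversed().
                    throw new IllegalStateException("No line received within 1 minute");
                }
                writer.write(reversed(line));
                writer.newLine();
            }
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private String reversed(String line) {
        return new StringBuilder(line).reverse().toString();
    }

    @Override
    public void lineRead(String line) {
        try {
            queue.put(line); // hand the line over to the worker thread
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void allLinesRead() {
        try {
            queue.put(POISON_PILL); // the pill never leaves this class
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}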
In this implementation we're still using a bounded queue and a poison pill, but these are now completely internal to the consumer, and we have the option to change the implementation without touching any other part of the system. The producer and consumer have no shared information or implementation.
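To illustrate how freely the consumer's internals can now change, here is a rough sketch of an alternative consumer that swaps the queue, worker thread and poison pill for a single-threaded executor. The class name ExecutorLineConsumer and the awaitCompletion helper are made up for this example, the LineReadListener interface is repeated only so the snippet stands alone, and it assumes Java 8 or later for the lambdas.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same contract as the LineReadListener shown earlier, repeated for completeness.
interface LineReadListener {
    void lineRead(String line);
    void allLinesRead();
}

// Hypothetical alternative consumer: no hand-rolled queue, thread or poison pill.
class ExecutorLineConsumer implements LineReadListener {
    private final BufferedWriter writer;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    ExecutorLineConsumer(Writer writer) {
        this.writer = new BufferedWriter(writer);
    }

    @Override
    public void lineRead(String line) {
        // A single worker runs tasks in submission order, so line order is preserved.
        executor.execute(() -> {
            try {
                writer.write(new StringBuilder(line).reverse().toString());
                writer.newLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    @Override
    public void allLinesRead() {
        // Close the writer after every queued line has been written, then stop the worker.
        executor.execute(() -> {
            try {
                writer.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        executor.shutdown();
    }

    // Waits for the queued work to finish; returns false on timeout or interruption.
    boolean awaitCompletion(long timeout, TimeUnit unit) {
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The producer and the main class would not need to change at all to use this. One trade-off to be aware of: the executor returned by Executors.newSingleThreadExecutor() uses an unbounded work queue, so unlike the ArrayBlockingQueue version it will not slow down a producer that outpaces the writer.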
The keen-eyed among you may have noticed that the actions of the producer and consumer are now also decoupled, meaning the consumer has no knowledge that its input is being read from a file; it could come from anywhere! The LineProducer can also be used to read any file, with different listeners attached to do the processing.
Another, less obvious benefit of this approach is that we can add multiple listeners. If we want to track how many lines per second we are processing, we can just implement a new LineReadListener to handle this logic and add it as a second listener without affecting the line reversing. This gives us a really nice separation of concerns and makes it a lot easier to adhere to the Single Responsibility Principle.
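As a minimal sketch of what such a second listener might look like: the ThroughputListener name and its getLineCount accessor are invented for this example, and the LineReadListener interface is repeated only so the snippet stands alone. Note that it measures how fast lines are read and handed to the listeners, not how fast the consumer writes them out.

```java
// Same contract as the LineReadListener shown earlier, repeated for completeness.
interface LineReadListener {
    void lineRead(String line);
    void allLinesRead();
}

// Hypothetical listener that counts lines and reports the read rate at the end.
// Both callbacks run on the thread that calls readFile(), so no locking is needed.
class ThroughputListener implements LineReadListener {
    private final long startNanos = System.nanoTime();
    private long lineCount = 0;

    @Override
    public void lineRead(String line) {
        lineCount++;
    }

    @Override
    public void allLinesRead() {
        double elapsedSeconds = (System.nanoTime() - startNanos) / 1_000_000_000.0;
        // Guard against a zero elapsed time on very small inputs.
        double linesPerSecond = elapsedSeconds > 0 ? lineCount / elapsedSeconds : lineCount;
        System.out.printf("%d lines read, %.1f lines/second%n", lineCount, linesPerSecond);
    }

    long getLineCount() {
        return lineCount;
    }
}
```

Wiring it in would be one extra line in the main method, producer.addListener(new ThroughputListener()), placed before the call to readFile(); the LineConsumer is untouched.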