So I'm trying to reach messages form a file and write them parallel by first putting all the messages with the same id in the same queue, and then having one thread per queue to do the processing. However I seem to be getting an infinite loop for some reason. Not sure exactly why.
public void readWrite() {
BlockingQueue<Msg>[] queues = new LinkedBlockingQueue[15];
// Keeps track of what ID's have been written to what queue
// Index of queue -> Bitset
Map<Integer, Bitset> idBitset = new ConcurrentHashMap<>( 15 );
for (int i = 0; i < 15; i++) {
idBitset.put( i, new Bitset() );
}
AtomicBoolean running = new AtomicBoolean(true);
try (InputStream in = createInputStream()) {
for (int i = 0; i < 15; i++) {
final int index = i;
final AtomicBoolean run = running;
queues[index] = new LinkedBlockingQueue<>();
m_eventExecutors.submit( () -> {
while (run.get() || queues[index].size() != 0) {
Msg msg;
if ( ( msg = queues[index].poll() ) == null ) {
continue;
}
process and write...
}
} );
}
int currentQueue = 0;
Message msg;
while ((msg = in.readMsg()) != null) {
int queue = -1;
for (int i = 0; i < 15; i++) {
if (idBitset.get( i ).get( msg.getId() ) ) {
queue = i;
}
}
if ( queue != -1 ) {
queues[queue].add( msg );
} else {
queues[currentQueue].add( msg );
currentQueue++;
}
if ( currentQueue == 15 ) currentQueue = 0;
}
running.set(false);
} catch (IOException e) {
e.printStackTrace();
}
}