#Writing in parallel to OutputStream

3 messages · Page 1 of 1 (latest)

vital smelt
#

So I'm trying to reach messages form a file and write them parallel by first putting all the messages with the same id in the same queue, and then having one thread per queue to do the processing. However I seem to be getting an infinite loop for some reason. Not sure exactly why.

public void readWrite() {
    BlockingQueue<Msg>[] queues = new LinkedBlockingQueue[15];

    // Keeps track of what ID's have been written to what queue
    // Index of queue -> Bitset
    Map<Integer, Bitset> idBitset = new ConcurrentHashMap<>( 15 );
    for (int i = 0; i < 15; i++) {
        idBitset.put( i, new Bitset() );
    }

    AtomicBoolean running = new AtomicBoolean(true);

    try (InputStream in = createInputStream()) {
        for (int i = 0; i < 15; i++) {
            final int index = i;
            final AtomicBoolean run = running;
            queues[index] = new LinkedBlockingQueue<>();
            m_eventExecutors.submit( () -> {
                while (run.get() || queues[index].size() != 0) {
                    Msg msg;
                    if ( ( msg = queues[index].poll() ) == null ) {
                        continue;
                    }
                    process and write...
                }
            } );
          
        }
        int currentQueue = 0;
        Message msg;
        while ((msg = in.readMsg()) != null) {
            int queue = -1;

            for (int i = 0; i < 15; i++) {
                if (idBitset.get( i ).get( msg.getId() ) ) {
                    queue = i;
                }
            }

            if ( queue != -1 ) {
                queues[queue].add( msg );
            } else {
                queues[currentQueue].add( msg );
                currentQueue++;
            }

            if ( currentQueue == 15 ) currentQueue = 0;
        }
        running.set(false);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
iron horizonBOT
#

This post has been reserved for your question.

Hey @vital smelt! Please use /close or the Close Post button above when you're finished. Please remember to follow the help guidelines. This post will be automatically closed after 300 minutes of inactivity.

TIP: Narrow down your issue to simple and precise questions to maximize the chance that others will reply in here.