NAME

    Consumer::NonBlock - Send data between processes without blocking.

DESCRIPTION

    It is very easy to end up in a situation where a producer process
    produces data faster than a consumer process can read and process it,
    which leaves the producer blocked on a full pipe buffer. This module
    lets two processes share data much like a pipe does, but without the
    producer blocking on a full pipe buffer.

    A pipe is better in most situations. This module is only useful if the
    producer needs to do many things and cannot afford to block on a
    consumer. It is used by App::Yath to send data to a comparatively
    slow database upload process without blocking.

SYNOPSIS

        use Consumer::NonBlock;
    
        my ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100);
    
        $writer->write_line("A line!");
        $writer->write("several\nlines\n", "in\nseveral\nstrings\n");
    
        my $line1 = $reader->read_line();
        # "A line!"
    
        my @lines = $reader->read_lines();
        # "several"
        # "lines"
        # "in"
        # "several"
        # "strings"
    
        $writer = undef; # Close the output
        $reader = undef; # Close the input, and delete the temp dir

SYNOPSIS WITH FORK

    Normally, closing the reader deletes the data, and closing the writer
    marks the data stream as complete. After a fork both processes hold
    both handles, so either of these actions can happen in either process.

    The weaken() method can be used to prevent a reader or writer from
    taking these actions. So in the producer process you want to weaken the
    reader object, and in the consumer process you want to weaken the
    writer object.

        use Consumer::NonBlock;
    
        my ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100);
    
        my $pid = fork // die "Could not fork: $!";
    
        if ($pid) { # Parent
            # Make sure this process does not delete the temp data
            $reader->weaken;
            $reader->close;
    
            $writer->write_line("Line from the parent");
        }
        else { # Child
            # Make sure this process does not mark the IO as complete
            $writer->weaken;
            $writer->close;
    
            my $line = $reader->read_line;
            print "Got line: $line\n";
        }

IMPLEMENTATION DETAILS

    This module works by having the producer write to temporary files. It
    rotates to a new file once the specified batch limit is reached. The
    consumer deletes each file as it finishes with it, so data that has
    already been processed does not sit on disk. For best performance
    /var/shm should be used.
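
    The rotation scheme described above can be sketched with core Perl
    alone. This is an illustration only, not the module's actual code: the
    "<N>.batch" file naming and the put_line(), finish(), and drain()
    helpers are hypothetical, and the sketch runs in a single process, so
    it leaves out how a live consumer would know that a file is complete.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical layout: "$dir/<N>.batch" holds at most $batch_size lines.
my $dir        = tempdir(CLEANUP => 1);
my $batch_size = 3;

# Producer side: append lines, starting a new file once the batch is full.
my ($file_no, $count, $out) = (0, 0, undef);

sub put_line {
    my ($line) = @_;
    if (!$out || $count >= $batch_size) {
        close($out) if $out;
        $file_no++;
        $count = 0;
        open($out, '>', "$dir/$file_no.batch") or die "open: $!";
    }
    print {$out} "$line\n";
    $count++;
}

sub finish {
    close($out) if $out;
    undef $out;
}

# Consumer side: read the batch files in order, deleting each when done.
sub drain {
    my @lines;
    for (my $n = 1; -e "$dir/$n.batch"; $n++) {
        open(my $in, '<', "$dir/$n.batch") or die "open: $!";
        chomp(my @batch = <$in>);
        close($in);
        unlink("$dir/$n.batch") or die "unlink: $!";
        push @lines, @batch;
    }
    return @lines;
}

put_line("line $_") for 1 .. 7;    # fills 1.batch, 2.batch, and part of 3.batch
finish();

my @got = drain();
print scalar(@got), " lines read\n";    # prints "7 lines read"
```

    Because the consumer unlinks each file as soon as it has read it, the
    space used on disk is bounded by the data that has not been consumed
    yet.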

METHODS

    ($reader, $writer) = Consumer::NonBlock->pair()

    ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100,
    use_shm => $BOOL)

      Create a reader and writer pair.

      Optionally specify a batch size, default is 100.

      Optionally request that data be stored in /var/shm.

    $writer = Consumer::NonBlock->writer($DIR)

    $writer = Consumer::NonBlock->writer($DIR, batch_size => 100)

      Create a writer in the specified directory.

    $reader = Consumer::NonBlock->reader($DIR)

      Create a reader on a specified directory.

    $reader = Consumer::NonBlock->reader_from_env()

      Create a reader using the $ENV{CONSUMER_NONBLOCK_DIR} env var.

    $handle = Consumer::NonBlock->new(dir => $DIR, is_reader => $BOOL,
    is_writer => $BOOL, batch_size => $INT)

      Not recommended; use pair(), reader(), or writer() instead.

    $dir = $handle->dir()

      Get the temporary data directory.

    $batch_size = $handle->batch_size()

      Get the batch size.

    $bool = $handle->use_shm()

      Check if /var/shm is being used.

    $bool = $handle->is_reader()

      Check if handle is a reader.

    $bool = $handle->is_writer()

      Check if handle is a writer.

    $bool = $handle->is_weak()

      Check if the handle has been weakened.

    $handle->weaken()

      Weaken the handle so it will not delete the data dir or close the IO.

    $handle->set_env_var()

      Sets the $ENV{CONSUMER_NONBLOCK_DIR} env var to the temporary data
      dir. This can then be used in a child process with
      Consumer::NonBlock->reader_from_env() to create a reader instance in
      another process.
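
      A minimal sketch of that hand-off, using only the methods documented
      here. It assumes the consumer is started with fork and exec, and that
      the child perl can load Consumer::NonBlock from its @INC; the
      one-line consumer program is made up for illustration.

```perl
use strict;
use warnings;
use POSIX ();
use Consumer::NonBlock;

my ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100);

# Export the data dir so a separately exec'd consumer can find it.
$writer->set_env_var;

my $pid = fork // die "Could not fork: $!";

if (!$pid) {    # Child: replace this process with a fresh perl
    my $code = 'my $r = Consumer::NonBlock->reader_from_env; print "Got: $_\n" for $r->read_lines';

    # exec does not run destructors, so the inherited handles take no action.
    # If exec fails, leave via _exit so they still take no action.
    exec($^X, '-MConsumer::NonBlock', '-e', $code) or POSIX::_exit(1);
}

# Parent: this process only produces, so it must not delete the data.
$reader->weaken;
$reader->close;

$writer->write_line("line $_") for 1 .. 3;
$writer->close;    # mark the stream complete so read_lines() can return

waitpid($pid, 0);
```

      The consumer uses read_lines(), which is documented to block until
      the writer is closed, so it does not need to detect the end of the
      stream itself.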

    $line_count = $handle->write($text1, $text2, ...)

      Write arbitrary text data with arbitrary line breaks. Line breaks
      WILL be added between arguments.

      Returns number of lines written.

    $handle->write_line($line)

      Write a single line of data. No validation is done to verify the
      input line. Having line breaks inside the input line WILL corrupt
      data.
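
      One way to make sure a record never contains a raw line break is to
      serialize it before writing. The sketch below uses JSON::PP from the
      Perl core; the encode_record() and decode_record() helpers are
      hypothetical and not part of this module, and the calls on $writer
      and $reader are shown only as comments.

```perl
use strict;
use warnings;
use JSON::PP ();

# Compact (non-pretty) JSON output contains no raw line breaks: a newline
# inside a string is emitted as the two characters "\" and "n".
my $json = JSON::PP->new->utf8->canonical;

sub encode_record { my ($record) = @_; return $json->encode($record) }
sub decode_record { my ($line)   = @_; return $json->decode($line) }

my $record = { id => 1, message => "first\nsecond" };
my $line   = encode_record($record);

die "unexpected line break" if $line =~ /[\r\n]/;

# Producer side (not run here):  $writer->write_line($line);
# Consumer side (not run here):  my $copy = decode_record($reader->read_line);
my $copy = decode_record($line);

print $copy->{message} eq $record->{message} ? "round trip ok\n" : "mismatch\n";
```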

    $line = $handle->read_line()

      Retrieve a single line from the handle. Will block until data is
      available or until the writer is closed.

    @lines = $handle->read_lines()

      Retrieve all lines. Will block until the writer is closed.

    $handle->close

      Will close the handle. If this is a writer it will close the stream.
      If this is a reader it will delete the data dir. $handle will be set
      to undef.

SOURCE

    The source code repository for Consumer-NonBlock can be found at
    http://github.com/exodist/Consumer-NonBlock/.

MAINTAINERS

    Chad Granum <exodist@cpan.org>

AUTHORS

    Chad Granum <exodist@cpan.org>

COPYRIGHT

    Copyright Chad Granum <exodist7@gmail.com>.

    This program is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.

    See http://dev.perl.org/licenses/