# @author Donnie Cameron (macnod)
# @url    https://github.com/macnod/DcServer
# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool

# One lurking issue with the ThreadPoolServer is that several threads don't get
# a chance to return before the server shuts down. In a perfect world they'd
# all get a chance to notice that the 'stop' flag has been set and then stop.
# However, this only happens by chance, as most threads spend most of their
# lives either blocked listening to a socket (accept_requests) or blocked
# waiting to dequeue a message (request_handler). Possible solutions, such as
# interrupts, have problems of their own - interrupting mid-read of a socket
# could be bad.  Since this is non-fatal, and non-trivial to fix, I'm just
# leaving it for now.

package SocketsSwimmingThreadPoolServer;

use lib '.';
use threads;
use threads::shared;
use Thread::Queue;
use IO::Socket;
use Time::HiRes qw/sleep/;
use strict;
use warnings;

# Shutdown flag shared between all threads. stop() sets it to 1 and the loops
# in main_loop, accept_requests and request_handler poll it.
my $stop :shared;
# Hand-off from accept_requests to the request_handler workers. Each item is
# a string of the form "<fileno> <peer ip>".
my $accept_queue = Thread::Queue->new;
# Hand-off in the opposite direction: request_handler enqueues the fileno of
# each connection it has finished with so accept_requests can shut the socket
# down and forget it.
my $closed_queue = Thread::Queue->new;

# Constructor. Accepts a flat list of named parameters, all optional:
#   host          address to listen on            (default 'localhost')
#   port          TCP port to listen on           (default 8191)
#   thread_count  number of request_handler threads (default 10)
#   eom_marker    regex source marking the end of a client message
#                 (default matches newline, dot, newline)
#   main_yield    seconds main_loop sleeps between main_cb calls (default 1)
#   main_cb       callback run on every main_loop iteration (default no-op)
#   done_cb       callback run by start() once main_loop ends (default no-op)
#   processor_cb  callback that turns a request into a reply
#                 (default \&processor)
# Any false value (undef, 0, '') selects the default. May be called on a
# class name or on an existing object, whose class is then reused.
sub new {
  my $invocant = shift;
  my %param = @_;
  my $class = ref($invocant) || $invocant;

  # Arguments handed to the listener socket constructor in accept_requests.
  my %listener = (
    LocalHost => $param{host} || 'localhost',
    LocalPort => $param{port} || 8191,
  );

  my $self = {
    socket_defaults => \%listener,
    thread_count    => $param{thread_count} || 10,
    main_yield      => $param{main_yield} || 1,
    main_cb         => $param{main_cb} || sub {},
    done_cb         => $param{done_cb} || sub {},
    processor_cb    => $param{processor_cb} || \&processor,
    eom_marker      => $param{eom_marker} || "\\n\\.\\n",
    thread_pool     => undef,
    listen_queue    => 5,
  };

  return bless $self, $class;
}

# Default processor_cb, used when the caller supplies none. It ignores the
# request payload and the stop callback, and replies with a fixed notice that
# names the handling thread id and the client ip.
sub processor
{
  my ($data, $ip, $tid, $fnstop) = @_;
  return sprintf '[tid=%s; ip=%s] No function implemented', $tid, $ip;
}

# Launches the server: one detached acceptor thread, thread_count detached
# worker threads, then main_loop in the calling thread. Blocks until
# main_loop returns and finishes by invoking done_cb.
sub start
{
  my ($self) = @_;

  # Acceptor: listens on the socket and queues incoming connections.
  my $acceptor = threads->create(sub { $self->accept_requests });
  $acceptor->detach;

  # Worker pool: each thread services queued connections.
  foreach my $worker_number (1 .. $self->{thread_count})
  {
    my $worker = threads->create(sub { $self->request_handler });
    $worker->detach;
  }

  # Background tasks run here, in the caller's thread, until stop is
  # requested.
  $self->main_loop;

  return $self->{done_cb}->();
}

# Requests shutdown by raising the shared stop flag that main_loop,
# accept_requests and request_handler poll. Returns 1.
sub stop
{
  my ($self) = @_;
  return $stop = 1;
}

# Runs in the thread that called start(). Until the stop flag is raised it
# repeatedly calls main_cb with (call number starting at 1, a callback that
# requests shutdown) and then sleeps for main_yield seconds.
sub main_loop
{
  my ($self) = @_;
  my $request_stop = sub { $self->stop };
  my $calls_made = 0;
  while (!$stop)
  {
    my $call_number = ++$calls_made;
    $self->{main_cb}->($call_number, $request_stop);
    sleep $self->{main_yield};
  }
}

# @function accept_requests()
#
# Body of the single acceptor thread. Creates the listener socket (retrying
# once a second until it succeeds or stop is requested), then loops until the
# stop flag is raised: accept a connection, remember its socket object, hand
# "<fileno> <peer ip>" to the workers via $accept_queue, and release every
# socket the workers have reported as finished via $closed_queue.
sub accept_requests
{
  my $self = shift;
  # Connected sockets keyed by fileno. Holding the object here keeps the
  # connection open while a worker talks to it through the raw descriptor.
  my %socket;
  # lsocket => listener socket
  my $lsocket;
  until ($lsocket || $stop)
  {
    $lsocket = IO::Socket::INET->new(
      %{$self->{socket_defaults}},
      Proto => 'tcp',
      # use the backlog stored by the constructor; 1 is the historical value
      Listen => $self->{listen_queue} || 1,
      Reuse => 1);
    next if $lsocket;
    # capture the error before any further I/O has a chance to overwrite it
    my $error = $!;
    print STDERR "Warning! Can't create listener socket so server can't start (trying again in 1 second).\n";
    print STDERR "Error message: \"" . $error . "\"\n\n";
    sleep(1);
  }
  # stop was requested before a listener could be created
  return unless $lsocket;

  until ($stop)
  {
    # csocket => connected socket (for doing the actual communications)
    my $csocket = $lsocket->accept;
    if ($csocket)
    {
      my $n = fileno $csocket;
      $socket{$n} = $csocket;
      # the peer may already have gone away, in which case there is no
      # address to translate
      my $peer_address = $csocket->peeraddr;
      my $ip = defined $peer_address ? inet_ntoa($peer_address) : 'unknown';
      # add this to the queue that the request_handler threads are waiting on
      $accept_queue->enqueue($n . ' ' . $ip);
    }
    else
    {
      # accept failed; keep the acceptor thread alive and back off briefly
      # instead of spinning
      sleep(0.1);
    }
    # Release any sockets the workers have finished with. dequeue_nb does not
    # block and returns undef once the queue is empty.
    # NOTE(review): request_handler closes the descriptor itself, so the same
    # number may be handed out by a later accept before its entry is drained
    # here - confirm whether that reuse can shut down a live connection.
    while (defined(my $closed = $closed_queue->dequeue_nb))
    {
      my $finished = delete $socket{$closed} or next;
      # note: 2 = finished both reading and writing to this socket
      $finished->shutdown(2);
    }
  }
  # note: 2 = finished both reading and writing to this socket
  $lsocket->shutdown(2);
}

# @function request_handler()
#
# There are *thread_count* request_handler threads, each blocked on
# $accept_queue until accept_requests hands over a connection. Thread::Queue
# is documented as thread safe, so each queued connection is delivered to
# exactly one worker.
#
# For every connection the worker reads ONE request (up to the end-of-message
# marker), passes it to processor_cb as (data, client ip, thread id, stop
# callback), writes the result followed by "\n.\n", closes its handle and
# reports the descriptor on $closed_queue so accept_requests can release the
# socket object. The reply terminator is fixed; it does not follow
# eom_marker.
sub request_handler
{
  my $self= shift;
  until($stop)
  {
    # note: dequeue is blocking so this thread will wait here until something
    # is enqueued
    my $job = $accept_queue->dequeue();
    # Thread::Queue only returns undef from a blocking dequeue once the queue
    # has been ended, so there is nothing further to wait for
    last unless defined $job;
    my ($n, $ip)= split / /, $job;
    next unless $n;
    # attach a read/write handle to the already-open descriptor
    my $socket;
    unless (open $socket, '+<&=', $n)
    {
      # A bad descriptor must not take a pool worker down with it; report it
      # and still let accept_requests release the socket object.
      print STDERR "Warning! Can't open socket descriptor $n: $!\n";
      $closed_queue->enqueue($n);
      next;
    }
    # read the payload up to the end-of-message marker; undef means the
    # client sent nothing before closing its end
    my $data = $self->receive_client_request($socket);
    if(defined $data)
    {
      # the registered processor function (provided by the caller) does the
      # actual legwork, and its result is written back to the socket
      my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
      print $socket $result, "\n.\n";
    }
    # closing flushes the reply and closes the underlying descriptor
    close $socket;
    # add this socket number to the list of sockets to be destroyed
    $closed_queue->enqueue($n);
  }
}

# Reads one client message from $socket, line by line, until the accumulated
# text ends with the end-of-message marker or the handle reaches end of file.
#
# Params:  $socket - readable filehandle
# Returns: the message with the marker stripped; the text read so far if the
#          handle ended before a marker was seen; undef if nothing was read.
#
# eom_marker is regex source supplied by the caller. It is wrapped in a
# non-capturing group so the end anchor applies to the whole marker even when
# it contains alternation, and compiled once rather than on every line.
sub receive_client_request
{
  my ($self, $socket)= @_;
  my $eom = $self->{eom_marker};
  my $eom_re = qr/(?:$eom)$/;
  my $data;
  while (defined(my $buffer = <$socket>))
  {
    $data .= $buffer;
    last if $data =~ s/$eom_re//;
  }
  return $data;
}

1;
