###############################################################################
#
# parallelbuildinginexport.pm -- support parallel import.pl by extending
# inexport with parallel processing awareness and functionality
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the  GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# @author Hussein Suleman [h1], University of Cape Town, South Africa
# @author John Thompson [jmt12], Greenstone DL Research Group

package parallelbuildinginexport;

# Pragma
use strict;
no strict 'refs'; # allow filehandles to be variables and vice versa
no strict 'subs'; # allow barewords (eg STDERR) as function arguments

# MODULES
# Randomize the order of files in the filelist
use List::Util qw( shuffle );
use Time::HiRes  qw( gettimeofday tv_interval );

# Greenstone Modules
use gsprintf 'gsprintf';
use inexport;

# Package-level timing state shared between new(), _farmOutProcesses() and the
# END block:
#  - $start_time        [seconds, microseconds] pair recorded by new()
#  - $filelist_duration seconds spent writing the file list, recorded by
#                       _farmOutProcesses()
our ($start_time, $filelist_duration);

# Establish inheritance from inexport at compile time
BEGIN
{
  @parallelbuildinginexport::ISA = qw( inexport );
}

# Runs as the interpreter exits. If new() recorded a start time, print a
# closing summary to STDOUT: the end timestamp, the time spent generating the
# file list (when _farmOutProcesses() recorded one) and the total duration.
END
{
  if (defined $start_time)
  {
    my $end_time = [gettimeofday()];
    my $duration = tv_interval($start_time, $end_time);
    print makeHeader('Parallel Import Complete') . "\n";
    # gettimeofday() yields (seconds, microseconds). The microseconds must be
    # zero-padded to six digits, otherwise 5 microseconds would print as ".5"
    print '  Ended:    ' . sprintf('%d.%06d', $end_time->[0], $end_time->[1]) . "\n";
    if (defined $filelist_duration)
    {
      print '  Generating raw manifest: ' . sprintf('%0.6f', $filelist_duration) . "\n";
    }
    print '  Duration: ' . sprintf('%0.6f', $duration) . "\n";
    print '=' x 80 . "\n";
  }
}

# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
#
# Descriptions of the extra command line arguments understood by this
# subclass, one hash per argument. The list is handed out unchanged by
# getSupportedArguments().
my $arguments = [
  # - current argument names
  {
    name      => 'workers',
    desc      => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
    type      => 'int',
    range     => '0,',
    reqd      => 'no',
    hiddengli => 'yes',
  },
  {
    name      => 'batchsize',
    desc      => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
    type      => 'int',
    range     => '1,',
    reqd      => 'no',
    hiddengli => 'yes',
  },
  {
    name      => 'nolocal',
    desc      => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
    type      => 'flag',
    reqd      => 'no',
    hiddengli => 'yes',
  },
  # - legacy argument names, see new() for how they map onto the above
  {
    name      => 'jobs',
    desc      => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
    type      => 'int',
    range     => '1,',
    reqd      => 'no',
    internal  => '1',
    hiddengli => 'yes',
  },
  {
    name      => 'epoch',
    desc      => "**Parallel Processing** Legacy support - see 'batchsize' above",
    type      => 'int',
    range     => '1,',
    reqd      => 'no',
    internal  => '1',
    hiddengli => 'yes',
  },
];


# @function new()
# Constructor
#
# Builds an inexport object from the supplied arguments (passed through
# untouched), records the import start time in $start_time, then normalises
# the parallel processing settings held on the object:
#  - the legacy 'jobs' and 'epoch' values are mapped onto 'workers' (as
#    jobs - 1) and 'batchsize' when the newer values are empty
#  - a 'batchsize' that is not a whole number falls back to 1
#  - a 'workers' value that is not a whole number, or is below 1, selects a
#    serial import (workers = 0)
#  - when importing in parallel a banner is printed and 'removeold' is
#    switched on
#
# @return the object, blessed into the requested class
#
sub new
{
  my $class = shift(@_);
  # Explicit class method call. The indirect form "new inexport(...)" is
  # parsed heuristically by perl, which is fragile in a package that defines
  # its own new().
  my $self = inexport->new(@_);

  $start_time = [gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch
  if ($self->{'workers'} eq '' && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ($self->{'batchsize'} eq '' && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if ($self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if ($self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print makeHeader($message) . "\n";
    # gettimeofday() yields (seconds, microseconds). The microseconds must be
    # zero-padded to six digits, otherwise 5 microseconds would print as ".5"
    print '  Started:   ' . sprintf('%d.%06d', $start_time->[0], $start_time->[1]) . "\n";
    print '  Workers:   ' . $self->{'workers'} . "\n";
    print '  Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 80 . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); # just in case - gives the user a moment to abort
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()


## @function set_collection_options()
#
# Hand the collection configuration on to the parent class, then keep a
# reference to it on this object so that perform_process_files() can look up
# its 'infodbtype' entry later.
#
# @param $collectcfg the collection configuration (a hash reference)
#
sub set_collection_options
{
  my ($self, $collectcfg) = @_;

  $self->SUPER::set_collection_options($collectcfg);
  $self->{'collectcfg'} = $collectcfg;
}
## set_collection_options() ##


## @function deinit()


# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes
# @author hs, 1 july 2010
#
# Writes a shuffled list of the files to import to tmp/filelist.txt under
# GSDLCOLLECTDIR, then runs the mpiimport farmer through mpirun, echoing
# everything it outputs to STDOUT. The time taken to write the file list is
# recorded in $filelist_duration.
#
# @param $workers    number of worker processes (mpirun is given workers + 1)
# @param $batchsize  batch size passed on to mpiimport
# @param $importdir  import directory, used to build the full paths that are
#                    looked up in the block list
# @param $block_hash hash reference providing 'all_files' (the candidate
#                    files) and 'file_blocks' (full paths to be skipped)
# @param $collection collection name passed on to mpiimport
# @param $site       site name; when defined and non-empty GSDL3SRCHOME is
#                    used as the home directory instead of GSDLHOME
#
# Dies if the temporary directory or file list cannot be written, or if the
# MPI command cannot be started.
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path && !mkdir($tmp_dir_path, 0777))
  {
    # remember the reason now, as the -d test below may overwrite $!
    my $mkdir_error = $!;
    # only fatal if the directory really isn't there - something else may
    # have created it in the meantime
    if (!-d $tmp_dir_path)
    {
      die("Failed to create temporary directory '" . $tmp_dir_path . "': " . $mkdir_error);
    }
  }

  # create the list of files to import
  my $filelist_start_time = [gettimeofday()];
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours.
  #   (with $overwrite fixed at 1 the list is currently always regenerated)
  if ($overwrite || !-f $tmp_filelist)
  {
    # without a file list there is nothing for the farmer to process, so a
    # failure here is fatal rather than silently ignored
    open(my $filelist, '>', $tmp_filelist) or die("Failed to open file list '" . $tmp_filelist . "' for writing: " . $!);
    # Randomize the order of files in the filelist
    my @filenames = shuffle(keys %{$block_hash->{'all_files'}});
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir,$filename);
      # leave out blocked files and metadata.xml files
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    # buffered write errors only surface when the handle is closed
    close($filelist) or die("Failed to write file list '" . $tmp_filelist . "': " . $!);
  }
  my $filelist_end_time = [gettimeofday()];
  $filelist_duration = tv_interval($filelist_start_time, $filelist_end_time);

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallelizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    $mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';
  # fix for mpi binding to incorrect interface device (seems to have an
  # unhealthy obsession with virbr0)
  $mpi_flags .= ' --mca btl tcp,sm,self --mca btl_tcp_if_include eth0 ';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3SRCHOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find these executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';

  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # The command is a single string so it is still run via the shell, which
  # the quoted -machinefile path above relies on
  open(my $mpi_pipe, '-|', $mpi_cmd) or die("Couldn't Execute MPI: " . $!);
  while ( defined( my $line = <$mpi_pipe> )  )
  {
    chomp($line);
    print "$line\n";
  }
  # close() on a pipe waits for the command to finish and returns false if
  # it exited with a non-zero status (in $?) or the close itself failed (in $!)
  if (!close($mpi_pipe))
  {
    my $reason = $! ? $! : 'exit status ' . ($? >> 8);
    print STDERR "Warning! MPI command did not complete cleanly (" . $reason . ")\n";
  }
}
# _farmOutProcesses()


## @function getSupportedArguments()
#
# Provide the descriptions of the arguments that only this subclass of
# inexport understands, so that they can be added to the list of arguments
# supported by import.pl. Using any of them automatically causes this subclass
# to be instantiated and used in preference to the parent class. ATM it is up
# to the implementer to ensure these arguments are unique.
#
# @return a reference to the array of argument descriptions
#
sub getSupportedArguments
{
  my $supported_arguments = $arguments;
  return $supported_arguments;
}
## getSupportedArguments() ##


################################################################################
##### Overrides
################################################################################


## @function perform_process_files()
#
# Process the files to import, either in parallel through
# _farmOutProcesses() (when 'workers' is greater than zero) or serially
# through the parent class implementation. Afterwards, if an mpi.conf file
# exists in GSDLCOLLECTDIR and the collection's infodb type supports merging,
# the archiveinf-src and archiveinf-doc databases found for each host listed
# in mpi.conf are merged into the databases without a hostname suffix.
#
# All parameters are passed through unchanged to the parent class for a
# serial import; a parallel import only makes use of $importdir and
# $block_hash.
#
# Dies if mpi.conf exists but cannot be opened for reading.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel Import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Merge the individual parts into one single database
  my $infodb_type = $self->{'collectcfg'}->{'infodbtype'};
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  if ( -f $mpi_conf_path && &dbutil::supportsMerge($infodb_type) )
  {
    print STDERR ' * Merging ' . $infodb_type . ' databases... ';

    # The host list is the same for every database, so read mpi.conf once. A
    # hostname is the leading run of letters, digits and hyphens on a line;
    # lines that start with anything else are ignored.
    open(my $mpi_hosts, '<:utf8', $mpi_conf_path) or die("Failed to open mpi.conf for reading: " . $!);
    my @hostnames;
    while (my $line = <$mpi_hosts>)
    {
      if ($line =~ /^([a-z0-9\-]+)/i)
      {
        push(@hostnames, $1);
      }
    }
    close($mpi_hosts);

    foreach my $database ('archiveinf-src', 'archiveinf-doc')
    {
      # generate the path to the target database without any hostname suffix
      my $archive_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, '');
      foreach my $hostname (@hostnames)
      {
        my $mergable_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, $hostname);
        # not every listed host necessarily produced a database
        if (-f $mergable_db_path)
        {
          &dbutil::merge_databases($infodb_type, $mergable_db_path, $archive_db_path);
        }
      }
    }
    # finish the progress message on the stream it was started on
    print STDERR "Done!\n";
  }

}
## perform_process_files() ##

## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
#
# As it stands this prints the completion banner and a pointer to the
# gsimport-W*.log files on the 'out' handle. No statistics are written: the
# call to plugin::write_stats() is disabled, so a stats file named by
# 'statsfile' is created (or truncated) and left empty.
#
# @param $pluginfo the plugin information, only needed by the disabled
#                  plugin::write_stats() call
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile   = $self->{'statsfile'};
  my $out         = $self->{'out'};
  # - only needed by the disabled plugin::write_stats() call below
  my $faillogname = $self->{'faillogname'};
  my $gli         = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three argument open so that the path is never interpreted as a mode
    if (open (STATS, '>', $statsfile))
    {
      # the bareword handle STATS lives in this package, so that is the name
      # a symbolic reference to it has to use
      $statsfile = __PACKAGE__ . '::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Otherwise this is the master thread. In the future I should do something
  # smarter here, like read in the stats from all the worker threads stat
  # files, add them up, then create a dummy plugin object and add it to
  # pluginfo. Otherwise the results always show 0 documents considered and 0
  # included...

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##


## @function makeHeader($msg, [$length])
#
# Create a centered header string given a certain message padded with '='
# characters. The message is always surrounded by a single space on each side.
# When the padding cannot be split evenly the extra '=' goes on the right. A
# message too long to fit is returned with just its surrounding spaces, so the
# result is then longer than $length.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string centered with '=' as padding
#
sub makeHeader
{
  my ($msg, $length) = @_;
  if (!defined $length)
  {
    $length = 80;
  }
  # What is left for '=' characters once the message and the two spaces
  # around it are accounted for
  my $padding_length = $length - 2 - length($msg);
  if ($padding_length < 0)
  {
    $padding_length = 0;
  }
  # Split on the padding itself rather than on the parity of the message
  # length, so that odd values of $length also come out at the right width
  my $left_length = int($padding_length / 2);
  my $right_length = $padding_length - $left_length;
  return ('=' x $left_length) . ' ' . $msg . ' ' . ('=' x $right_length);
}
## makeHeader() ##
1;
