#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # assumed to be on the PATH; replace with a full path if it isn't
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # assumes the HDFS host is also the cluster head node - may not hold for advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $remove_old = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
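# - we treat the presence of the 'rocks' command-line tool (exit status 0
#   above) as the indicator that we are running on a Rocks cluster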

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-enable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-removeold]\n";
  print STDERR "where: [debug] print more debug messages to STDERR\n";
  print STDERR "       [dry_run] don't actually perform an file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  elsif ($ARGV[$offset] eq '-enable_thrift')
  {
    $use_thrift = 1;
  }
  elsif ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  elsif ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  elsif ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  elsif ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  elsif ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  elsif ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  elsif ($ARGV[$offset] eq '-removeold')
  {
    $remove_old = 1;
  }
  elsif ($ARGV[$offset] eq '-logdir')
  {
    # - the -logdir option expects a directory path as its next argument
    $offset++;
    die "Error! -logdir requires a directory argument\n" unless defined $ARGV[$offset];
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}

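# - select how the Hadoop ingest will reach HDFS: via the Thrift server
#   (HDThriftFS://), via the 'hadoop fs' shell (HDFSShell://), or - when
#   -use_nfs is given - via an NFS mount assumed to be at /hdfs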
if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
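#   the collection is mirrored under the current user's home in HDFS, e.g.
#   hdfs://<host>:<port>/user/<username>/gsdl/collect/<collection>/import
#   (placeholders shown for illustration only)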
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
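#    (the copy is (re)created when -refresh_import is given or when no copy
#    exists in HDFS yet; otherwise the existing copy is reused)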
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir)
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories in media-based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so every run starts on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - if we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
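# - the infodbtype setting in collect.cfg decides whether a *Server needs to
#   be started; an example matching line would be: "infodbtype  tdbserver"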
my $infodbtype = `grep -P "^infodbtype" "$configuration_path"`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
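  #   (the launcher is expected to report a line of the form
  #   "Server now listening on <host>:<port>")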
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
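  # - swap the hdfs:// scheme for the selected driver prefix, e.g.
  #   hdfs://<host>:<port>/... becomes HDFSShell://<host>:<port>/...
  #   (or HDThriftFS://... when thrift is enabled)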
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multi-file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the thrift servers, so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stoping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
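#  Runs 'hadoop fs -<command> <paths...>' (with STDERR folded into STDOUT) and
#  returns the child status ($?) from the shell - 0 indicates success.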
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
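#  e.g. &hdfsTest('d', 0, $hdfs_input_dir) is true when that HDFS directory exists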
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
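#  Copies $src_dir into $hdfs_dir one file at a time (via 'put', or 'cp' onto
#  the NFS mount), preserving the relative directory structure; dot-prefixed
#  files are skipped. Returns the number of files copied.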
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
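#  Echoes the command via debugPrint, skips execution when -dry_run is set,
#  and returns whatever the command printed (captured via backticks).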
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
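#  Joins its arguments with '/', e.g. &urlCat('hdfs://<host>:<port>', 'user', 'me')
#  gives 'hdfs://<host>:<port>/user/me' (host, port and user are illustrative)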
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
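#  Returns 1 when the directory does not exist or contains nothing besides
#  '.' and '..', otherwise 0.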
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return (@files ? 0 : 1);
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
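#  Deletes $dir with 'rm -rf', but only when it lies under $prefix (a basic
#  guard against deleting paths outside the expected tree). Wildcards are
#  passed through unquoted for the shell to expand.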
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  # - only delete paths that lie under the given prefix; leave any wildcard
  #   unquoted so the shell can expand it (quoting would defeat the glob)
  if ($dir =~ /^\Q$prefix\E/)
  {
    my $shell_dir = ($dir =~ /[*?]/) ? $dir : '"' . $dir . '"';
    &shellCommand('rm -rf ' . $shell_dir);
  }
}
## recursiveDelete() ##
