###############################################################################
#
# HDFSShell.pm -- file functions acting upon a HDFS via the CLI hadoop
# application
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

package FileUtils::HDFSShell;

# Pragma
use strict;

# Configuration
# Set to a true value to have _printDebug() write its '[DEBUG]' trace messages
# to STDERR; while it is left at 0 those calls print nothing.
my $debug = 0;

################################################################################
######################### Private Functions & Variables ########################
################################################################################

## @function _executeHDFSCommand()
#
# Runs a single 'hadoop fs' command through the shell, capturing STDOUT and
# STDERR together.
#
# @param  $return_result  0 to get the command's status back, 1 to get the
#                         captured output text back instead (useful when the
#                         caller wants to parse the output, e.g. of -ls)
# @param  ...             the action followed by its arguments; these are
#                         handed on untouched to _generateHDFSCommand()
# @return the value of $? after the command when $return_result is 0,
#         otherwise everything the command printed
#
sub _executeHDFSCommand
{
  my ($return_result, @command_parts) = @_;
  # the flag is strictly a 0/1 switch - complain about anything else
  unless ($return_result == 0 || $return_result == 1)
  {
    &FileUtils::printError('Unexpected value for return_result argument - should be 0 or 1: ' . $return_result, 1);
  }
  my $command_line = &_generateHDFSCommand(@command_parts);
  # fold STDERR into the captured text so error messages can be inspected too
  my $output = `$command_line 2>&1`;
  my $status = $?;
  &_printDebug(' -> util::executeHDFSCommand() => |' . $output . '| [' . $status . ']');
  return ($return_result ? $output : $status);
}
## _executeHDFSCommand()


## @function _generateHDFSCommand()
#
# Builds the shell command line for a 'hadoop fs' action.
#
# @param  $action  the action (and any switches) without its leading dash,
#                  e.g. 'ls' or 'test -e'
# @param  ...      the paths the action applies to. The first 'HDFSShell:'
#                  found in each path is rewritten to 'hdfs:', and a lone '-'
#                  (standard stream) is passed through unquoted
# @return the command string, with every path wrapped in double quotes and
#         followed by a single space
#
sub _generateHDFSCommand
{
  my ($action, @args) = @_;
  my $arguments = '';
  foreach my $path (@args)
  {
    # Replace the prefix with one HDFS Shell understands
    $path =~ s/HDFSShell:/hdfs:/;
    # special case for standard streams
    if ($path eq '-')
    {
      $arguments .= '- ';
      next;
    }
    # Inside double quotes a POSIX shell still interprets backslash, double
    # quote, dollar and backtick, so a path containing any of them would be
    # mangled (or run as a command substitution). Escape them so the path
    # arrives at hadoop verbatim. Windows' cmd.exe does not treat backslash as
    # an escape character, so paths are left untouched there.
    if ($^O ne 'MSWin32')
    {
      $path =~ s/([\\"\$`])/\\$1/g;
    }
    $arguments .= '"' . $path . '" ';
  }
  my $command = 'hadoop fs -' . $action . ' ' . $arguments;
  &_printDebug(' -> _generateHDFSCommand("' . $action . '", ...) => |' . $command . '|');
  return $command;
}
## _generateHDFSCommand()


## @function _printDebug()
#
# Writes a '[DEBUG] '-prefixed line to STDERR, but only while the file level
# $debug switch is true.
#
# @param  $message  the text to report (a newline is appended)
#
sub _printDebug
{
  my $message = shift(@_);
  print STDERR '[DEBUG] ' . $message . "\n" if ($debug);
}
## _printDebug()


################################################################################
############################### Public Functions ###############################
################################################################################


## @function canRead()
#
# Reports whether the given path can be read. No permission check is made:
# working that out would mean parsing the permissions, user and group of the
# entry and comparing them with the current user, so the answer is simply
# whether fileTest() considers the path to be a file ('-f').
#
# @param  $path  the path to check
# @return whatever fileTest($path, '-f') returns
#
sub canRead
{
  my ($path) = @_;
  return fileTest($path, '-f');
}
## canRead()


## @function closeFileHandle()
#
# Closes the file handle held in the referenced scalar.
#
# @param  $fh_ref  reference to the scalar containing the handle
# @return always 1; the outcome of the close itself is not reported
#
sub closeFileHandle
{
  my ($fh_ref) = @_;
  close(${$fh_ref});
  return 1;
}
## closeFileHandle()


## @function fileSize()
#
# Looks up the size of a path as reported by fileStats().
#
# @param  $path  the path to look up
# @return the 'filesize' entry of the statistics returned by fileStats()
#
sub fileSize
{
  my $path = shift(@_);
  return &fileStats($path)->{'filesize'};
}
## fileSize()


## @function fileStats()
#
# Runs 'hadoop fs -ls' on a path and parses the entry that ends the output.
#
# @param  $path  the path to list
# @return a hash reference with the keys 'filename', 'replicas', 'filesize',
#         'modification_date' (YYYY-MM-DD), 'modification_time' (HH:MM),
#         'permissions', 'userid' and 'groupid'. The hash is empty when the
#         output could not be parsed (which is also reported through
#         FileUtils::printError with its second argument set to 1)
#
sub fileStats
{
  my ($path) = @_;
  my $stats = {};
  my $result = &_executeHDFSCommand(1, 'ls', $path);
  # One listing entry. Compared with a plain "eight whitespace separated
  # columns" pattern this also copes with a sticky bit or ACL marker in the
  # permission column, with the '-' shown in place of a replica count, and with
  # names that contain spaces. The pattern is anchored to the end of the output
  # only, so when several entries are listed it is the final one that matches.
  my $entry_re = qr/
    ([ds\-][rwxtT\-]+\+?)   \s+  # 1: type and permission bits, optional ACL marker
    (\d+|-)                 \s+  # 2: replica count, or a dash when there is none
    (\S+)                   \s+  # 3: owner
    (\S+)                   \s+  # 4: group
    (\d+)                   \s+  # 5: size in bytes
    (\d\d\d\d-\d\d-\d\d)    \s+  # 6: modification date
    (\d\d:\d\d)             \s+  # 7: modification time
    (\S(?:.*\S)?)           \s*$
  /x;
  # - parse the results
  if ($result =~ $entry_re)
  {
    $stats->{'filename'} = $8;
    $stats->{'replicas'} = $2;
    $stats->{'filesize'} = $5;
    $stats->{'modification_date'} = $6;
    $stats->{'modification_time'} = $7;
    $stats->{'permissions'} = $1;
    $stats->{'userid'} = $3;
    $stats->{'groupid'} = $4;
  }
  else
  {
    &FileUtils::printError('Failed to parse -ls result: ' . $result, 1);
  }
  return $stats;
}
## fileStats()


## @function fileTest()
#
# Emulates a small subset of Perl's file test operators by way of
# 'hadoop fs -test'.
#
# @param  $filename_full_path  the path to test
# @param  $test_op             one of '-d', '-e', '-f' or '-z'. When missing,
#                              or when '-l' is asked for (HDFS doesn't support
#                              symlinking), '-e' is used instead
# @return 1 when the test succeeds, 0 when it fails or the operator is not
#         supported
#
sub fileTest
{
  my ($filename_full_path, $test_op) = @_;
  # Special case: no operator, or the symlink test, both become -e
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');

  # Special case: -f is built from two tests - the path must exist (-e) and
  # the directory test (-d) must then report a failure
  if ($test_op eq '-f')
  {
    if (&_executeHDFSCommand(0, 'test -e', $filename_full_path) != 0)
    {
      return 0;
    }
    return ((&_executeHDFSCommand(0, 'test -d', $filename_full_path) > 0) ? 1 : 0);
  }

  # very limited test op support for HDFS
  my %is_supported = map { $_ => 1 } ('-d', '-e', '-z');
  if (!$is_supported{$test_op})
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }

  # a zero status from the command means the test passed
  my $status = &_executeHDFSCommand(0, 'test ' . $test_op, $filename_full_path);
  return ($status == 0 ? 1 : 0);
}
## fileTest()


## @function filenameConcatenate()
#
# Joins path parts with '/' and puts the protocol in front of the result.
#
# @param  $protocol  the prefix for the finished path
# @param  ...        the path parts to join
# @return the protocol, a '/', and the joined parts (runs of slashes within
#         the parts collapsed to one), minus one trailing slash or backslash
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  my $joined = join('/', @parts);
  # remove repeated slashes
  $joined =~ s/[\/]+/\//g;
  # glue the protocol on the front - this is done after the clean up above, so
  # a doubled slash at the join survives
  my $full_path = join('/', $protocol, $joined);
  # strip any trailing slash
  $full_path =~ s/[\\\/]$//;
  return $full_path;
}
## filenameConcatenate()


## @function isFilenameAbsolute()
#
# Every path is reported as absolute, as file paths against HDFS must be.
# Any arguments are ignored.
#
# @return always 1
#
sub isFilenameAbsolute
{
  return 1;
}
## isFilenameAbsolute()


## @function isHDFS()
#
# Identifies this driver as one that operates on HDFS. Any arguments are
# ignored.
#
# @return always 1
#
sub isHDFS
{
  return 1;
}
## isHDFS()


## @function isSpecialDirectory()
#
# Tests whether a path is nothing more than the protocol, a host and a port,
# i.e. 'HDFSShell://<host>:<port>' with no path after it.
#
# @param  $path  the path to test
# @return true when the path has exactly that form, false otherwise
#
sub isSpecialDirectory
{
  my ($path) = @_;
  # Host names are not limited to letters: digits, dots and hyphens are all
  # common (node01, nn.example.org, 192.168.0.10), so accept those as well.
  return ($path =~ /^HDFSShell:\/\/[a-zA-Z0-9][a-zA-Z0-9.\-]*:\d+$/);
}
## isSpecialDirectory()


## @function makeDirectory()
#
# Creates a directory using 'hadoop fs -mkdir'.
#
# @param  $dir  the directory to create
# @return 1 when the command reported a zero status, 0 otherwise
#
sub makeDirectory
{
  my $dir = shift(@_);
  my $status = &_executeHDFSCommand(0, 'mkdir', $dir);
  # anything but a zero status counts as a failure
  if ($status != 0)
  {
    return 0;
  }
  return 1;
}
## makeDirectory()


## @function modificationTime()
#
# Placeholder: modification times are not supported by this driver. A warning
# saying so is raised through FileUtils::printWarning on every call. The date
# and time reported by fileStats() are fetched and split into their parts, but
# nothing is done with them yet.
#
# @param  $path  the path to look up
# @return always 0
#
sub modificationTime
{
  my ($path) = @_;
  &FileUtils::printWarning("modificationTime() not supported");
  my $file_stats = &fileStats($path);
  # YYYY-MM-DD
  my ($mod_year, $mod_month, $mod_day) = ($file_stats->{'modification_date'} =~ /(\d\d\d\d)-(\d\d)-(\d\d)/);
  # HH:MM
  my ($mod_hour, $mod_minute) = ($file_stats->{'modification_time'} =~ /(\d\d):(\d\d)/);
  return 0;
}
## modificationTime()


## @function openFileHandle()
#
# Opens a pipe to a 'hadoop fs' process so a path on HDFS can be written to
# (through 'put') or read from (through 'cat').
#
# @param  $path    the path on HDFS
# @param  $mode    '>' or 'w' to write; '>>' or 'a' (append) is rejected;
#                  anything else opens the path for reading
# @param  $fh_ref  reference to the scalar that is to receive the handle
# @return 1 once the pipe is open, 0 when no handle could be opened (only
#         reached if FileUtils::printError returns to the caller)
#
sub openFileHandle
{
  my ($path, $mode, $fh_ref) = @_;
  if ($mode eq '>>' || $mode eq 'a')
  {
    &FileUtils::printError('Append (>>) mode not supported', 1);
    # no handle was opened, so never claim success
    return 0;
  }
  if ($mode eq '>' || $mode eq 'w')
  {
    # the put command fails if the file already exists
    if (&fileTest($path, '-e'))
    {
      &removeFiles($path);
    }
    # three argument open: the direction of the pipe is given explicitly
    # rather than being read out of the command string
    if (!open($$fh_ref, '|-', &_generateHDFSCommand('put', '-', $path)))
    {
      &FileUtils::printError('Failed to open pipe to HDFS (put) for writing: ' . $path, 1);
      return 0;
    }
  }
  else
  {
    if (!open($$fh_ref, '-|', &_generateHDFSCommand('cat', $path)))
    {
      &FileUtils::printError('Failed to open pipe to HDFS (cat) for reading: ' . $path, 1);
      return 0;
    }
  }
  return 1;
}
## openFileHandle()


## @function readDirectory()
#
# Lists the entries of a directory using 'hadoop fs -ls'.
#
# @param  $path  the directory to list
# @return a reference to an array holding the final path component of every
#         listed entry, or undef (after a message on STDERR) when the command
#         output contains 'No such file or directory'
#
sub readDirectory
{
  my ($path) = @_;
  my @files;
  my $result = &_executeHDFSCommand(1, 'ls', $path);
  if ($result =~ /No such file or directory/)
  {
    # say what went wrong, and for which path
    print STDERR 'readDirectory() failed - no such file or directory: ' . $path . "\n";
    return undef;
  }
  foreach my $line (split(/\r?\n/, $result))
  {
    # only lines that contain a path are entries; whatever follows the last
    # slash on the line is the entry's name
    if ($line =~ /\/([^\/]+)$/)
    {
      push(@files, $1);
    }
  }
  return \@files;
}
## readDirectory()


## @function removeFiles()
#
# Removes a path using 'hadoop fs -rm', or 'hadoop fs -rmr' when asked to.
#
# @param  $path           the path to remove
# @param  $including_dir  optional; when defined and true 'rmr' is used
#                         instead of 'rm'
# @return 1 when the command reported a zero status, 0 otherwise
#
sub removeFiles
{
  my ($path, $including_dir) = @_;
  my $action = (defined $including_dir && $including_dir) ? 'rmr' : 'rm';
  my $status = &_executeHDFSCommand(0, $action, $path);
  # a zero status is the only thing treated as success
  return ($status == 0 ? 1 : 0);
}
## removeFiles()


## @function removeFilesFiltered()
#
# Performs a depth first, recursive, removal of files and directories that
# match the given accept and reject patterns.
#
# @param  $paths      a single path, or a reference to an array of paths
# @param  $accept_re  optional pattern; when defined only files whose full
#                     path matches it are removed
# @param  $reject_re  optional pattern; when defined files whose full path
#                     matches it are kept
# @return the number of files and directories removed. Directories are only
#         removed (and counted) when neither pattern is defined
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $num_removed = 0;
  foreach my $path (@paths_array)
  {
    # remove trailing slashes
    $path =~ s/[\/\\]+$//;
    if (!&fileTest($path, '-e'))
    {
      &FileUtils::printError('path does not exist: ' . $path);
    }
    elsif (&fileTest($path, '-d'))
    {
      # readDirectory() returns undef when the listing failed. Dereferencing
      # that would abort the whole run, so report it and move on to the next
      # path, leaving this directory alone.
      my $children = &readDirectory($path);
      if (!defined $children)
      {
        &FileUtils::printError('could not read directory: ' . $path);
        next;
      }
      foreach my $file (@{$children})
      {
        my $child_path = $path . '/' . $file;
        $num_removed += &removeFilesFiltered($child_path, $accept_re, $reject_re);
      }
      if (!defined $accept_re && !defined $reject_re)
      {
        # remove this directory
        my $result = &removeFiles($path, 1);
        if ($result != 1)
        {
          &FileUtils::printError('could not remove directory: ' . $path);
        }
        else
        {
          $num_removed++;
        }
      }
    }
    else
    {
      # a rejected file is always kept, whatever the accept pattern says
      if (defined $reject_re && ($path =~ m/$reject_re/))
      {
        next;
      }
      if ((!defined $accept_re) || ($path =~ m/$accept_re/))
      {
        # remove this file
        my $result = &removeFiles($path);
        if ($result != 1)
        {
          &FileUtils::printError('could not remove file: ' . $path);
        }
        else
        {
          $num_removed++;
        }
      }
    }
  }
  return $num_removed;
}
## removeFilesFiltered()


## @function removeFilesRecursive()
#
# Removes a path and everything below it. This is removeFilesFiltered() called
# with neither an accept nor a reject expression.
#
# @param  $path  the path to remove
# @return whatever removeFilesFiltered() returns
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  my ($accept_re, $reject_re) = (undef, undef);
  return &removeFilesFiltered($path, $accept_re, $reject_re);
}
## removeFilesRecursive()


## @function supportsSymbolicLink()
#
# Reports that symbolic links are not available through this driver. Any
# arguments are ignored.
#
# @return always 0
#
sub supportsSymbolicLink
{
  return 0;
}
## supportsSymbolicLink()


## @function transferFile()
#
# Copies or moves a path with 'hadoop fs -cp' or 'hadoop fs -mv'.
#
# @param  $mode  'COPY' to copy; any other value results in a move
# @param  $src   the source path
# @param  $dst   the destination path
# @return 1 when the command reported a zero status, 0 otherwise
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  my $action = ($mode eq 'COPY') ? 'cp' : 'mv';
  my $status = &_executeHDFSCommand(0, $action, $src, $dst);
  # a zero status is the only thing treated as success
  return ($status == 0 ? 1 : 0);
}
## transferFile()


## @function transferFileFromLocal()
#
# Uploads a local file with 'hadoop fs -put' and, in MOVE mode, deletes the
# local copy afterwards.
#
# @param  $mode  'MOVE' to delete the local source after a successful upload;
#                any other value leaves the source in place
# @param  $src   the local source file
# @param  $dst   the destination path; when fileTest() reports it as a
#                directory the source's file name is appended to it
# @return 1 when the upload (and, for MOVE, the local delete) succeeded, 0
#         otherwise
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # problems found up front are reported here; whether the transfer is
  # attempted anyway is left to how FileUtils::printError reacts
  if (!-f $src)
  {
    &FileUtils::printError('Source file (during ' . $mode . ') doesn\'t exists: ' . $src);
  }
  if (&fileTest($dst, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (&fileTest($dst, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
  }
  my $result = &_executeHDFSCommand(0, 'put', $src, $dst);
  # Never touch the local file unless the upload worked - deleting it after a
  # failed put would lose the only copy.
  if ($result != 0)
  {
    return 0;
  }
  if ($mode eq 'MOVE')
  {
    unlink($src);
    # failed to delete somehow
    if (-f $src)
    {
      return 0;
    }
  }
  return 1;
}
## transferFileFromLocal()


## @function transferFileToLocal()
#
# Downloads a file with 'hadoop fs -get' and, in MOVE mode, removes it from
# HDFS afterwards.
#
# @param  $mode  'MOVE' to remove the source after a successful download; any
#                other value leaves the source in place
# @param  $src   the source path on HDFS
# @param  $dst   the local destination; when it is an existing directory the
#                source's file name is appended to it
# @return 1 when the download (and, for MOVE, the removal of the source)
#         succeeded, 0 otherwise
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # problems found up front are reported here; whether the transfer is
  # attempted anyway is left to how FileUtils::printError reacts
  if (!&fileTest($src, '-f'))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
  }
  my $result = &_executeHDFSCommand(0, 'get', $src, $dst);
  # Never remove the source unless the download worked - removing it after a
  # failed get would lose the only copy.
  if ($result != 0)
  {
    return 0;
  }
  if ($mode eq 'MOVE' && !&removeFiles($src))
  {
    return 0;
  }
  return 1;
}
## transferFileToLocal()


1;
