################################################################################
#
# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# Thrift acts as a client-server 'relay' between the Perl code and the HDFS.
# It allows for persistent connections and so is significantly faster than
# repeatedly starting Hadoop's Java application. In order to connect to the
# Thrift server this code needs to know the host and port the server may be
# found on - information currently hard-coded near the top of the script
# (and overridable through an etc/thrift.conf file in the collection
# directory). There are also a number of Perl module API 'bindings' generated
# by the Thrift compilation process... currently located within the packages
# of the Parallel Processing extension.  Note that I make use of some tie()
# magic so as to provide calling code with 'file handle'-like objects to
# interact with (print, readline etc), so that is pretty cool.

package FileUtils::HDThriftFS;

# Pragma
use strict;

# Setup Environment
#
# Runs at compile time so that the Thrift and HadoopFS modules pulled in by the
# 'use' statements further down can be located. Dies when any of the required
# Greenstone environment variables is missing.
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GSDLCOLLECTDIR not set\n" unless defined $ENV{'GSDLCOLLECTDIR'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    my $detected_version = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    # - backticks yield undef when the command could not be started; leave the
    #   variable unset in that case so the check below reports the problem
    if (defined $detected_version)
    {
      # - captured output keeps any trailing newline, which must not end up
      #   inside the @INC path built below
      chomp($detected_version);
      $ENV{'PERL_VERSION'} = $detected_version;
    }
  }
  die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}

# Modules - Core
use Devel::Peek;
use MIME::Base64;
use POSIX qw(floor);
use Symbol;
use Thrift::Socket;
use Thrift::BufferedTransport;
use Thrift::BinaryProtocol;
# Modules - Thrift
use HadoopFS::FileSystem;
# Modules - Greenstone
use FileUtils::HDThriftFS::ThriftFH;
use MIME::Base91;

# Configuration
# - default Thrift server host: the first non-empty value out of the various
#   `hostname` invocations, the HOSTNAME environment variable and finally
#   'localhost'. May be replaced by _establishClient() when a thrift.conf file
#   is found.
my $host = (`hostname -s` || `hostname -a` || `hostname` || $ENV{'HOSTNAME'} || 'localhost');
# - command output carries a trailing newline
chomp($host);
# - default Thrift server port (also overridable through thrift.conf)
my $port = 58660;
# - when true, _printDebug() writes its messages to STDERR
my $debug = 0;
# - when true, the local transfer functions dump every block before and after
#   encoding/decoding
my $debug_encoding = 0;
# Number of bytes requested per read during file transfers.
# Testing shows 128k is pretty optimal
#my $buffer_length =    4 * 1024; # 4k blocks
#my $buffer_length =    8 * 1024; # 8k blocks
#my $buffer_length =   16 * 1024; # 16K blocks
#my $buffer_length =   32 * 1024; # 32k blocks
#my $buffer_length =   64 * 1024; # 64k blocks
my $buffer_length =  128 * 1024; # 128k blocks
#my $buffer_length =  256 * 1024; # 256k blocks
#my $buffer_length =  512 * 1024; # 512k blocks
#my $buffer_length = 1024 * 1024; # 1M blocks
#my $buffer_length = 2048 * 1024; # 2M blocks
## These cause "OUT OF MEMORY" errors on Medusa
#my $buffer_length = 4096 * 1024; # 4M blocks
#my $buffer_length = 8192 * 1024; # 8M blocks

# Globals
# - buffered transport under the Thrift client; closed by the END block
my $transport;
# - shared HadoopFS client, created on first use by _establishClient()
my $thrift_client;


## @function END()
#
# Shutdown hook: close the Thrift transport, but only if one was ever created.
#
END
{
  $transport->close() if (defined $transport);
}
## END()


## @function _establishClient()
#
# Create, on first use, the Thrift client shared by every function in this
# package. Later calls return immediately.
#
# The host and port default to the file-level $host and $port values. When
# <GSDLCOLLECTDIR>/etc/thrift.conf exists and its content starts with
# "host:port", the port is always taken from the file, while the host is only
# replaced when the file names something other than '' or 'localhost'.
#
# Progress messages are printed to the currently selected output handle. A
# failure to open the transport is reported through FileUtils::printError().
#
sub _establishClient
{
  # Nothing to do once the client exists
  return if (defined $thrift_client);

  # Look for a configuration file to override the default host and port
  # settings
  my $conf_file_path = &FileUtils::filenameConcatenate($ENV{'GSDLCOLLECTDIR'}, 'etc', 'thrift.conf');
  if (&FileUtils::fileExists($conf_file_path))
  {
    print " * Found Thrift configuration file:\n";
    my $conf_raw = &FileUtils::fileGetContents($conf_file_path);
    if ($conf_raw =~ /^([^:]*):(\d+)/)
    {
      my $new_host = $1;
      $port = $2;
      if ($new_host ne '' && $new_host ne 'localhost')
      {
        $host = $new_host;
        print " - Host: " . $host . "\n";
      }
      print " - Port: " . $port . "\n";
    }
  }

  print " * Creating Thrift client connected to: $host:$port\n";
  my $socket = Thrift::Socket->new($host, $port);
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  $transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($transport);
  $thrift_client = HadoopFS::FileSystemClient->new($protocol);

  # - have the eval return true on success rather than relying on $@ alone
  my $opened = eval { $transport->open(); 1; };
  if (!$opened)
  {
    my $error = $@;
    # - the exception may be a hash-based object carrying a 'message' field
    #   or a plain string; dereferencing a string would itself die under
    #   'strict refs' and hide the real cause
    my $reason = (UNIVERSAL::isa($error, 'HASH') && defined $error->{message}) ? $error->{message} : "$error";
    # NOTE(review): the second argument presumably marks the error as fatal -
    # confirm against FileUtils::printError()
    &FileUtils::printError('Unable to connect: ' . $reason, 1);
  }
  return;
}
## _establishClient()


## @function _generateHDFSPath()
#
# Turn a 'HDThriftFS://...' URI into a HadoopFS::Pathname object.
#
# @param  $path  either a HadoopFS::Pathname (handed back as is) or a string
#                containing 'HDThriftFS://'
# @return the HadoopFS::Pathname for the path with the protocol and anything
#         up to the next slash stripped; a string that is not a thrift URI is
#         reported through FileUtils::printError() and returned unchanged
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # Already a thrift path object - hand it straight back
  return $path if (ref($path) eq 'HadoopFS::Pathname');
  if ($path =~ /HDThriftFS:\/\//)
  {
    # Remove protocol and any host and port information
    $path =~ s/HDThriftFS:\/\/[^\/]*//;
    return HadoopFS::Pathname->new( { pathname => $path } );
  }
  &FileUtils::printError('Not a valid thrift URI: ' . $path);
  return $path;
}
## _generateHDFSPath()


## @function _printDebug()
#
# Write a debug line to STDERR, prefixed with the name of the calling
# function. Silent unless the file-level $debug flag is true.
#
# @param  $msg  the text to print
#
sub _printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    # - element 3 of caller(1) is the name of the sub that called us
    my $caller_name = (caller(1))[3];
    print STDERR '[DEBUG] ' . $caller_name . ': ' . $msg . "\n";
  }
}
## _printDebug()

################################################################################
################################################################################
################################################################################


## @function canRead()
#
# Convenience wrapper: ask checkPermission() for 'r' access.
#
# @param  $path  the thrift URI (or path object) to test
# @param  ...    optional username and usergroup, passed through untouched
# @return whatever checkPermission() returns
#
sub canRead
{
  my ($path, @identity) = @_;
  return &checkPermission($path, 'r', @identity);
}
## canRead()


## @function checkPermission()
#
# Decide whether a user may access a file in the given mode, based on the
# owner, group and permission string reported by the Thrift stat() call.
#
# The user triplet is consulted when the user owns the file, then the group
# triplet when the user belongs to the file's group, then the 'other'
# triplet. The first triplet granting the mode wins.
#
# @param  $path       thrift URI or HadoopFS::Pathname
# @param  $mode       one of 'r', 'w' or 'x'
# @param  $username   optional; defaults to the current login
# @param  $usergroup  optional; a group name or a hash reference keyed by
#                     group name. Defaults to the output of `groups` (nothing
#                     on Windows)
# @return 1 when access is permitted, 0 otherwise
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # Position of each mode's flag within one rwx triplet
  my %offset_of = ('r' => 0, 'w' => 1, 'x' => 2);
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  my $on_windows = ($ENV{'GSDLOS'} =~ /^windows$/i);
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($on_windows)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - determine the groups the user belongs to
  my $member_of = {};
  if (defined $usergroup)
  {
    if (ref $usergroup eq "HASH")
    {
      $member_of = $usergroup;
    }
    else
    {
      $member_of->{$usergroup} = 1;
    }
  }
  elsif (!$on_windows)
  {
    my $raw_groups = `groups`;
    foreach my $name (split(/\s/, $raw_groups))
    {
      $member_of->{$name} = 1;
    }
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my ($owner, $group, $permissions) = @{$file_stat}{'owner', 'group', 'permission'};
  # - fetches the flag for the requested mode from the triplet that starts
  #   at the given position of the permission string
  my $flag_at = sub
  {
    my ($triplet_start) = @_;
    return substr($permissions, $triplet_start + $offset_of{$mode}, 1);
  };
  # - start with [u]ser permission
  if (defined $owner && $username eq $owner && $mode eq $flag_at->(0))
  {
    return 1;
  }
  # - failing that, try [g]roup level permissions
  if (defined $group && defined $member_of->{$group} && $mode eq $flag_at->(3))
  {
    return 1;
  }
  # - and finally try [o]ther level permission
  return ($mode eq $flag_at->(6)) ? 1 : 0;
}
## checkPermission


## @function closeFileHandle()
#
# Close a handle previously produced by openFileHandle() and release the
# tie() binding behind it.
#
# @param  $fh_ref  reference to the scalar holding the file handle
# @return always 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  &_printDebug('');
  my $fh = $$fh_ref;
  if (defined $fh)
  {
    close($fh);
    # - untie() applied to the scalar that merely holds the glob reference
    #   would look for a tie on that scalar and leave the handle tied; the
    #   glob itself has to be named. Only attempted for glob references so
    #   that any other kind of value is left alone.
    untie(*{$fh}) if (UNIVERSAL::isa($fh, 'GLOB'));
  }
  return 1;
}
## closeFileHandle()


## @function fileSize()
#
# Report the 'length' field of the Thrift stat() record for a path.
#
# @param  $path  thrift URI or HadoopFS::Pathname
# @return the length recorded for the path
#
sub fileSize
{
  my $raw_path = shift(@_);
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - the server wants a path object rather than a URI
  my $hdfs_path = &_generateHDFSPath($raw_path);
  return $thrift_client->stat($hdfs_path)->{length};
}
## fileSize()


## @function fileTest()
#
# Emulate a subset of Perl's file test operators against the HDFS.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname
# @param  $test_op   '-d', '-e' or '-f'; a missing operator and '-l' are both
#                    treated as '-e' (symbolic links are not supported within
#                    HDFS). Anything else is reported through
#                    FileUtils::printError()
# @return 1 when the test passes, 0 otherwise
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  if (!defined $test_op || $test_op eq '-l')
  {
    $test_op = '-e';
  }
  # - reject operators we cannot emulate before talking to the server
  if ($test_op ne '-d' && $test_op ne '-e' && $test_op ne '-f')
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }
  # - every supported test fails for a path that does not exist
  return 0 unless ($thrift_client->exists($path));
  return 1 if ($test_op eq '-e');
  # - '-d' and '-f' are told apart by the directory flag
  my $is_directory = $thrift_client->stat($path)->{'isdir'};
  if ($test_op eq '-d')
  {
    return $is_directory ? 1 : 0;
  }
  return $is_directory ? 0 : 1;
}
## fileTest()


## @function filenameConcatenate()
#
# Build a thrift URI from a protocol prefix and any number of path parts.
#
# The parts are joined with '/', runs of slashes inside the joined parts are
# collapsed, the protocol and a '/' are put in front, and one trailing slash
# or backslash is removed from the result.
#
# @param  $protocol  the prefix, e.g. 'HDThriftFS://'
# @param  ...        the path parts
# @return the assembled filename
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  my $joined_parts = join('/', @parts);
  # remove repeated slashes
  $joined_parts =~ s/[\/]+/\//g;
  # prepend the protocol (which may cause multiple slashes)
  my $filename = $protocol . '/' . $joined_parts;
  # strip any trailing slash
  $filename =~ s/[\\\/]$//;
  return $filename;
}
## filenameConcatenate()


## @function isFilenameAbsolute()
#
# Every path handed to this driver counts as absolute, so the answer does
# not depend on the arguments.
#
# @return always 1
#
sub isFilenameAbsolute { return 1; }
## isFilenameAbsolute()


## @function isHDFS()
#
# Identify this driver as one that works against HDFS.
#
# @return always 1
#
sub isHDFS { return 1; }
## isHDFS()


## @function makeDirectory()
#
# Make sure a directory exists on the HDFS, creating it through the Thrift
# mkdirs() call when it is not there yet.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname
# @return the result of testing the path with '-d' after the attempt
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # - only ask the server to create what is missing
  $thrift_client->mkdirs($path) unless (&fileTest($path, '-d'));
  # - report whether the directory is really there now
  return (&fileTest($path, '-d'));
}
## makeDirectory()


## @function modificationTime()
#
# Report the modification time of a path, taken from the Thrift stat()
# record.
#
# @param  $path  thrift URI or HadoopFS::Pathname
# @return the record's modificationTime divided by 1000 and rounded down
#         (presumably milliseconds converted to seconds - TODO confirm)
#
sub modificationTime
{
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - stat the path object built from the URI
  my $file_stat = $thrift_client->stat(&_generateHDFSPath($path));
  my $raw_time = $file_stat->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()


## @function openFileHandle()
#
# Create a file-handle-like object for a path on the HDFS. A fresh glob is
# tied to FileUtils::HDThriftFS::ThriftFH, so the open() below is serviced by
# that class rather than by Perl's own file I/O.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname
# @param  $mode      the mode handed on to the tied open()
# @param  $fh_ref    reference to the scalar that receives the new handle
# @return 1 on success; dies when the tied open() reports failure
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  &_printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($raw_path);
  # - anonymous glob to carry the tie
  my $handle = gensym();
  tie(*$handle, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # - argument order (path, then mode) is the one the tied class is given
  if (!open($handle, $hdfs_path, $mode))
  {
    die("Failed to open thriftfs");
  }
  $$fh_ref = $handle;
  return 1;
}
## openFileHandle()


## @function readDirectory()
#
# List the entries of a directory on the HDFS.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname
# @return reference to an array holding the last path component of every
#         entry reported by the Thrift listStatus() call (empty when nothing
#         is reported)
#
sub readDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($path);
  my @files;
  if ($listing && @{$listing} > 0)
  {
    # - keep only what follows the final slash of each entry's path
    @files = map { my ($basename) = $_->{'path'} =~ /([^\\\/]+)$/; $basename } @{$listing};
  }
  return \@files;
}
## readDirectory()


## @function removeFiles()
#
# Remove a single path from the HDFS. Without the recursive flag only plain
# files are removed; with it the path is handed to the Thrift rm() call
# whatever it is.
#
# @param  $path       thrift URI or HadoopFS::Pathname
# @param  $recursive  optional flag passed on to rm(); defaults to 0
# @return true when the path existed, was eligible and is gone afterwards;
#         false otherwise
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = &_generateHDFSPath($path);
  # - nothing to remove
  return 0 unless ($thrift_client->exists($path));
  # - directories are only touched when recursion was asked for
  return 0 unless ($recursive || &fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  # - success means the path can no longer be found
  return !$thrift_client->exists($path);
}
## removeFiles()


## @function removeFilesFiltered()
#
# Depth-first removal of files and directories below one or more paths.
#
# A file is removed unless it matches the reject pattern, and - when an accept
# pattern is given - only if it matches that pattern. Directories are
# descended into first and are themselves removed only when neither pattern
# was supplied. Problems are reported on STDERR.
#
# @param  $paths      a thrift URI or a reference to an array of them
# @param  $accept_re  optional pattern a file path must match
# @param  $reject_re  optional pattern a file path must not match
# @return the number of files and directories removed
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $unfiltered = (!defined $accept_re && !defined $reject_re);
  # - non-recursive removal followed by a check that the path is gone;
  #   yields the number of paths removed (1 or 0)
  my $remove_and_verify = sub
  {
    my ($path, $raw_path, $failure_prefix) = @_;
    $thrift_client->rm($path, 0);
    if ($thrift_client->exists($path))
    {
      print STDERR $failure_prefix . $raw_path . "\n";
      return 0;
    }
    return 1;
  };
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $num_removed = 0;
  foreach my $raw_path (@paths_array)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = &_generateHDFSPath($raw_path);
    if (!$thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
      next;
    }
    if (&fileTest($path, '-d'))
    {
      # - children first
      foreach my $file (@{&readDirectory($path)})
      {
        $num_removed += &removeFilesFiltered($raw_path . '/' . $file, $accept_re, $reject_re);
      }
      # - then the directory itself, non-recursively so that the command
      #   fails if there are (somehow) still files contained within
      if ($unfiltered)
      {
        $num_removed += $remove_and_verify->($path, $raw_path, "HDThriftFS::removeFilesFiltered() couldn't remove directory: ");
      }
      next;
    }
    # - plain file: apply the filters
    next if (defined $reject_re && ($raw_path =~ m/$reject_re/));
    if ((!defined $accept_re) || ($raw_path =~ m/$accept_re/))
    {
      $num_removed += $remove_and_verify->($path, $raw_path, "HDThriftFS::removeFilesFiltered() couldn't remove file: ");
    }
  }
  return $num_removed;
}
## removeFilesFiltered()


## @function removeFilesRecursive()
#
# Remove a path and everything below it by calling removeFilesFiltered()
# without accept or reject patterns.
#
# @param  $path  a thrift URI or a reference to an array of them
# @return the number of files and directories removed
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  # - leaving both patterns out means nothing is filtered
  return &removeFilesFiltered($path);
}
## removeFilesRecursive()


## @function supportsSymbolicLink()
#
# Tell callers that this driver cannot create symbolic links.
#
# @return always 0
#
sub supportsSymbolicLink { return 0; }
## supportsSymbolicLink()


## @function transferFile()
#
# Move or copy a file from one HDFS location to another.
#
# When the destination is an existing directory the source's file name is
# appended to it. The transfer is refused when the source is missing or the
# destination already exists.
#
# @param  $mode  'MOVE' (server-side rename) or 'COPY' (block-wise read and
#                write through the Thrift client); any other value transfers
#                nothing
# @param  $src   thrift URI of the source
# @param  $dst   thrift URI of the destination file or directory
# @return 0 when the transfer was refused, otherwise whether the destination
#         exists afterwards
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as it doesn't seem
    #   thrift supports writing bytes
    # - only strings. May need to see if I can make Perl behave using black
    #   magic flags (marking string as binary etc) It'll work fine for text
    #   files though
    my $offset = 0;
    # Read $buffer_length sized blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $buffer_length);
      # - test for an empty block explicitly: a block that is just the
      #   character "0" is false in Perl but is still data to be copied
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      $offset += $buffer_length;
      # - a short block means the end of the file has been reached
      last if (length($data) < $buffer_length);
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()


## @function transferFileFromLocal()
#
# Copy or move a file from the local filesystem onto the HDFS.
#
# When the destination is an existing directory the source's file name is
# appended to it. An existing destination file is never replaced. The local
# file is read in $buffer_length sized blocks, each of which is Base91 encoded
# before being written through the Thrift client.
#
# @param  $mode  'MOVE' removes the local source after a successful
#                transfer; any other value leaves it in place
# @param  $src   path of the local source file
# @param  $dst   thrift URI of the destination file or directory
# @return 0 when the destination already exists; otherwise true when the
#         destination exists afterwards, no read error occurred and (for
#         'MOVE') the source is gone. Dies when the source cannot be opened.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $read_failed = 0;
  my $fhout = $thrift_client->create($dst_path);
  while (1)
  {
    my $bytes_read = read($fhin, $decoded, $buffer_length);
    # - read() yields undef on error and 0 at end of file; tell them apart so
    #   that a truncated copy is not mistaken for a complete one
    if (!defined $bytes_read)
    {
      &FileUtils::printError('Failed to read file (during ' . $mode . '): ' . $src . ' (' . $! . ')');
      $read_failed = 1;
      last;
    }
    last if ($bytes_read == 0);
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary
    # NOTE(review): an earlier comment said a Byte Order Marker is added so
    # the Thrift Server can detect the need to decode - nothing here adds
    # one, so presumably MIME::Base91::encode() does; confirm
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if the destination file exists
  my $result = $thrift_client->exists($dst_path);
  # - unless the source could not be read completely
  if ($read_failed)
  {
    $result = 0;
  }
  # if moving, remove the source file from the local filesystem - but only
  # once the copy is known to have worked, so a failed transfer never costs
  # the only copy of the data
  if ($mode eq 'MOVE' && $result)
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = $result && (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()


## @function transferFileToLocal()
#
# Copy or move a file from the HDFS onto the local filesystem.
#
# When the destination is an existing local directory the source's file name
# is appended to it. An existing destination is never replaced. Each block
# read through the Thrift client is Base91 decoded before being written to
# the local file.
#
# @param  $mode  'MOVE' removes the HDFS source after a successful transfer;
#                any other value leaves it in place
# @param  $src   thrift URI of the source file
# @param  $dst   path of the local destination file or directory
# @return 0 when the source is missing or the destination exists; otherwise
#         true when the local file was written and closed without error and
#         (for 'MOVE') the source is gone. Dies when the destination cannot
#         be opened.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $written_ok = 1;
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  # Read buffer_length *decoded* bytes - which means there may be a larger
  # number of *encoded* bytes returned
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $buffer_length);
    # - test for an empty block explicitly: an encoded block that is just
    #   the character "0" is false in Perl but is still data
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # - a failed write (e.g. disk full) must not pass for a good copy
    unless (print {$fhout} $decoded)
    {
      &FileUtils::printError('Failed to write file (during ' . $mode . '): ' . $dst . ' (' . $! . ')');
      $written_ok = 0;
      last;
    }
    # - a short block means the end of the file has been reached
    last if (length ($decoded) < $buffer_length);
    $offset += $buffer_length;
  }
  # - buffered write errors only surface when the handle is closed
  unless (close($fhout))
  {
    &FileUtils::printError('Failed to close file (during ' . $mode . '): ' . $dst . ' (' . $! . ')');
    $written_ok = 0;
  }
  $thrift_client->close($fhin);
  # in general, the transfer has worked if the destination file exists and
  # everything could be written to it
  my $result = (-f $dst) && $written_ok;
  # if moving, remove the source file from the HDFS filesystem - but only
  # once the copy is known to have worked, so a failed transfer never costs
  # the only copy of the data
  if ($mode eq 'MOVE' && $result)
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = $result && !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()


1;
