#!/usr/bin/perl

use Cwd;
use Sort::Key::Natural qw(natsort);

use strict;
use warnings;

print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

# Enable verbose diagnostics via the -debug option (read by printDebug()).
our $debug = 0;

# Loop through all the arguments, descending into directories to process data
# locality files. Unknown options and non-directory arguments are reported
# rather than silently ignored.
my $found_dirs = 0;
foreach my $argument (@ARGV)
{
  # handle -option arguments
  if ($argument =~ /^-(.+)$/)
  {
    my $option = $1;
    if ($option eq 'debug')
    {
      $debug = 1;
    }
    else
    {
      print 'Warning! Ignoring unknown option: -' . $option . "\n";
    }
  }
  # handle directories
  else
  {
    # Bug fix: only prepend the working directory to *relative* arguments.
    # Previously an absolute path argument had getcwd() prepended and so was
    # never recognized as a directory.
    my $path = ($argument =~ m{^/}) ? $argument : getcwd() . '/' . $argument;
    if (-d $path)
    {
      $found_dirs++;
      searchForDataLocalityFiles($path);
    }
    else
    {
      print 'Warning! Not a directory, skipping: ' . $argument . "\n";
    }
  }
}
if ($found_dirs == 0)
{
  printUsage('Missing results directory');
}

# Complete!
print "\n========================= Complete =========================\n\n";
exit;

################################################################################
#################################  Functions  ##################################
################################################################################


## @function searchForDataLocalityFiles()
#
## @function searchForDataLocalityFiles()
#
# Recursively walk $dir looking for directories that contain a
# data_locality.csv file; each such directory is handed to
# updateDataLocalityForDirectory(). Dot-entries ('.', '..' and hidden
# files/directories) are skipped, which also prevents infinite recursion.
# Dies if a directory cannot be opened for reading.
sub searchForDataLocalityFiles
{
  my ($dir) = @_;
  # All of the following is *only* valid for directories
  return unless -d $dir;
  print ' * Searching for Data Locality information: ' . $dir . "\n";
  # Does this directory contain data_locality.csv?
  if (-f $dir . '/data_locality.csv')
  {
    updateDataLocalityForDirectory($dir);
  }
  # Also descend into child directories. A lexical dirhandle replaces the
  # old package-global DIR, and entries are sorted so traversal order (and
  # therefore output order) is deterministic across runs.
  opendir(my $dh, $dir)
    or die('Error! Failed to open directory for reading: ' . $dir);
  my @entries = sort readdir($dh);
  closedir($dh);
  foreach my $entry (@entries)
  {
    next if $entry =~ /^\./;   # skip '.', '..' and hidden entries
    my $path = $dir . '/' . $entry;
    if (-d $path)
    {
      searchForDataLocalityFiles($path);
    }
  }
}
## searchForDataLocalityFiles() ##


## @function updateDataLocalityForDirectory()
#
# Rebuild $dir_prefix/data_locality.csv with an extra "OriginalSplits"
# column recovered from hadoop.log, and recompute the "Data Local" flag
# against those original splits. The previous CSV is preserved as
# data_locality.csv.bak. A no-op (early return) if the CSV already contains
# an OriginalSplits column. Dies on any unreadable/unwritable file or on a
# hadoop.log entry that has no matching CSV record.
sub updateDataLocalityForDirectory
{
  my ($dir_prefix) = @_;
  # 0. Init
  my $data = {};  # hdfs filepath => { taskid, attempt, node, splits, original_splits }
  my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
  my $hadoop_log_path = $dir_prefix . '/hadoop.log';

  # 1. Read in current data locality comma separated file.
  #    Lexical filehandles replace the old package globals, and the
  #    validating :encoding(UTF-8) layer replaces the non-validating :utf8.
  print "   * Read current file: data_locality.csv\n";
  open(my $dl_in, '<:encoding(UTF-8)', $data_locality_csv_path)
    or die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
  my $read_count = 0;
  while (my $line = <$dl_in>)
  {
    printDebug($line);
    # If the line already indicates this csv has been updated (by the
    # presence of an "OriginalSplits" column) we don't need to do anything
    # else
    if ($line =~ /OriginalSplits/)
    {
      print "   ! File already updated...\n";
      close($dl_in);
      return;
    }
    # Expected row shape: taskid,attempt_no,data_local,"node","splits"
    if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/)
    {
      my ($taskid, $attempt_no, $compute_node, $splits) = ($1, $2, $3, $4);

      # Use the taskid to look up the manifest file and therein the filepath
      my $filepath = '';
      my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
      printDebug('searching for manifest file: ' . $manifest_path);
      open(my $manifest_in, '<:encoding(UTF-8)', $manifest_path)
        or die('Failed to open manifest for reading: ' . $manifest_path);
      while (my $manifest_line = <$manifest_in>)
      {
        # Normalize any of the supported protocols to a plain "hdfs:" prefix
        # (the protocol capture was previously assigned but never used).
        if ($manifest_line =~ /<Filename>(?:hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
        {
          $filepath = 'hdfs:' . $1;
        }
      }
      close($manifest_in);
      printDebug('filepath: ' . $filepath);
      $data->{$filepath} = {'taskid' => $taskid,
                            'attempt' => $attempt_no,
                            'node' => $compute_node,
                            'splits' => $splits,
                            'original_splits' => ''};
      $read_count++;
    }
  }
  close($dl_in);
  print '   - read ' . $read_count . " records.\n";

  # 2. Read in hadoop log looking for split mapping information
  print "   * Read split information: hadoop.log\n";
  open(my $log_in, '<:encoding(UTF-8)', $hadoop_log_path)
    or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
  my $updated_count = 0;
  my $filepath = '';
  while (my $line = <$log_in>)
  {
    if ($line =~ /^Filepath:\s(.+)$/)
    {
      $filepath = $1;
    }
    elsif ($filepath ne '')
    {
      # this is the information that was unintentionally lost
      if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
      {
        my $original_splits = parseSplits($1);
        if (!defined $data->{$filepath})
        {
          die('Found split information for file not in data locality log: ' . $filepath);
        }
        $data->{$filepath}->{'original_splits'} = $original_splits;
        $filepath = '';
        $updated_count++;
      }
    }
  }
  close($log_in);
  print '   - updated ' . $updated_count . " records.\n";

  # 3. Write out the updated data locality file including information about
  #    original splits (should we have found that information) and corrected
  #    data locality flag.
  # backup old (removing any stale backup from a previous run first)
  my $backup_path = $data_locality_csv_path . '.bak';
  if (-f $backup_path)
  {
    unlink($backup_path);
  }
  # Bug fix: a failed rename previously went unnoticed and the subsequent
  # open-for-write would clobber the only copy of the data.
  rename($data_locality_csv_path, $backup_path)
    or die('Error! Failed to backup file: ' . $data_locality_csv_path);
  # write new
  print "   * Writing out updated file: data_locality.csv\n";
  open(my $dl_out, '>:encoding(UTF-8)', $data_locality_csv_path)
    or die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
  my $written_count = 0;
  print $dl_out "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
  # sort the keys so row order is deterministic (hash order is randomized
  # per process in modern perls)
  foreach my $record_path (sort keys %{$data})
  {
    my $record = $data->{$record_path};
    my $compute_node = $record->{'node'};
    # A task is data local iff its compute node hosted one of the splits of
    # the *original* (pre-transform) split list.
    my @dl_nodes = split(',', $record->{'original_splits'});
    my $data_local = (grep { $_ eq $compute_node } @dl_nodes) ? 1 : 0;
    print $dl_out $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
    $written_count++;
  }
  # Check close on the write handle: buffered write errors (e.g. disk full)
  # only surface here.
  close($dl_out)
    or die('Error! Failed to close file after writing: ' . $data_locality_csv_path);
  print '   - wrote ' . $written_count . " records\n";
}
## updateDataLocalityForDirectory() ##


## @function parseSplits()
#
# Normalize a raw comma-separated split list taken from hadoop.log: split
# on commas (tolerating surrounding whitespace), order the entries with a
# natural sort, and return them re-joined with plain commas.
sub parseSplits
{
  my ($raw_splits) = @_;
  my @entries = split(/\s*,\s*/, $raw_splits);
  my @ordered = natsort(@entries);
  return join(',', @ordered);
}

## @function printDebug()
#
# Emit a single "[DEBUG]"-prefixed line on STDOUT, but only when the
# global $debug flag (set by the -debug option) is enabled. Any trailing
# newline on the message is stripped before printing.
sub printDebug
{
  my ($message) = @_;
  return unless $debug;
  chomp($message);
  print "[DEBUG] $message\n";
}

## @function printUsage()
#
# Print the usage banner and terminate the script. When an error message
# is supplied it is reported on STDERR and the script exits non-zero so
# shell callers can detect the failure (previously errors went to STDOUT
# and the exit status was always 0).
sub printUsage
{
  my ($msg) = @_;
  if (defined $msg)
  {
    print STDERR 'Error! ' . $msg . "\n";
  }
  print 'Usage:  update_data_locality.pl <results directory>' . "\n\n";
  exit(defined $msg ? 1 : 0);
}
