/** jmt12 **/
package org.nzdl.gsdl;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

/** @class HadoopGreenstoneIngest
 *  A Hadoop job that runs the Greenstone import process over each file listed
 *  in the input directory, one file per map task.
 */
public class HadoopGreenstoneIngest
{

  /** @class GSFileRecordReader
   *  Emits exactly one key/value pair per file: the file's path as the key
   *  and a constant 1 as the value.
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, IntWritable>
  {
    /** Uncompressed file name */
    private Text current_key;

    private IntWritable current_value = new IntWritable(1);

    /** Used to indicate progress */
    private boolean is_finished = false;

    /**
     *  Record the path of the file backing this split as the current key
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     *  We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     *  Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     *  Returns the current key (the path of the file for this split)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
        return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (a constant 1)
     */
    @Override
    public IntWritable getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   *  An input format that presents each input file as a single, unsplittable
   *  record
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, IntWritable>
  {
    /**
     *  Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /**
     *  Create a GSFileRecordReader for each split
     */
    @Override
    public RecordReader<Text, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

  /** @class GSMap
   *  The mapper: for each input file path, write a temporary manifest naming
   *  that file and invoke Greenstone's import.pl on it.
   */
  public static class GSMap
    extends Mapper<Text, IntWritable, Text, IntWritable>
  {
    /** @function map
     *  The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, IntWritable value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix
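      //   e.g. "attempt_201301011234_0001_m_000000_0" becomes
      //   "201301011234_0001_m_000000_0" (task id shown is illustrative)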

      // Programatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
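      //   e.g. with an hdfs_prefix of "/hdfs", a path such as
      //   "hdfs://namenode:54310/user/gsdl/import/doc.xml" (hostname and path
      //   illustrative only) is rewritten to "/hdfs/user/gsdl/import/doc.xml"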
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay, based upon the
      // hostname, in order to stagger the startup of map workers. The average
      // IO time looks to be around 25 minutes, so the aim is for the last
      // mapper to start roughly 25 minutes after the first (with all the
      // others spread in between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
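            // e.g. a hostname of "compute-0-3.local" (the pattern above is
            // cluster specific) gives a 300 second delay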
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
            Thread.currentThread().sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      long start_time = System.currentTimeMillis()/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(start_time);
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      if (getcpu_executable.exists())
      {
        header_block.append(runCommand(getcpu_executable_cmd));
      }
      else
      {
        header_block.append("0");
      }
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      //   existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

      // - call Greenstone passing in the path to the manifest
      //ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      String environment_script_filename = "setup.bash";
      StringBuffer cmd_buffer = new StringBuffer();
      cmd_buffer.append("source ./");
      cmd_buffer.append(environment_script_filename);
      cmd_buffer.append(" && time -p import.pl -keepold -manifest \"");
      cmd_buffer.append(manifest_path.toString());
      cmd_buffer.append("\" -archivedir \"");
      cmd_buffer.append(conf.get("archivesdir"));
      cmd_buffer.append("\" ");
      cmd_buffer.append(collection);
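      // The assembled command looks roughly like (paths illustrative):
      //   source ./setup.bash && time -p import.pl -keepold \
      //     -manifest "/tmp/greenstone/manifest<task_id>.xml" \
      //     -archivedir "<archivesdir>" <collection>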
      ProcessBuilder import_process_builder = new ProcessBuilder("bash", "-c", cmd_buffer.toString());
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      /*
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - build up the path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      //   - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      //   - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      //   - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      //   - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");
      */

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));
      // - close our output to the log before opening in the process
      fw1.close();

      // - write output to log
      import_process_builder.redirectErrorStream(true);
      import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));
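      //   (stderr is merged into stdout, and both streams are appended to the
      //   per-task import log opened above)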

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      //   more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      FileWriter fw2 = new FileWriter(import_process_log, true);
      long end_time = System.currentTimeMillis()/1000;
      fw2.write("[Completed:" + end_time + "]\n");
      fw2.close();

      // - for now return a dummy output. In the future the Greenstone output
      //   could be parsed and fed through a reducer to produce a tidy
      //   time-based log.
      context.write(key, value);
    }
    /** map(Text,IntWritable,Context) **/

  }
  /** class GSMap **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest <gsdlhome> <hadoop home> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }

    Configuration conf = new Configuration();
    conf.set("gsdlhome",    args[0]);
    conf.set("hadoophome",  args[1]);
    conf.set("collection",  args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix",  args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin",      args[5]);
    conf.set("hdfsout",     args[6]);

    // Set the maximum number of map attempts to 1. The property name differs
    // across Hadoop versions, so set every variant that might apply.
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);
    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Register the map, combiner, and reducer classes
    job.setMapperClass(GSMap.class);
    // - with zero reduce tasks the map output bypasses the reduce phase
    //   entirely and is written directly by the output format
    job.setNumReduceTasks(0);

    // Sets the input and output handlers - may need to adjust input to provide me
    // a series of filenames (TextInputFormat will instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    //   the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    //   this mechanism to produce a time based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation, despite my dislike of the ?: syntax
    System.exit(job.waitForCompletion(true)?0:1);
  }
  /** main(String[]) **/

  /** @function runCommand()
   *
   * A convenience method that runs an external command and returns its
   * standard output as a string. Warning! Not safe if the command writes a
   * large amount of text to STDERR, as that stream is never consumed and the
   * process may block indefinitely.
   *
   */
  public static String runCommand(String command)
  {
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      // Read stdout before waiting for the process to exit, otherwise a full
      // output buffer can cause the child process to block indefinitely
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
      pr.waitFor();
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    return result.toString();
  }
  /** runCommand() **/
}

class HadoopProgressReporter
  extends Thread
{

  private Context hadoop_process;

  private File log_file;

  HadoopProgressReporter(Context hadoop_process, File log_file)
  {
    this.hadoop_process = hadoop_process;
    //this.log_file = log_file;
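    // NOTE: the supplied log_file is deliberately ignored here; the debug
    // writes in run() (currently commented out) would go to a fixed path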
    this.log_file = new File("/tmp/hadoop_progress_reporter.log");
  }

  public void run()
  {
    try
    {
      while (!this.isInterrupted())
      {
        sleep(60000); // Wait a minute
        //FileWriter fw1 = new FileWriter(this.log_file, true);
        //long time = System.currentTimeMillis()/1000;
        //fw1.write("[" + time + "] HadoopProgressReporter.progress()\n");
        //fw1.close();
        this.hadoop_process.progress(); // Inform Hadoop we are still processing
      }
    }
    catch (InterruptedException iex)
    {
      // We've been interrupted: no more progress
    }
    catch (Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
