/** jmt12 **/
package org.nzdl.gsdl;

import org.nzdl.gsdl.GSGroupingComparator;
import org.nzdl.gsdl.GSInfoDB;
import org.nzdl.gsdl.GSPartitioner;

import java.io.*;
import java.lang.Iterable;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

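/** @class HadoopGreenstoneIngest2
 *  Hadoop MapReduce job that runs Greenstone's import.pl over each input
 *  file (GSMap) and gathers the info database entries and log messages the
 *  import prints (GSReducer).
 */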
public class HadoopGreenstoneIngest2
{

  /** @class GSFileRecordReader
   *  Record reader that produces exactly one record per input split: the key
   *  is the path of the split's file and the value is an empty Text.
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, Text>
  {
    /** Path of the file behind the split; assigned in initialize() */
    private Text split_path;

    /** Value returned alongside the key - always the empty string */
    private Text empty_value = new Text("");

    /** Becomes true once the single record has been handed out */
    private boolean record_consumed = false;

    /** @function initialize
     *  Remember the path of the file the split refers to; it becomes the key
     *  of the one record this reader produces.
     *
     *  @param inputSplit the split to read, cast to FileSplit
     *  @param taskAttemptContext unused
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      Path split_file = ((FileSplit) inputSplit).getPath();
      split_path = new Text(split_file.toString());
    }
    // end initialize()

    /** @function nextKeyValue
     *  There is only ever a single key/value pair, so this reports true on
     *  the first call and false on every later call.
     *
     *  @return true if the single record had not been handed out yet
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      boolean record_available = !record_consumed;
      record_consumed = true;
      return record_available;
    }
    // end nextKeyValue()

    /** @function getProgress
     *  Progress is all or nothing rather than calculated.
     *
     *  @return 1 once the record has been handed out, otherwise 0
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      if (record_consumed)
      {
        return 1.0f;
      }
      return 0.0f;
    }
    // end getProgress()

    /** @function getCurrentKey
     *  @return the path of the split's file as stored by initialize()
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return split_path;
    }
    // end getCurrentKey()

    /** @function getCurrentValue
     *  @return the (empty) value that accompanies the key
     */
    @Override
    public Text getCurrentValue()
      throws IOException, InterruptedException
    {
      return empty_value;
    }
    // end getCurrentValue()

    /** @function close
     *  No resources are held, so there is nothing to release.
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    // end close()

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   *  Input format that hands every input file to a mapper as one unsplit
   *  record whose key is the file's path (see GSFileRecordReader).
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, Text>
  {

    /** @function getActiveServersList
     *  Ask the cluster for the names of its active task trackers and reduce
     *  each name to a server name: the text before the first ':' is taken,
     *  and of that the second '_'-separated token is kept (tracker names are
     *  presumably of the form tracker_<host>:... - verify against the
     *  cluster in use). The resulting list is printed to STDERR.
     *
     *  @param context the job whose configuration is used to reach the cluster
     *  @return one entry per active tracker, or null when the cluster status
     *          could not be retrieved
     */
    private String[] getActiveServersList(JobContext context)
    {
      String[] servers = null;
      JobClient jc = null;
      try
      {
        // Only cast when it is safe to do so, otherwise wrap the configuration
        Configuration conf = context.getConfiguration();
        JobConf job_conf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);
        jc = new JobClient(job_conf);
        ClusterStatus status = jc.getClusterStatus(true);
        Collection<String> atc = status.getActiveTrackerNames();
        String[] found = new String[atc.size()];
        int s = 0;
        for (String serverInfo : atc)
        {
          StringTokenizer st = new StringTokenizer(serverInfo, ":");
          String trackerName = st.nextToken();
          StringTokenizer st1 = new StringTokenizer(trackerName, "_");
          st1.nextToken();
          found[s++] = st1.nextToken();
        }
        servers = found;
      }
      catch (IOException e)
      {
        e.printStackTrace();
      }
      finally
      {
        // the client is only needed for this one query
        if (jc != null)
        {
          try
          {
            jc.close();
          }
          catch (IOException e)
          {
            e.printStackTrace();
          }
        }
      }
      // Report what was found - the list is null if the lookup failed, in
      // which case there is nothing to iterate over
      if (servers == null)
      {
        System.err.println("Servers: <unavailable>");
        return null;
      }
      System.err.print("Servers: ");
      String sep = "";
      for (String server : servers)
      {
        System.err.print(sep + server);
        sep = ", ";
      }
      System.err.println("");
      return servers;
    }
    /** getActiveServersList() **/

    /** @function getSplits
     *  Compute the splits exactly as FileInputFormat does, printing them and
     *  the list of active servers to STDERR along the way. The server list is
     *  only reported; it does not influence the splits returned.
     *
     *  @param job the job to compute splits for
     *  @return the splits calculated by FileInputFormat
     *  @throws IOException if the splits cannot be calculated or the list of
     *          active servers cannot be retrieved
     */
    @Override
    public List<InputSplit> getSplits(JobContext job)
      throws IOException
    {
      System.err.println("GSFileInputFormat::getSplits()");
      // get splits
      List<InputSplit> original_splits = super.getSplits(job);
      // Get active servers
      String[] servers = getActiveServersList(job);
      if (servers == null)
      {
        // Fail with an explicit error instead of handing the caller a null
        // list of splits
        throw new IOException("Unable to retrieve the list of active servers from the cluster");
      }
      // done
      System.err.println("Splits: ");
      for (InputSplit obj : original_splits)
      {
        System.err.println(obj.toString());
      }
      return original_splits;
    }
    /** getSplits() **/

    /** @function isSplitable
     *  Don't split the files - each file is processed as a whole.
     *
     *  @return always false
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /** @function createRecordReader
     *  @return a new GSFileRecordReader; neither argument is consulted here
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext content)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

 /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, Text, Text, Text>
  {
    /** @function map
     *  The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix

      // Programatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so lets try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
            Thread.currentThread().sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      double start_time = ((double)System.currentTimeMillis())/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(String.format("%.6f", start_time));
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      String cpu_number = "0";
      if (getcpu_executable.exists())
      {
        cpu_number = runCommand(getcpu_executable_cmd);
      }
      header_block.append(cpu_number);
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      //   existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

      /* Original process calling - sets up environment in Java
      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      //   - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      //   - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      //   - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      //   - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      //   - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");
      */

      /* New process call - adds call to setup.bash first to prepare
       * environment... hopefully */
      // - call Greenstone passing in the path to the manifest
      String environment_script_filename = "setup.bash";
      StringBuffer cmd_buffer = new StringBuffer();
      cmd_buffer.append("source ./");
      cmd_buffer.append(environment_script_filename);
      cmd_buffer.append(" && time -p import.pl -keepold -manifest \"");
      cmd_buffer.append(manifest_path.toString());
      cmd_buffer.append("\" -archivedir \"");
      cmd_buffer.append(conf.get("archivesdir"));
      cmd_buffer.append("\" ");
      cmd_buffer.append(collection);
      ProcessBuilder import_process_builder = new ProcessBuilder("bash", "-c", cmd_buffer.toString());
      fw1.write("[Command:" + import_process_builder.command() + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));

      // - redirect STDERR to STDOUT for simplicity sake
      import_process_builder.redirectErrorStream(true);
      // Obsolete command to send output to file
      //import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      BufferedReader import_process_br = new BufferedReader(new InputStreamReader(import_process.getInputStream()));
      String line = "";
      Pattern open_tag_pattern = Pattern.compile("<InfoDBEntry type=\"(.+?)\" key=\"(.+?)\" mode=\"(.+?)\" timestamp=\"(.+?)\">(.*?(</InfoDBEntry>)?)");
      Pattern close_tag_pattern = Pattern.compile("(.*?)</InfoDBEntry>");
      while ((line = import_process_br.readLine()) != null)
      {
        // Write line to process log regardless
        fw1.write(line + "\n");
        // Now we check for sentinel strings in the output line
        Text output_key;
        Text output_value;
        // Watch for open entry tags
        Matcher open_tag_matcher = open_tag_pattern.matcher(line);
        if(open_tag_matcher.matches())
        {
          String entry_type = open_tag_matcher.group(1);
          String entry_key  = open_tag_matcher.group(2);
          String entry_mode = open_tag_matcher.group(3);
          String entry_time = open_tag_matcher.group(4);
          String payload    = open_tag_matcher.group(5);
          StringBuffer line_buffer = new StringBuffer();
          // Continue until we've found the close tag - or run out of output log
          Matcher close_tag_matcher = close_tag_pattern.matcher(payload);
          while (!close_tag_matcher.matches() && (line = import_process_br.readLine()) != null)
          {
            // append any existing payload to the buffer
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(payload);
            // store this line as the payload, should the match below fail
            payload = line;
            close_tag_matcher = close_tag_pattern.matcher(line);
          }
          // We've found the close tag (hopefully) so add last bit (possibly
          // empty string) to value
          if (close_tag_matcher.matches())
          {
            String last_payload = close_tag_matcher.group(1);
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(last_payload);
            last_payload = null;
          }
          close_tag_matcher = null;

          // Construct the compound key by which to sort the entries - note
          // that src is a little different than the others, as we don't care
          // about the timestamp at all - instead wanting to group the entries
          // by src file path (key)
          if (entry_type.equals("src"))
          {
            output_key = new Text(entry_type + " " + entry_key);
          }
          else
          {
            output_key = new Text(entry_type + " " + entry_time);
          }
          // Doc has its payload prefixed by key
          if (entry_type.equals("doc"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text("[" + entry_key + "]\n" + decoded_xml);
          }
          else if (entry_type.equals("src"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(entry_key + "|" + decoded_xml);
          }
          else if (entry_type.equals("rss"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(decoded_xml);
          }
          else
          {
            output_value = new Text(line_buffer.toString());
          }
          entry_type = null;
          entry_key = null;
          entry_mode = null;
          entry_time = null;
          payload = null;
          line_buffer = null;
        }
        // all other lines we'll key by host and time
        else
        {
          double timestamp = ((double)System.currentTimeMillis())/1000;
          String argument1 = hostname + ":" + cpu_number + ":" + String.format("%.6f", timestamp);
          output_key = new Text("msg " + argument1);
          output_value = new Text(argument1 + " " + line);
        }

        // Store the result in the return context
        context.write(output_key, output_value);

        // May as well do this here - indicate to the Hadoop framework that
        // this process is still making progress (of some form)
        context.progress();

        // Cleanup
        open_tag_matcher = null;
        output_key = null;
        output_value = null;
      }
      open_tag_pattern = null;
      close_tag_pattern = null;
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      //   more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      double end_time = ((double)System.currentTimeMillis())/1000;
      fw1.write("[Completed:" + String.format("%.6f", end_time) + "]\n");
      // - close our output to the log
      fw1.close();
    }
    /** map(LongWritable,Text,Context) **/

  }
  /** class GSMap **/


  /** @class GSReducer
   */
  public static class GSReducer
    extends Reducer<Text, Text, Text, Text>
  {


    /** Prepare the Reducer by looking up configuration for this collection to
     *  determine the appropriate tools to use to create databases.
     */
    public void setup (Context context)
    {
    }
    /** setup() **/


    /**
     */
    public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
    {
      System.err.println("reduce(" + key.toString() + ", <values>, <context>)");
      Configuration conf = context.getConfiguration();
      String gsdl_home = conf.get("gsdlhome");
      String collection = conf.get("collection");
      String archives_dir = gsdl_home + "/collect/" + collection + "/archives";
      //String archives_dir = conf.get("archivesdir");
      // Eventually I'd like to read in database type from collect.cfg - or
      // maybe have it passed in as part of the context - but I'll hardcode
      // as GDBM for now as a proof of concept
      String infodbtype = "tdb";

      String key_string = key.toString();
      String[] key_parts = key_string.split(" ");
      if (key_parts.length >= 2)
      {
        String type = key_parts[0];
        String argument = key_parts[1];

        Iterator<Text> values_itr = values.iterator();
        // There are basically five different cases based on the key's type
        // - the first is datestamp... these are sorted earliest first, and
        //   since we only want the earliest we can ignore the rest!
        if (type.equals("datestamp"))
        {
          Text earliest_datestamp = values_itr.next();
          // Write this directly to file
          try
          {
            FileWriter earliest_datestamp_fout = new FileWriter(archives_dir + "/earliestDatestamp");
            earliest_datestamp_fout.write(earliest_datestamp.toString());
            earliest_datestamp_fout.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // For 'doc' types we open a pipe to the database creator and send all
        // the values through for processing
        else if (type.equals("doc"))
        {
          GSInfoDB archiveinf_doc = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-doc." + infodbtype);
          while (values_itr.hasNext())
          {
            Text db_entry = values_itr.next();
            archiveinf_doc.writeEntry(db_entry.toString());
          }
          archiveinf_doc.close();
        }
        // Similarly for 'src' types, except here we group all entries with the
        // same key
        else if (type.equals("src"))
        {
          // Sigh - you can only create this the TDB file on a local
          // filesystem, so I have to create it here, and then move it into
          // place when finished
          GSInfoDB archiveinf_src = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-src." + infodbtype);
          String current_file_path = "";
          StringBuffer current_record = new StringBuffer();
          Pattern file_path_pattern = Pattern.compile("(.*?)\\|(.*)");
          while (values_itr.hasNext())
          {
            Text db_entry_raw = values_itr.next();
            String db_entry = db_entry_raw.toString();
            // Parse out the file path this entry refers to
            Matcher file_path_matcher = file_path_pattern.matcher(db_entry);
            if (file_path_matcher.matches())
            {
              String this_file_path = file_path_matcher.group(1);
              String this_record = file_path_matcher.group(2);
              // Output the record (if there is one) if the file path changes
              if (!this_file_path.equals(current_file_path) && current_record.length() > 0)
              {
                archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
                // store the next records details
                current_file_path = this_file_path;
                current_record = new StringBuffer(this_record);
              }
              // Append onto our growing record
              else
              {
                current_file_path = this_file_path;
                if (current_record.length() > 0)
                {
                  current_record.append("\n");
                }
                current_record.append(this_record);
              }
            }
            else
            {
              // Not a valid src entry?
            }
          }
          if (!current_file_path.equals("") && current_record.length() > 0)
          {
            archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
          }
          archiveinf_src.close();
        }
        // For 'rss' we write all the entries - in order - to an XML file
        else if (type.equals("rss"))
        {
          try
          {
            FileWriter rss_item_rdf = new FileWriter(archives_dir + "/rss-items.rdf");
            while (values_itr.hasNext())
            {
              Text rss_entry = values_itr.next();
              rss_item_rdf.write(rss_entry.toString() + "\n");
            }
            rss_item_rdf.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // Everything else we assume are just process log messages - so get
        // Hadoop to write in order to log (I may need to annotate these
        // with the host so I can see which message came from which compute
        // node).
        else
        {
          Pattern file_path_pattern = Pattern.compile("([^ ]+) (.*)");
          while (values_itr.hasNext())
          {
            String compound_value = values_itr.next().toString();
            Pattern p = Pattern.compile("([^\\s]+) (.*)");
            Matcher m = p.matcher(compound_value);
            if (m.matches())
            {
              Text msg_key = new Text(m.group(1));
              Text msg_value = new Text(m.group(2));
              context.write(msg_key, msg_value);
            }
          }
        }
      }
      else
      {
        System.err.println("Error! Failed to parse key: " + key.toString());
      }
    }
    /** reduce(key, value, context) **/

  }
  /** class GSReducer **/

  /** @function main
   *  Configure and run the ingest job, then exit with 0 if the job
   *  succeeded and 1 otherwise.
   *
   *  @param args exactly these seven values, in order: gsdlhome, hadoop
   *         home, collection, archivesdir, hdfsprefix, hdfsin, hdfsout.
   *         With fewer arguments a usage message is printed and the program
   *         exits with status 1.
   */
  public static void main(String[] args)
    throws Exception
  {
    // All seven arguments are read below, so all seven are required
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 <gsdlhome> <hadoop home> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(1);
    }

    Configuration conf = new Configuration();
    conf.set("gsdlhome",    args[0]);
    conf.set("hadoophome",  args[1]);
    conf.set("collection",  args[2]);
    conf.set("archivesdir", args[3]);
    // replaces the "hdfs://" part of input paths in GSMap, which treats the
    // value "/hdfs" specially (it then replaces the host part as well)
    conf.set("hdfsprefix",  args[4]);
    conf.set("hdfsin",      args[5]);
    conf.set("hdfsout",     args[6]);

    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hour
    conf.setLong("mapred.task.timeout", milli_seconds);

    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest2.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Register the map, partitioner, grouping comparator and reducer classes
    job.setMapperClass(GSMap.class);
    job.setPartitionerClass(GSPartitioner.class);
    job.setGroupingComparatorClass(GSGroupingComparator.class);
    job.setReducerClass(GSReducer.class);

    // Sets the input and output handlers - may need to adjust input to provide
    // a series of filenames (TextInputFormat will instead read in a text file
    // and return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    //job.setOutputFormatClass(NullOutputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    //   the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    //   this mechanism to produce a time based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my dislike of the ?: syntax
    System.exit(job.waitForCompletion(true)?0:1);
  }
  /** main(String[]) **/

  /** @function copyFile(File, File)
   *  Copy the content of one file over another. The destination is created
   *  if it does not exist and truncated if it does, so afterwards it holds
   *  exactly the bytes of the source.
   *
   *  @param sourceFile the file to read
   *  @param destFile the file to (over)write
   *  @throws IOException if either file cannot be opened or the transfer fails
   *
   *  @author Josh Froelich @ stackoverflow.com
   */
  public static void copyFile(File sourceFile, File destFile)
    throws IOException
  {
    FileChannel source = null;
    FileChannel destination = null;
    try
    {
      source = new FileInputStream(sourceFile).getChannel();
      // opening the output stream creates the file when necessary
      destination = new FileOutputStream(destFile).getChannel();
      // A single transferFrom() call may move fewer bytes than asked for, so
      // keep going until the whole source has been copied
      long size = source.size();
      long position = 0;
      while (position < size)
      {
        long transferred = destination.transferFrom(source, position, size - position);
        if (transferred <= 0)
        {
          // the source has nothing more to give (e.g. it shrank meanwhile)
          break;
        }
        position += transferred;
      }
    }
    finally
    {
      // close both channels, even if closing the first one fails
      try
      {
        if(source != null)
        {
          source.close();
        }
      }
      finally
      {
        if(destination != null)
        {
          destination.close();
        }
      }
    }
  }
  /** copyFile(File, File) **/


  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. The lines of the output are concatenated
   * without any separator. Standard out is read completely before waiting
   * for the command to finish, so large outputs do not stall the command.
   * Warning! Not safe if the command could return a large amount of text in
   * the STDERR stream - may infinitely block.
   *
   * Failures (command not found, read errors, interruption) are reported on
   * STDERR; whatever output was collected up to that point is returned.
   *
   * @param command the command line, split into arguments on whitespace by
   *        Runtime.exec()
   * @return the command's standard out with line terminators removed
   */
  public static String runCommand(String command)
  {
    StringBuffer result = new StringBuffer();
    Process pr = null;
    BufferedReader buf = null;
    try
    {
      pr = Runtime.getRuntime().exec(command);
      buf = new BufferedReader( new InputStreamReader( pr.getInputStream() ) );
      String line;
      while ( ( line = buf.readLine() ) != null )
      {
        result.append(line);
      }
      // only wait once the output is drained - waiting first would block
      // forever as soon as the command fills the pipe
      pr.waitFor();
    }
    catch (InterruptedException ex)
    {
      System.err.println("Error! " + ex.getMessage());
      // keep the interrupt visible to the caller
      Thread.currentThread().interrupt();
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    finally
    {
      if (buf != null)
      {
        try
        {
          buf.close();
        }
        catch (IOException ignored)
        {
          // the output has been consumed (or abandoned) already
        }
      }
      if (pr != null)
      {
        // releases the process' remaining streams; ends the command if it
        // is still running after a failure
        pr.destroy();
      }
    }
    return result.toString();
  }
  /** runCommand() **/
}
