Java org.apache.spark.api.java.JavaPairRDD Code Examples

26 minute read

The following are examples showing how to use org.apache.spark.api.java.JavaPairRDD. They were extracted from open source projects and selected with a code-quality ranking algorithm.
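
Before the extracted examples, here is a minimal, self-contained sketch (not taken from any of the projects below; the application name and local master are placeholders) showing the two JavaPairRDD operations that recur throughout this page, parallelizePairs and reduceByKey:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

public class JavaPairRDDQuickStart {
  public static void main(String[] args) {
    // placeholder app name and local master, for illustration only
    SparkConf conf = new SparkConf().setAppName("JavaPairRDDQuickStart").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // build a pair RDD directly from (key, value) tuples
    List<Tuple2<String, Integer>> pairs = Arrays.asList(
        new Tuple2<String, Integer>("a", 1),
        new Tuple2<String, Integer>("b", 2),
        new Tuple2<String, Integer>("a", 3));
    JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(pairs);
    // sum the values for each key
    JavaPairRDD<String, Integer> sums = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });
    for (Tuple2<String, Integer> t : sums.collect()) {
      System.out.println(t._1() + " -> " + t._2());
    }
    sc.close();
  }
}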

Example 1


public void run(@DataIn(name="example_events", type=ExampleEvent.class) View<ExampleEvent> input,
                @DataOut(name="odd_users", type=ExampleEvent.class) View<ExampleEvent> output) throws IOException {
  Job job = Job.getInstance(getJobContext().getHadoopConf());
  DatasetKeyInputFormat.configure(job).readFrom(input);
  DatasetKeyOutputFormat.configure(job).writeTo(output);
  JavaPairRDD<ExampleEvent, Void> inputData = getJobContext()
      .getSparkContext()
      .newAPIHadoopRDD(job.getConfiguration(), DatasetKeyInputFormat.class,
          ExampleEvent.class, Void.class);
  JavaPairRDD<ExampleEvent, Void> filteredData = inputData.filter(new KeepOddUsers());
  filteredData.saveAsNewAPIHadoopDataset(job.getConfiguration());
}
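
The KeepOddUsers class is defined elsewhere in the project and not shown here. Because inputData is a JavaPairRDD<ExampleEvent, Void>, the filter operates on Tuple2<ExampleEvent, Void> elements; a plausible sketch, assuming ExampleEvent exposes a numeric id via getUserId() (an assumption, not confirmed by this snippet), would be:

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Hypothetical sketch of the filter used above; getUserId() on ExampleEvent is assumed.
public class KeepOddUsers implements Function<Tuple2<ExampleEvent, Void>, Boolean> {
  @Override
  public Boolean call(Tuple2<ExampleEvent, Void> record) throws Exception {
    // keep only the records whose user id is odd
    return record._1().getUserId() % 2 == 1;
  }
}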
 

Example 2


public static void main(String[] args) throws Exception {
    // create a spark context object
    JavaSparkContext ctx = createJavaSparkContext();
    List<Tuple2<String,String>> listR = new ArrayList<Tuple2<String,String>>();
    listR.add(new Tuple2<String,String>("a1", "a2"));
    listR.add(new Tuple2<String,String>("b1", "b2"));
    listR.add(new Tuple2<String,String>("c1", "c2"));
    List<Tuple2<String,String>> listS = new ArrayList<Tuple2<String,String>>();
    listS.add(new Tuple2<String,String>("d1", "d2"));
    listS.add(new Tuple2<String,String>("e1", "e2"));
    listS.add(new Tuple2<String,String>("f1", "f2"));
    listS.add(new Tuple2<String,String>("g1", "g2"));
    // create two RDD's
    JavaPairRDD<String,String> R = ctx.parallelizePairs(listR);
    JavaPairRDD<String,String> S = ctx.parallelizePairs(listS);
    // <U> JavaPairRDD<T,U> cartesian(JavaRDDLike<U,?> other)
    // Return the Cartesian product of this RDD and another one,
    // that is, the RDD of all pairs of elements (a, b) 
    // where a is in this and b is in other.
    JavaPairRDD<Tuple2<String,String>, Tuple2<String,String>> cart = R.cartesian(S);
    // save the result
    cart.saveAsTextFile("/output/z");
    // done
    ctx.close();
    System.exit(0);
 }
 

Example 3


/**
 * Sums the double cell values per MatrixIndexes key.
 *
 * @param in pair RDD of (MatrixIndexes, cell value)
 * @return pair RDD with one summed value per key
 */
public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKey( JavaPairRDD<MatrixIndexes, Double> in )
{
    //sum of cell values per key, w/o exploitation of corrections
    return in.reduceByKey(new SumDoubleCellsFunction());
}
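
SumDoubleCellsFunction is not included in the snippet. Given the reduceByKey call on a JavaPairRDD<MatrixIndexes, Double>, it has to be a Function2<Double, Double, Double>; a minimal sketch (the class body is an assumption) is:

import org.apache.spark.api.java.function.Function2;

// Minimal sketch: reduceByKey over Double values needs a Function2<Double, Double, Double>.
public class SumDoubleCellsFunction implements Function2<Double, Double, Double> {
  private static final long serialVersionUID = 1L;

  @Override
  public Double call(Double a, Double b) throws Exception {
    // add the two partial sums that share the same MatrixIndexes key
    return a + b;
  }
}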
 

Example 4


private static JavaPairRDD<Integer,Iterable<Rating>> predictAll(
    MatrixFactorizationModel mfModel,
    JavaRDD<Rating> data,
    JavaPairRDD<Integer,Integer> userProducts) {
  @SuppressWarnings("unchecked")
  RDD<Tuple2<Object,Object>> userProductsRDD =
      (RDD<Tuple2<Object,Object>>) (RDD<?>) userProducts.rdd();
  return data.wrapRDD(mfModel.predict(userProductsRDD)).groupBy(new Function<Rating,Integer>() {
    @Override
    public Integer call(Rating r) {
      return r.user();
    }
  });
}
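
The userProducts argument is supplied by the caller. One illustrative way to build it (a sketch, not code from the original project) is to derive the (user, product) pairs from the same Rating RDD:

// Illustrative helper (not from the original project): derive the (user, product)
// pairs that predictAll(...) expects from the Rating RDD.
private static JavaPairRDD<Integer,Integer> toUserProducts(JavaRDD<Rating> data) {
  return data.mapToPair(new PairFunction<Rating,Integer,Integer>() {
    @Override
    public Tuple2<Integer,Integer> call(Rating r) {
      return new Tuple2<Integer,Integer>(r.user(), r.product());
    }
  });
}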
 

Example 5


@Override
public JavaRDD<Tuple2<String, Integer>> run(final JavaADAMContext ac, final JavaRDD<AlignmentRecord> recs, final String args) {
    JavaRDD<String> contigNames = recs.map(new Function<AlignmentRecord, String>() {
            @Override
            public String call(final AlignmentRecord rec) {
                return rec.getReadMapped() ? rec.getContig().getContigName() : "unmapped";
            }
        });
    JavaPairRDD<String, Integer> counts = contigNames.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(final String contigName) {
                return new Tuple2<String, Integer>(contigName, Integer.valueOf(1));
            }
        });
    JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(final Integer value0, final Integer value1) {
                return Integer.valueOf(value0.intValue() + value1.intValue());
            }
        });
    // todo:  seems like there should be a more direct way
    return JavaRDD.fromRDD(reducedCounts.rdd(), null);
}
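
On the todo comment above: one more direct route (a sketch, not the project's actual change) is an identity map, which turns the JavaPairRDD<String, Integer> into a JavaRDD of its tuples without passing a null ClassTag to JavaRDD.fromRDD:

// Sketch of an alternative to JavaRDD.fromRDD(reducedCounts.rdd(), null):
// an identity map yields a JavaRDD of the (contig name, count) tuples directly.
JavaRDD<Tuple2<String, Integer>> result =
    reducedCounts.map(new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(final Tuple2<String, Integer> tuple) {
            return tuple;
        }
    });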
 

Example 6


public int run(SparkConf conf, CommandLine cli) throws Exception {
    String zkHost = cli.getOptionValue("zkHost", "localhost:9983");
    String collection = cli.getOptionValue("collection", "collection1");
    String queryStr = cli.getOptionValue("query", "*:*");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SolrRDD solrRDD = new SolrRDD(zkHost, collection);
    final SolrQuery solrQuery = SolrRDD.toQuery(queryStr);
    JavaRDD<SolrDocument> solrJavaRDD = solrRDD.query(jsc.sc(), solrQuery);
    JavaRDD<String> words = solrJavaRDD.flatMap(new FlatMapFunction<SolrDocument, String>() {
      public Iterable<String> call(SolrDocument doc) {
        Object tweet_s = doc.get("text_t");
        String str = tweet_s != null ? tweet_s.toString() : "";
        str = str.toLowerCase().replaceAll("[.,!?\n]", " ").trim();
        return Arrays.asList(str.split(" "));
      }
    });
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });
    for (Tuple2<?,?> tuple : counts.top(20, new WordCountSorter()))
      System.out.println(tuple._1() + ": " + tuple._2());
    // Now use schema information in Solr to build a queryable SchemaRDD
    SQLContext sqlContext = new SQLContext(jsc);
    // Pro Tip: SolrRDD will figure out the schema if you don't supply a list of field names in your query
    Map<String, String> options = new HashMap<String, String>();
    options.put("zkhost", zkHost);
    options.put("collection", collection);
    options.put("query", queryStr);
    DataFrame df = sqlContext.read().format("solr").options(options).load();
    long numEchos = df.filter(df.col("type_s").equalTo("echo")).count();
    System.out.println("numEchos >> "+numEchos);
    jsc.stop();
    return 0;
  }
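
WordCountSorter is not shown above. top(20, comparator) returns the largest elements under the given ordering, and the comparator must be serializable so it can be shipped to executors; a plausible sketch ordering tuples by their count is:

import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;

// Hypothetical sketch of the comparator passed to counts.top(20, ...):
// ordering by count makes top() return the 20 most frequent words.
public class WordCountSorter implements Comparator<Tuple2<String, Integer>>, Serializable {
  @Override
  public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
    return a._2().compareTo(b._2());
  }
}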
 

Example 7


/**
 * Sums the matrix blocks per MatrixIndexes key.
 *
 * @param in pair RDD of (MatrixIndexes, MatrixBlock)
 * @return pair RDD with one aggregated block per key
 */
public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
{
    //sum of blocks per key, w/o exploitation of correction blocks
    return in.reduceByKey(new SumMultiBlockFunction());
}
 

Example 8


public int wordCount(DataStore<String,WebPage> inStore,
  DataStore<String, TokenDatum> outStore) throws IOException {
  //Spark engine initialization
  GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class,
     WebPage.class);
  SparkConf sparkConf = new SparkConf().setAppName(
    "Gora Spark Word Count Application").setMaster("local");
  Class[] c = new Class[1];
  c[0] = inStore.getPersistentClass();
  sparkConf.registerKryoClasses(c);
  //
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
  long count = goraRDD.count();
  log.info("Total Web page count: {}", count);
  JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
  JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
  //Print output for debug purpose
  log.info("SparkWordCount debug purpose TokenDatum print starts:");
  Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
  for (String key : tokenDatumMap.keySet()) {
    log.info(key);
    log.info(tokenDatumMap.get(key).toString());
  }
  log.info("SparkWordCount debug purpose TokenDatum print ends:");
  //
  //write output to datastore
  Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
  reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
  //
  return 1;
}
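
mapFunc and redFunc are fields defined elsewhere in the class. From the types above, mapFunc turns a WebPage into a single (String, Long) tuple and redFunc adds two counts. The sketch below is an assumption: because map (not flatMap) is used, each page yields exactly one tuple, so it keys by the page URL purely for illustration, and getUrl() on WebPage is likewise assumed.

// Hypothetical sketches of the two functions referenced above (not from the project).
// mapFunc: keying by URL and the getUrl() accessor are assumptions for illustration.
static Function<WebPage, Tuple2<String, Long>> mapFunc =
    new Function<WebPage, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> call(WebPage page) throws Exception {
        String key = page.getUrl() != null ? page.getUrl().toString() : "";
        return new Tuple2<String, Long>(key, 1L);
      }
    };

// redFunc: add the two partial counts for the same key.
static Function2<Long, Long, Long> redFunc =
    new Function2<Long, Long, Long>() {
      @Override
      public Long call(Long a, Long b) throws Exception {
        return a + b;
      }
    };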
 

Example 9


public void run(@DataIn(name="source.users") View<GenericRecord> input,
                @DataOut(name="target.users") View<GenericRecord> output) throws IOException {
  Job job = Job.getInstance();
  DatasetKeyInputFormat.configure(job).readFrom(input);
  DatasetKeyOutputFormat.configure(job).writeTo(output);
  @SuppressWarnings("unchecked")
  JavaPairRDD<GenericData.Record, Void> inputData = getJobContext()
      .getSparkContext()
      .newAPIHadoopRDD(job.getConfiguration(), DatasetKeyInputFormat.class,
          GenericData.Record.class, Void.class);
  inputData.saveAsNewAPIHadoopDataset(job.getConfiguration());
}
 

Example 10


/**
 * Save the contents of the given RDD to the given view.
 *
 * @param rdd
 * @param uri
 */
public static void save(JavaPairRDD rdd, String uri,  Configuration conf) {
  // Copy configuration to avoid side effects for the caller.
  Configuration outputConf = new Configuration(conf);
  try {
    Job job = Job.getInstance(outputConf);
    DatasetKeyOutputFormat.configure(job).writeTo(uri);
    // Save non-empty RDDs.
    if (!rdd.isEmpty())
      rdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
  } catch (IOException e) {
    throw new AppException(e);
  }
}
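
An illustrative call of the save(...) helper above; loadUserRecords(), the dataset URI, and the configuration below are placeholders, not values from the project:

// Illustrative usage only: loadUserRecords() is a hypothetical loader and the
// dataset URI is a placeholder.
Configuration conf = new Configuration();
JavaPairRDD<GenericData.Record, Void> records = loadUserRecords();
save(records, "dataset:hdfs:/tmp/data/users", conf);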
 

Example 11


@Override
public SparkClusterResult doCluster(DataSet ds) {
    // SparkDataSet needs to be passed in
    SparkDataSet rdd = (SparkDataSet) ds;
    // cache dataset in memory
    // rdd.getRDD().cache();
    distFunc = new DistanceFunction(this.typeDefs);
    ClusterFactory clusterFactory = new ClusterFactory(this.typeDefs, this.onlineUpdate);
    log.info("Starting threshold clusterer with threshold {}", threshold);
    // TODO look at using a reduce function
    // Idea is the first step is a map<Instance, List<Instance>> that converts each instance to a single "cluster"
    // second step is a reduce where input is a List<Instances> and produces a List<Instances>
    // this step would merge clusters within threshold
    JavaPairRDD<String, Instance> instances = rdd.getRDD();
    instances.cache();
    // convert each instance into a singleton cluster
    JavaRDD<Map<String, Instance>> singletons = rdd.getRDD().map(new InstanceToClusterFunction(clusterFactory));
    //singletons.cache();
    log.info("Generated initial singleton clusters");
    // merge clusters together
    Map<String, Instance> clusters = singletons.reduce(new AggregateClusterFunction(distFunc, threshold));
    log.info("Merging clusters completed with {} clusters", clusters.size());
    // find the best cluster for each instance
    JavaPairRDD<String, Instance> bestCluster = instances.mapToPair(new BestClusterFunction(distFunc, clusters));
    log.info("Output results");
    if (clusters != null && centroidsPath != null) {
        rdd.getContext().parallelize(new ArrayList<Instance>(clusters.values())).saveAsTextFile(centroidsPath);
    }
    if (bestCluster != null && clustersPath != null) {
        bestCluster.saveAsTextFile(clustersPath);
    }
    log.info("Threshold clusterer completed");
    // return the cluster membership rdd
    return new SparkClusterResult(bestCluster);
}
 

Example 12


public static void main(String[] args) throws Exception {
   // STEP-1: handle input parameters
   if (args.length != 2) {
      System.err.println("Usage: Top10UsingTakeOrdered <input-path> <topN>");
      System.exit(1);
   }
   System.out.println("args[0]: <input-path>="+args[0]);
   System.out.println("args[1]: <topN>="+args[1]);
   final String inputPath = args[0];
   final int N = Integer.parseInt(args[1]);
   // STEP-2: create a Java Spark Context object
   JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
   // STEP-3: create an RDD from input
   //    input record format:
   //        <string-key><,><integer-value-count>
   JavaRDD<String> lines = ctx.textFile(inputPath, 1);
   lines.saveAsTextFile("/output/1");
   // STEP-4: partition RDD
   // public JavaRDD<T> coalesce(int numPartitions)
   // Return a new RDD that is reduced into numPartitions partitions.
   JavaRDD<String> rdd = lines.coalesce(9);
   // STEP-5: map input(T) into (K,V) pair
   // PairFunction<T, K, V>
   // T => Tuple2<K, V>
   JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
      public Tuple2<String,Integer> call(String s) {
         String[] tokens = s.split(","); // url,789
         return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
      }
   });
   kv.saveAsTextFile("/output/2");
   // STEP-6: reduce frequent K's
   JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
         return i1 + i2;
      }
   });
   uniqueKeys.saveAsTextFile("/output/3");
   // STEP-7: find final top-N by calling takeOrdered()
   List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);
   // STEP-8: emit final top-N
   for (Tuple2<String, Integer> entry : topNResult) {
      System.out.println(entry._2 + "--" + entry._1);
   }
   System.exit(0);
}
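
MyTupleComparator is not shown above. takeOrdered(N, comparator) returns the smallest elements under the given ordering, so a comparator that sorts by descending count makes it return the N most frequent keys; a plausible, serializable sketch is:

import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;

// Hypothetical sketch of the comparator used with takeOrdered(N, ...):
// descending by count, so the smallest-first semantics of takeOrdered()
// yield the N most frequent keys.
public class MyTupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
   public static final MyTupleComparator INSTANCE = new MyTupleComparator();

   @Override
   public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
      return b._2().compareTo(a._2()); // descending by count
   }
}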
 

Example 13


public AnalysisResultFuture run() {
    final AnalysisJob analysisJob = _sparkJobContext.getAnalysisJob();
    final Datastore datastore = analysisJob.getDatastore();
    final JavaRDD<InputRow> inputRowsRDD = openSourceDatastore(datastore);
    final JavaPairRDD<String, NamedAnalyzerResult> namedAnalyzerResultsRDD;
    if (_sparkJobContext.getAnalysisJobBuilder().isDistributable()) {
        logger.info("Running the job in distributed mode");
        // TODO: We have yet to get more experience with this setting - do a
        // benchmark of what works best, true or false.
        final boolean preservePartitions = true;
        final JavaRDD<Tuple2<String, NamedAnalyzerResult>> processedTuplesRdd = inputRowsRDD
                .mapPartitionsWithIndex(new RowProcessingFunction(_sparkJobContext), preservePartitions);
        final JavaPairRDD<String, NamedAnalyzerResult> partialNamedAnalyzerResultsRDD = processedTuplesRdd
                .mapPartitionsToPair(new TuplesToTuplesFunction<String, NamedAnalyzerResult>(), preservePartitions);
        namedAnalyzerResultsRDD = partialNamedAnalyzerResultsRDD.reduceByKey(new AnalyzerResultReduceFunction(
                _sparkJobContext));
    } else {
        logger.warn("Running the job in non-distributed mode");
        JavaRDD<InputRow> coalescedInputRowsRDD = inputRowsRDD.coalesce(1);
        namedAnalyzerResultsRDD = coalescedInputRowsRDD.mapPartitionsToPair(new RowProcessingFunction(
                _sparkJobContext));
    }
    final JavaPairRDD<String, AnalyzerResult> finalAnalyzerResultsRDD = namedAnalyzerResultsRDD
            .mapValues(new ExtractAnalyzerResultFunction());
    // log analyzer results
    final List<Tuple2<String, AnalyzerResult>> results = finalAnalyzerResultsRDD.collect();
    logger.info("Finished! Number of AnalyzerResult objects: {}", results.size());
    for (Tuple2<String, AnalyzerResult> analyzerResultTuple : results) {
        final String key = analyzerResultTuple._1;
        final AnalyzerResult result = analyzerResultTuple._2;
        logger.info("AnalyzerResult: " + key + "->" + result);
    }
    // log accumulators
    final Map<String, Accumulator<Integer>> accumulators = _sparkJobContext.getAccumulators();
    for (Entry<String, Accumulator<Integer>> entry : accumulators.entrySet()) {
        final String name = entry.getKey();
        final Accumulator<Integer> accumulator = entry.getValue();
        logger.info("Accumulator: {} -> {}", name, accumulator.value());
    }
    return new SparkAnalysisResultFuture(results);
}
 

Example 14


public static void main(String[] args) throws Exception {
   printArguments(args);
   if (args.length != 2) {
      System.err.println("Usage: CharCount <input> <output>");
      System.exit(1);
   }
   // handle input parameters
   final String inputPath = args[0];
   final String outputPath = args[1];
   // create a context object, which is used 
   // as a factory for creating new RDDs
   JavaSparkContext context = new JavaSparkContext();
   // read input and create the first RDD
   JavaRDD<String> lines = context.textFile(inputPath);
   // PairFlatMapFunction<input, output:K, output:V>
   JavaPairRDD<Character,Long> chars = lines.flatMapToPair(new PairFlatMapFunction<String, Character, Long>() {
      @Override
      public Iterable<Tuple2<Character,Long>> call(String s) {
         if ((s == null) || (s.length() == 0)) {
            return Collections.emptyList();
         }            
         String[] words = s.split(" ");
         List<Tuple2<Character,Long>> list = new ArrayList<Tuple2<Character,Long>>();
         for (String  word : words) {
            char[] arr = word.toLowerCase().toCharArray();
            for (char c : arr) {
                list.add(new Tuple2<Character, Long>(c, 1L));
            }
         }
         return list;
      }
   });
   // find the total count for each unique char
   JavaPairRDD<Character, Long> counts = 
        chars.reduceByKey(new Function2<Long, Long, Long>() {
      @Override
      public Long call(Long i1, Long i2) {
         return i1 + i2;
      }
   });
   // sort and save the final output 
   counts.sortByKey().saveAsTextFile(outputPath);
   // close the context and we are done
   context.close();
   System.exit(0);
}
 

Example 15


public static void main(String[] args) throws Exception {
   if (args.length != 2) {
      System.err.println("Usage: SparkCharCount <input> <output>");
      System.exit(1);
   }
   // handle input parameters
   final String inputPath = args[0];
   final String outputPath = args[1];
   // create a context object, which is used 
   // as a factory for creating new RDDs
   JavaSparkContext context = new JavaSparkContext();
   // read input and create the first RDD
   JavaRDD<String> lines = context.textFile(inputPath, 1);
   // PairFlatMapFunction<input, output:K, output:V>
   JavaPairRDD<Character,Long> chars = lines.flatMapToPair(new PairFlatMapFunction<String, Character, Long>() {
      @Override
      public Iterable<Tuple2<Character,Long>> call(String s) {
         if ((s == null) || (s.length() == 0)) {
            return Collections.emptyList();
         }            
         Map<Character,Long> map = new HashMap<Character,Long>();
         String[] words = s.split(" ");
         for (String  word : words) {
            char[] arr = word.toLowerCase().toCharArray();
            for (char c : arr) {
                Long count = map.get(c);
                if (count == null) {
                    map.put(c, 1L);
                }
                else {
                    map.put(c, count+1);
                }
            }
         }
         return toListOfKeyValuePairs(map);
      }
   });
   // find the total count for each unique char
   JavaPairRDD<Character, Long> counts = 
        chars.reduceByKey(new Function2<Long, Long, Long>() {
      @Override
      public Long call(Long i1, Long i2) {
         return i1 + i2;
      }
   });
   // sort and save the final output 
   counts.sortByKey().saveAsTextFile(outputPath);
   // close the context and we are done
   context.close();
   System.exit(0);
}
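
The toListOfKeyValuePairs helper is not shown above. Given that flatMapToPair expects an Iterable<Tuple2<Character, Long>>, a plausible sketch is:

// Plausible sketch of the helper referenced above: flatten the per-line
// Map<Character, Long> into the list of tuples that flatMapToPair() expects.
static List<Tuple2<Character, Long>> toListOfKeyValuePairs(Map<Character, Long> map) {
   List<Tuple2<Character, Long>> list = new ArrayList<Tuple2<Character, Long>>();
   for (Map.Entry<Character, Long> entry : map.entrySet()) {
      list.add(new Tuple2<Character, Long>(entry.getKey(), entry.getValue()));
   }
   return list;
}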
 