Main topics:

1. Convert a List to a JavaRDD and print the JavaRDD

2. Convert a List to a JavaRDD, convert the JavaRDD to a JavaPairRDD, and print the JavaPairRDD


3. Convert a JavaRDD<String> to a JavaRDD<Row>

 

1. Convert the List to a JavaRDD, then print the JavaRDD with collect() and with foreach()

 

/**
 * @author Yu Wanlong
 */

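import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;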

public class ReadTextToRDD {

  public static void main(String[] args) {
    // configure spark
    SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
        .setMaster("local[2]").set("spark.executor.memory","2g");
    // start a spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // build List<String>
    List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1","d:1");
    // List<String> to JavaRDD<String>
    JavaRDD<String> javaRDD = jsc.parallelize(list);

    // print the JavaRDD via collect(), which copies every element to the driver
    for (String str : javaRDD.collect()) {
      System.out.println(str);
    }
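    // print the JavaRDD via foreach(); the function runs on the executors,
    // so the output only shows up in this console when running in local mode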
    javaRDD.foreach(new VoidFunction<String>() {
      @Override
      public void call(String s) throws Exception {
        System.out.println(s);
      }
    });
    // shut down the SparkContext
    jsc.stop();
  }
}

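Output of the collect() loop (the foreach() call prints the same six elements again; with local[2] their order across the two partitions is not guaranteed):
a:1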
a:2
b:1
b:1
c:1
d:1
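The program above uses collect(), which copies every element to the driver, and an anonymous VoidFunction. Below is a minimal sketch of two common variations. It assumes Java 8+ and spark-core on the classpath. The first variation uses take(n) to fetch only a few elements. The second uses a lambda in place of the anonymous class. The class name PrintRDDLambda and the helper firstN are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PrintRDDLambda {

  // Hypothetical helper: returns at most n elements of the RDD to the driver.
  static List<String> firstN(JavaRDD<String> rdd, int n) {
    return rdd.take(n);
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Print RDD").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    try {
      JavaRDD<String> javaRDD = jsc.parallelize(
          Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1"));

      // take(n) ships only the first n elements to the driver, unlike collect()
      for (String s : firstN(javaRDD, 3)) {
        System.out.println(s);
      }

      // VoidFunction has a single abstract method, so a lambda can replace the
      // anonymous class; the println still runs on the executors
      javaRDD.foreach(s -> System.out.println(s));
    } finally {
      jsc.stop();
    }
  }
}
```

The sketch uses the lambda `s -> System.out.println(s)` rather than the method reference `System.out::println`. The method reference captures the PrintStream instance, which is not serializable, so Spark can reject the task with a "Task not serializable" error.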


2. Convert a List to a JavaRDD, convert the JavaRDD to a JavaPairRDD, and print the JavaPairRDD

/**
 * @author Yu Wanlong
 */

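import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;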

public class ReadTextToRDD {

  public static void main(String[] args) {
    // configure spark
    SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
        .setMaster("local[2]").set("spark.executor.memory","2g");
    // start a spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // build List<String>
    List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1","d:1");
    // List<String> to JavaRDD<String>
    JavaRDD<String> javaRDD = jsc.parallelize(list);
    // JavaRDD<String> to JavaPairRDD
    JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) throws Exception {
            String[] ss = s.split(":");
            return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1]));
          }
        });
    // print the JavaPairRDD via collect()
    for (Tuple2<String, Integer> str : javaPairRDD.collect()) {
      System.out.println(str.toString());
    }
    // shut down the SparkContext
    jsc.stop();
  }
}

Output:
(a,1)
(a,2)
(b,1)
(b,1)
(c,1)
(d,1)

 

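When converting a JavaRDD<String> to a JavaPairRDD<String, Integer>, the key points are:

First: the PairFunction<String, String, Integer>() passed to mapToPair is PairFunction<element type of the input JavaRDD, key type of the resulting JavaPairRDD, value type of the resulting JavaPairRDD>().

Second: a JavaPairRDD stores key-value pairs, so call() returns a Tuple2<String, Integer>, that is, Tuple2<key type, value type>.

Third: in call(String s), String is the element type of JavaRDD<String>, and s is the current element.

Fourth: return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1])) builds the key-value pair that is emitted for each element.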
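With Java 8+, the anonymous PairFunction can be replaced by a lambda or a method reference. The three type parameters described above are then inferred from the method signature. Below is a minimal sketch, assuming spark-core on the classpath. PairRDDLambda and parse are hypothetical names.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PairRDDLambda {

  // Hypothetical helper with the same logic as the anonymous PairFunction:
  // "key:value" -> (key, value as Integer)
  static Tuple2<String, Integer> parse(String s) {
    String[] ss = s.split(":");
    return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1]));
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Pair RDD").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    try {
      JavaRDD<String> javaRDD = jsc.parallelize(
          Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1"));
      // PairFunction<String, String, Integer> is inferred from parse's signature
      JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(PairRDDLambda::parse);
      for (Tuple2<String, Integer> t : javaPairRDD.collect()) {
        // _1() is the key, _2() is the value
        System.out.println(t._1() + " -> " + t._2());
      }
    } finally {
      jsc.stop();
    }
  }
}
```

The parsing logic is kept in a named static method so it can be tested without starting Spark. The method reference captures nothing, so it serializes cleanly.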

 

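Summary: converting a JavaRDD to a JavaPairRDD reshapes each record into a key-value pair. This is what makes key-based operations such as reduceByKey, groupByKey and join available, because they are defined only on JavaPairRDD. Operations like reduceByKey can also combine values per key inside each partition before shuffling, which keeps aggregation efficient.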
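To illustrate the kind of key-based operation a JavaPairRDD makes available, here is a sketch that sums the values per key with reduceByKey. It assumes spark-core on the classpath. sumByKey is a hypothetical helper. For the sample data it should yield a=3, b=2, c=1, d=1, although collectAsMap() makes no ordering guarantee.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ReduceByKeyExample {

  // Hypothetical helper: parses "key:value" lines and sums the values per key
  static Map<String, Integer> sumByKey(JavaSparkContext jsc, List<String> lines) {
    JavaPairRDD<String, Integer> pairs = jsc.parallelize(lines)
        .mapToPair(s -> {
          String[] ss = s.split(":");
          return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1]));
        });
    // reduceByKey exists only on pair RDDs; it merges the values of each key,
    // combining within partitions before the shuffle
    return pairs.reduceByKey((x, y) -> x + y).collectAsMap();
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Reduce by key").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    try {
      Map<String, Integer> sums = sumByKey(jsc,
          Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1"));
      for (Map.Entry<String, Integer> e : sums.entrySet()) {
        System.out.println(e.getKey() + " = " + e.getValue());
      }
    } finally {
      jsc.stop();
    }
  }
}
```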

 

3. Convert a JavaRDD<String> to a JavaRDD<Row>

 

/**
 * @author Yu Wanlong
 */

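import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;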

public class ReadTextToRDD {

  public static void main(String[] args) {
    // configure spark
    SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
        .setMaster("local[2]").set("spark.executor.memory","2g");
    // start a spark context
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // build List<String>
    List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1","d:1");
    // List<String> to JavaRDD<String>
    JavaRDD<String> javaRDD = jsc.parallelize(list);

    // JavaRDD<String> to JavaRDD<Row>
    JavaRDD<Row> javaRDDRow = javaRDD.map(new Function<String, Row>() {
      @Override
      public Row call(String s) throws Exception {
        String[] ss = s.split(":");
        return RowFactory.create(ss[0], ss[1]);
      }
    });

    // print the JavaRDD<Row> via collect()
    for (Row str : javaRDDRow.collect()) {
      System.out.println(str.toString());
    }
    // shut down the SparkContext
    jsc.stop();
  }
}

Output:
[a,1]
[a,2]
[b,1]
[b,1]
[c,1]
[d,1]
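A JavaRDD<Row> is usually an intermediate step toward a DataFrame. Below is a sketch that attaches a schema with SparkSession.createDataFrame. It assumes spark-sql on the classpath and uses the hypothetical column names key and value. Both columns are StringType because ss[1] is not parsed to an integer in the example above. RowRDDToDataFrame, schema and toDataFrame are also hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RowRDDToDataFrame {

  // Hypothetical schema: two non-nullable string columns "key" and "value"
  static StructType schema() {
    List<StructField> fields = Arrays.asList(
        DataTypes.createStructField("key", DataTypes.StringType, false),
        DataTypes.createStructField("value", DataTypes.StringType, false));
    return DataTypes.createStructType(fields);
  }

  // Builds the same JavaRDD<Row> as above, then pairs it with the schema
  static Dataset<Row> toDataFrame(SparkSession spark, List<String> lines) {
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    JavaRDD<Row> rows = jsc.parallelize(lines).map(s -> {
      String[] ss = s.split(":");
      return RowFactory.create(ss[0], ss[1]);
    });
    return spark.createDataFrame(rows, schema());
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("Row RDD to DataFrame").master("local[2]").getOrCreate();
    try {
      Dataset<Row> df = toDataFrame(spark,
          Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1"));
      df.printSchema();
      df.groupBy("key").count().show();
    } finally {
      spark.stop();
    }
  }
}
```

For the sample data, groupBy("key").count() should report a count of 2 for a and b and 1 for c and d. The order of rows in show() is not guaranteed.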
