Spark JavaRDD, JavaPairRDD, and Dataset: Mutual Conversion and Printing
Main contents:
1. Convert a List to a JavaRDD and print the JavaRDD
2. Convert a List to a JavaRDD, convert the JavaRDD to a JavaPairRDD, and print the JavaPairRDD
3. Convert a JavaRDD<String> to a JavaRDD<Row>
1. Convert the List to a JavaRDD, then print the JavaRDD with collect() and foreach
/**
 * @author Yu Wanlong
 */
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ReadTextToRDD {
    public static void main(String[] args) {
        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
                .setMaster("local[2]").set("spark.executor.memory", "2g");
        // start a spark context
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // build List<String>
        List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1");

        // List<String> to JavaRDD<String>
        JavaRDD<String> javaRDD = jsc.parallelize(list);

        // print the JavaRDD with collect()
        for (String str : javaRDD.collect()) {
            System.out.println(str);
        }

        // print the JavaRDD with foreach
        javaRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
}

Output:

a:1
a:2
b:1
b:1
c:1
d:1
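A note on the two approaches: collect() pulls the entire RDD back to the driver, which can be expensive or even fatal for large data sets, while foreach runs on the executors (in local mode both happen to print to the same console). When only a preview is needed, a bounded take(n) is safer; a minimal sketch:

        // fetch only the first three elements to the driver
        for (String str : javaRDD.take(3)) {
            System.out.println(str);
        }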
2. Convert a List to a JavaRDD, convert the JavaRDD to a JavaPairRDD, and print the JavaPairRDD
/**
 * @author Yu Wanlong
 */
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class ReadTextToRDD {
    public static void main(String[] args) {
        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
                .setMaster("local[2]").set("spark.executor.memory", "2g");
        // start a spark context
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // build List<String>
        List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1");

        // List<String> to JavaRDD<String>
        JavaRDD<String> javaRDD = jsc.parallelize(list);

        // JavaRDD<String> to JavaPairRDD<String, Integer>
        JavaPairRDD<String, Integer> javaPairRDD = javaRDD.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        String[] ss = s.split(":");
                        return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1]));
                    }
                });

        // print the JavaPairRDD with collect()
        for (Tuple2<String, Integer> str : javaPairRDD.collect()) {
            System.out.println(str.toString());
        }
    }
}

Output:

(a,1)
(a,2)
(b,1)
(b,1)
(c,1)
(d,1)
When converting a JavaRDD<String> to a JavaPairRDD<String, Integer>, the key points are:
First: the PairFunction<String, String, Integer>() passed to mapToPair reads as PairFunction<input element type of the JavaRDD, key type of the resulting JavaPairRDD, value type of the resulting JavaPairRDD>().
Second: since a JavaPairRDD stores its data as key-value pairs, Tuple2<String, Integer> is the pair type to return: Tuple2<key type, value type>.
Third: in String s, the String type is the element type of the JavaRDD<String>, and s holds the value of each element.
Fourth: return new Tuple2<String, Integer>(ss[0], Integer.parseInt(ss[1])) produces the key-value pair that is returned for each element.
Summary: converting a JavaRDD to a JavaPairRDD essentially reshapes each line of data into key-value form; once in that form, key-based operations on the JavaPairRDD can be performed far more efficiently.
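As a minimal illustration of that point (not part of the original example), the pairs built above can be aggregated with reduceByKey, the canonical key-value operation on a JavaPairRDD; this sketch assumes Java 8 lambdas are available:

        // sum the values of identical keys; expected pairs, in no particular
        // order: (a,3) (b,2) (c,1) (d,1)
        JavaPairRDD<String, Integer> summed = javaPairRDD.reduceByKey((a, b) -> a + b);
        for (Tuple2<String, Integer> t : summed.collect()) {
            System.out.println(t);
        }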
3. Convert a JavaRDD<String> to a JavaRDD<Row>
/**
 * @author Yu Wanlong
 */
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class ReadTextToRDD {
    public static void main(String[] args) {
        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Read Text to RDD")
                .setMaster("local[2]").set("spark.executor.memory", "2g");
        // start a spark context
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // build List<String>
        List<String> list = Arrays.asList("a:1", "a:2", "b:1", "b:1", "c:1", "d:1");

        // List<String> to JavaRDD<String>
        JavaRDD<String> javaRDD = jsc.parallelize(list);

        // JavaRDD<String> to JavaRDD<Row>
        JavaRDD<Row> javaRDDRow = javaRDD.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                String[] ss = s.split(":");
                return RowFactory.create(ss[0], ss[1]);
            }
        });

        // print the JavaRDD<Row> with collect()
        for (Row str : javaRDDRow.collect()) {
            System.out.println(str.toString());
        }
    }
}

Output:

[a,1]
[a,2]
[b,1]
[b,1]
[c,1]
[d,1]
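The title also mentions Dataset; the step that naturally follows from a JavaRDD<Row> is attaching a schema. The following is a minimal, hypothetical sketch (not in the original code), assuming Spark 2.x with a SparkSession; the column names key and value are made up for illustration:

        // additional imports: org.apache.spark.sql.Dataset, org.apache.spark.sql.SparkSession,
        // org.apache.spark.sql.types.DataTypes, org.apache.spark.sql.types.StructType
        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        // hypothetical schema matching the two string fields of each Row
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, false),
                DataTypes.createStructField("value", DataTypes.StringType, false)));

        // JavaRDD<Row> + schema -> Dataset<Row>
        Dataset<Row> dataset = spark.createDataFrame(javaRDDRow, schema);
        dataset.show();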
