使用GeoTrellis的API上传本地Tiff文件到Hadoop

  涉及依赖:

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。
        <!-- GeoTrellis Spark integration (Scala 2.11 build): RDD-based reading, tiling, reprojection and pyramiding. -->
        <dependency>
            <groupId>org.locationtech.geotrellis</groupId>
            <artifactId>geotrellis-spark_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- GeoTrellis HBase backend: provides the HBase attribute store / layer writer used by the Scala code. -->
        <dependency>
            <groupId>org.locationtech.geotrellis</groupId>
            <artifactId>geotrellis-hbase_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- Apache Spark core (Scala 2.11 build): SparkConf / SparkContext. -->
        <!-- NOTE(review): confirm this Spark version is the one GeoTrellis 2.1.0 is built against. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

  核心代码(scala语言)

/**
  * Ingests a single multiband GeoTIFF into a GeoTrellis catalog stored in HBase.
  *
  * The image is read into an RDD, tiled, reprojected to `LatLng`, pyramided
  * and written level by level into the HBase table `catalogLatLng` under the
  * layer name `landsat`.
  *
  * NOTE(review): this snippet shows no import statements; the enclosing file
  * must import the Spark, GeoTrellis (spark, raster, vector, proj4, hadoop and
  * hbase packages) and `scala.io.StdIn` names used below.
  */
object UploadTiff {
  /** Path of the source GeoTIFF that gets ingested. */
  val inputPath: String = "H:\\BigData\\tiff\\GF1_PMS2_E120.6_N31.0_20151203_L1A0001216039_fusion.tif"

  /**
    * HDFS catalog location. It is not used by the active code path in this
    * object; it only belongs to the commented-out Hadoop writer variant in `run`.
    */
  val outputPath: String = "hdfs://localhost:9000/catalogLatLng"

  /**
    * Entry point: builds a local SparkContext, runs the ingest and then waits
    * for Enter so the Spark UI can still be inspected before shutdown.
    *
    * @param args command-line arguments (ignored)
    */
  def main(args: Array[String]): Unit = {
    // Kryo serialization with the GeoTrellis registrator.
    val conf =
      new SparkConf()
        .setMaster("local[*]")
        .setAppName("Spark Tiler")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "geotrellis.spark.io.kryo.KryoRegistrator")

    val sc = new SparkContext(conf)
    try {
      run(sc)
      // Keep the context alive until the user confirms, so that the
      // Spark UI at http://localhost:4040 remains reachable.
      println("Hit enter to exit.")
      StdIn.readLine()
    } finally {
      // Always release the Spark context, even if the ingest failed.
      sc.stop()
    }
  }

  /**
    * Resolves a path to its absolute form.
    *
    * @param path a file-system path, relative or absolute
    * @return the absolute path as reported by `java.io.File`
    */
  def fullPath(path: String): String = new java.io.File(path).getAbsolutePath

  /**
    * Runs the ingest pipeline: read, tile, reproject, pyramid and write to HBase.
    *
    * For every zoom level produced by the pyramid, an already existing layer
    * with the same id is deleted before the new one is written.
    *
    * @param sc the Spark context, also passed implicitly to the GeoTrellis calls
    */
  def run(implicit sc: SparkContext) = {
    // Read the GeoTIFF as a single-image RDD. The method is added to
    // SparkContext by an implicit class made available through the
    // "import geotrellis.spark.io.hadoop._" statement.
    val inputRdd: RDD[(ProjectedExtent, MultibandTile)] =
      sc.hadoopMultibandGeoTiffRDD(inputPath)

    // Derive the layer metadata from the source RDD using a floating layout
    // with a tile size of 512. The first element of the returned tuple is
    // not needed here and is discarded.
    val (_, rasterMetaData) =
      TileLayerMetadata.fromRDD(inputRdd, FloatingLayoutScheme(512))

    // Cut the source image into tiles of the floating layout and repartition,
    // so that Spark has more, smaller partitions to work with.
    val tiled: RDD[(SpatialKey, MultibandTile)] =
      inputRdd
        .tileToLayout(rasterMetaData.cellType, rasterMetaData.layout, Bilinear)
        .repartition(100)

    // Target layout: a zoomed layout scheme in the LatLng CRS with 256 x 256 tiles.
    val layoutScheme = ZoomedLayoutScheme(LatLng, tileSize = 256)

    // Reproject the tiled layer to LatLng; this also yields the zoom level
    // from which the pyramid is built.
    val (zoom, reprojected): (Int, RDD[(SpatialKey, MultibandTile)] with Metadata[TileLayerMetadata[SpatialKey]]) =
      MultibandTileLayerRDD(tiled, rasterMetaData)
        .reproject(LatLng, layoutScheme, Bilinear)

    // Attribute store holding the catalog metadata, kept in HBase.
    // Hadoop variant:
    //    val attributeStore = HadoopAttributeStore(outputPath)
    val zookeepers: Seq[String] = List("localhost:2181")
    val master: String = "localhost"

    val hbaseInstance = HBaseInstance(zookeepers, master)
    val attributeStore = HBaseAttributeStore(hbaseInstance)

    // Writer that stores the tiles in the given HBase table.
    // Hadoop variant:
    //    val writer = HadoopLayerWriter(outputPath, attributeStore)
    val catalog: String = "catalogLatLng"
    val writer = HBaseLayerWriter(attributeStore, catalog)

    // Pyramid up the zoom levels and write every level to HBase.
    Pyramid.upLevels(reprojected, layoutScheme, zoom, Bilinear) { (rdd, z) =>
      val layerId = LayerId("landsat", z)
      // Remove a previously ingested layer with the same id before writing.
      if (attributeStore.layerExists(layerId)) {
        // Hadoop variant:
        //        new HadoopLayerManager(attributeStore).delete(layerId)
        new HBaseLayerManager(attributeStore, hbaseInstance).delete(layerId)
      }
      writer.write(layerId, rdd, ZCurveKeyIndexMethod)
    }
  }
}

 

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄