spark-programming

Notes on Spark programming.

Spark environment

The test environment is based on Scala 2.11 and Spark 2.1.0 (HDP cluster).

spark-core

Spark partitioning principles and methods

A general rule for RDD partitioning: make the number of partitions as close as possible to the number of cores in the cluster.

Whether in local, Standalone, YARN, or Mesos mode, the default number of partitions can be configured via spark.default.parallelism. If it is not set, the value depends on the cluster environment: in local mode it is the number of cores on the local machine; in Mesos fine-grained mode it is 8; in other modes it is the total number of cores on all executor nodes, or 2, whichever is larger.
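
As a quick illustration (the master, app name, and parallelism value below are placeholders, not from the original setup), the setting can be passed through SparkConf, and the resulting partition count checked on an RDD created by parallelize:

import org.apache.spark.{SparkConf, SparkContext}

object ParallelismApp {
  def main(args: Array[String]): Unit = {
    // explicitly set the default parallelism (illustrative value)
    val conf = new SparkConf()
      .setAppName("ParallelismApp")
      .setMaster("local[4]")
      .set("spark.default.parallelism", "8")
    val sc = new SparkContext(conf)

    // parallelize without an explicit numSlices falls back to spark.default.parallelism
    val rdd = sc.parallelize(1 to 100)
    println(s"partitions = ${rdd.getNumPartitions}") // expected: 8

    sc.stop()
  }
}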

spark-streaming

Writing to a database

Template code:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    // return to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
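
The ConnectionPool above is not a Spark class; it stands for any user-supplied singleton pool that is initialized lazily on each executor. A minimal sketch (the JDBC URL and credentials are placeholders, and a real pool would also handle sizing and timeouts) could look like this:

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// hypothetical pool: one instance per executor JVM, since Scala objects are initialized lazily per JVM
object ConnectionPool {
  // placeholder connection settings
  private val url = "jdbc:mysql://db-host:3306/test"
  private val user = "user"
  private val password = "password"

  // idle connections kept for reuse
  private val idle = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = idle.poll()
    if (conn != null && !conn.isClosed) conn
    else DriverManager.getConnection(url, user, password)
  }

  def returnConnection(conn: Connection): Unit = {
    if (conn != null && !conn.isClosed) idle.offer(conn)
  }
}

With plain JDBC connections like these, the connection.send(record) call in the template would be replaced by executing an insert statement.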

Managing Kafka offsets

Have Kafka maintain the offsets itself (commit them back after each batch is processed):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // read the offset ranges of this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // process each partition with a pooled connection
    rdd.foreachPartition { records =>
      val conn = connPool.getConn
      records.foreach { record =>
        // do with conn
      }
      // return the connection to the pool
      connPool.returnPool(conn)
    }
    // commit the offsets back to Kafka asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}
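
For completeness, the stream passed into consume would typically be created with the Kafka 0.10 direct API, with enable.auto.commit switched off so that offsets are only stored in Kafka when commitAsync is called. The broker list, group id, and topic below are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

def createStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092,broker2:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "test-group",
    "auto.offset.reset" -> "latest",
    // disable auto commit so that commitAsync is the only place offsets are stored
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Seq("test_topic"), kafkaParams)
  )
}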

Writing data to HBase

Three write approaches are covered: writing single records, writing in batches, and writing via saveAsNewAPIHadoopDataset.

saveAsNewAPIHadoopDataset or the batched approach is recommended for writing to HBase.

Test scenario: with a single column, 1,000,000 records, a 3-second Spark Streaming batch interval, and local[2], both the batched and saveAsNewAPIHadoopDataset approaches finished writing in about 6 seconds.

Single-record writes

Writing one record at a time:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // read the offset ranges of this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // process each partition
    rdd.foreachPartition { records =>
      // create an HBase connection per partition
      val conf = createHBaseConf
      val conn = ConnectionFactory.createConnection(conf)
      // consume the records and write them to HBase one by one
      records.foreach { record =>
        val table = conn.getTable(TableName.valueOf("test_bo"))
        // use the Kafka message key as the row key
        val rowId = record.key()
        val put = new Put(Bytes.toBytes(rowId))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(record.value()))
        table.put(put)
        // close the table handle
        table.close()
      }
      // close the connection
      conn.close()
    }
    // commit the offsets back to Kafka asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

Batched writes

This requires the hbase-server dependency, mainly for org.apache.hadoop.hbase.mapreduce.TableOutputFormat (used by the saveAsNewAPIHadoopDataset approach below):

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
</dependency>

First map the records to Puts, then write each partition in one batch:

def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // read the offset ranges of this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // map each record to a Put, then write the whole partition in one batch
    rdd.map { record =>
      val rowId = record.key()
      val put = new Put(Bytes.toBytes(rowId))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(record.value()))
      put
    }.foreachPartition { puts =>
      // create an HBase connection per partition
      val conf = createHBaseConf
      val conn = ConnectionFactory.createConnection(conf)
      val table = conn.getTable(TableName.valueOf("test_bo"))
      // batch write
      import scala.collection.JavaConversions.seqAsJavaList
      table.put(seqAsJavaList(puts.toList))
      // close
      table.close()
      conn.close()
    }
    // commit the offsets back to Kafka asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

Writing with saveAsNewAPIHadoopDataset

Write to HBase directly via saveAsNewAPIHadoopDataset:

def createHBaseConf: Configuration = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "crpprdap25,crpprdap26,crpprdap27")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("hbase.defaults.for.version.skip", "true")
  conf.set("zookeeper.znode.parent", "/hbase-unsecure")
  conf.set(TableOutputFormat.OUTPUT_TABLE, "test_bo")
  conf
}

def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  val conf = createHBaseConf
  val job = Job.getInstance(conf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  stream.foreachRDD { rdd =>
    // read the offset ranges of this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.map { record =>
      val rowId = record.key()
      val put = new Put(Bytes.toBytes(rowId))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(record.value()))
      (new ImmutableBytesWritable, put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)

    // commit the offsets back to Kafka asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

spark-sql

Connecting to Hive

Testing shows that on an HDP cluster, adding hive.metastore.uris is enough to access the Hive cluster.

val spark = SparkSession
  .builder()
  .appName("HiveApp")
  // this setting points Spark at the cluster's Hive metastore
  .config("hive.metastore.uris", "thrift://crpprdap25:9083")
  .enableHiveSupport()
  .getOrCreate()

// run SQL
spark.sql("show databases").show()

Note that the spark-hive dependency is required:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

Full code:

package com.example.spark.sql

import org.apache.spark.sql.{Row, SparkSession}


/**
  * Hive table example
  *
  * @author 奔波儿灞
  * @since 1.0
  */
object HiveApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("HiveApp")
      // this setting points Spark at the cluster's Hive metastore
      // https://stackoverflow.com/questions/39201409/how-to-query-data-stored-in-hive-table-using-sparksession-of-spark2
      .config("hive.metastore.uris", "thrift://crpprdap25:9083")
      .enableHiveSupport()
      .getOrCreate()

    // run SQL
    spark.sql("show databases").show()

    // the official Spark Hive example
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

    // Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // The results of SQL queries are themselves DataFrames and support all normal functions.
    val sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    // The items in DataFrames are of type Row, which allows you to access each column by ordinal.
    import spark.implicits.newStringEncoder
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    // You can also use DataFrames to create temporary views within a SparkSession.
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    // Queries can then join DataFrame data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
  }

}

case class Record(key: Int, value: String)

Run:

cd /usr/hdp/2.6.0.3-8/spark2

sudo ./bin/spark-submit --master local[2] --class com.example.spark.sql.HiveApp ~/spark-programming-1.0-jar-with-dependencies.jar

Connecting via JDBC

You need to set the JDBC connection details: driver, URL, username, password, and so on.

import java.util.Properties

val spark = SparkSession
  .builder()
  .appName("JdbcApp")
  .getOrCreate()

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
val url = "jdbc:mysql://crpprdap25:15026/hdp?useUnicode=true&characterEncoding=UTF-8"
// column used to split the data into partitions
val columnName = "id"
// lower bound of the partition column
val lowerBound = 1
// upper bound of the partition column
val upperBound = 10000
// number of partitions
val numPartitions = 10
val connectionProperties = new Properties()
// set the driver for the target database; add the Maven dependency, or copy the driver jar into the jars directory of every Spark node
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
connectionProperties.put("user", "hdp")
connectionProperties.put("password", "hdp")

// Loading data from a JDBC source
// This splits the rows whose columnName values fall in 1-10000 across 10 partitions. Very convenient,
// but the obvious limitation is that only an integer column can be used as the partition key.
val jdbcDF = spark.read.jdbc(url, "sys_user", columnName, lowerBound, upperBound, numPartitions, connectionProperties)

// Operators
jdbcDF.show()

// Saving data to a JDBC source
jdbcDF.write.jdbc(url, "sys_user_bak", connectionProperties)

By default there is only one partition; using multiple partitions both increases parallelism and prevents OOM when the data volume is large.

The built-in range partitioning only supports a numeric (long) partition column; if the key is a string, you have to define the partitions yourself.
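
One way to do that (a sketch reusing the url and connectionProperties above; the date column and ranges are hypothetical) is the jdbc overload that takes an array of predicates, producing one partition per predicate:

// each predicate becomes the WHERE clause of one partition
val predicates = Array(
  "create_date >= '2018-01-01' AND create_date < '2018-07-01'",
  "create_date >= '2018-07-01' AND create_date < '2019-01-01'"
)
val partitionedDF = spark.read.jdbc(url, "sys_user", predicates, connectionProperties)
partitionedDF.show()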

In practice, reading via JDBC is relatively rare; more often, data is written out via JDBC after the computation is done.
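
A typical write path (a sketch; resultDF stands for whatever DataFrame the job computed, and the target table name is made up) picks an explicit save mode so that reruns behave predictably:

import org.apache.spark.sql.SaveMode

// append the computed results instead of failing when the table already exists
resultDF.write
  .mode(SaveMode.Append)
  .jdbc(url, "sys_user_result", connectionProperties)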

Reference: Spark SQL and DataFrames - Spark 2.2.0 中文文档 - ApacheCN
