SparkSQL Epidemic Data Demo Exercise

Posted: 2020-02-01 19:12:19

With some free time at home, I wrote a simple epidemic data processing demo as a chance to review SparkSQL.

Mock data (all of the data below is fictional; any resemblance to real records is purely coincidental)
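The mock CSV files themselves are not reproduced in this post. Hypothetical rows, consistent with the columns the queries below expect (`id_no`, `name`, `province`, `city`, `working_company` for civic_info; `carriage_no`, `departure`, `passenger_name`, `passenger_id` for ticket_info), might look like:

```csv
# civic_info.csv (illustrative, invented rows)
id_no,name,province,city,working_company
420100000000000001,Zhang San,湖北,武汉,Example Tech Co.
110100000000000002,Li Si,北京,北京,Sample Trade Co.

# ticket_info.csv (illustrative, invented rows)
carriage_no,departure,passenger_name,passenger_id
C03,武汉,Zhang San,420100000000000001
C07,北京,Li Si,110100000000000002
```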

Maven coordinates required

<!-- The version is kept in a properties block for decoupling; I used 2.4.4 -->

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>${sparkVersion}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>${sparkVersion}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${sparkVersion}</version>
  <scope>provided</scope>
</dependency>
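The comment above mentions that the version lives in a properties block; a minimal sketch of that block (property name `sparkVersion`, matching the `${sparkVersion}` placeholder in the coordinates) would be:

```xml
<properties>
  <!-- single place to bump the Spark version -->
  <sparkVersion>2.4.4</sparkVersion>
</properties>
```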

First, a quick test of reading the CSV files:

package com.ronnie

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVTest {
  def main(args: Array[String]): Unit = {
    val reader: SparkSession = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()

    val civic_info: DataFrame = reader.read.format("csv")
      .option("delimiter", ",") // field delimiter; depends on your file, some use |
      .option("header", "true") // whether the file has a header row, handled automatically
      .option("nullValue", "\\N") // which value to treat as null
      .option("inferSchema", "true") // enable schema inference
      .load("src/main/resources/civic_info.csv") // ideally this would live on HDFS or S3 and be read from there

    civic_info.show()
    civic_info.printSchema()


    val ticket_info: DataFrame = reader.read.format("csv")
      .option("delimiter",",")
      .option("header", "true")
      .option("nullValue", "\\N")
      .option("inferSchema","true")
      .load("src/main/resources/ticket_info.csv")

    ticket_info.show()
    ticket_info.printSchema()
  }
}
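Schema inference costs an extra pass over the data. As an alternative not shown in the original post (a sketch, with column names assumed from the SQL queries used later and all types guessed as strings), the schema could be declared explicitly instead:

```scala
import org.apache.spark.sql.types._

// Hypothetical explicit schema for civic_info.csv; columns assumed
// from the queries below, types are an assumption.
val civicSchema = StructType(Seq(
  StructField("id_no", StringType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("province", StringType, nullable = true),
  StructField("city", StringType, nullable = true),
  StructField("working_company", StringType, nullable = true)
))

val civic_info_typed = reader.read.format("csv")
  .option("delimiter", ",")
  .option("header", "true")
  .option("nullValue", "\\N")
  .schema(civicSchema) // skip inference and use the declared schema
  .load("src/main/resources/civic_info.csv")
```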

Then straight into the business logic:

package com.ronnie

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVAsSQLTest {
  def main(args: Array[String]): Unit = {
    val reader: SparkSession = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()

    val civic_info: DataFrame = reader.read.format("csv")
      .option("delimiter",",")
      .option("header", "true")
      .option("nullValue", "\\N")
      .option("inferSchema","true")
      .load("src/main/resources/civic_info.csv")

    civic_info.createTempView("civic")

    val ticket_info: DataFrame = reader.read.format("csv")
      .option("delimiter",",")
      .option("header", "true")
      .option("nullValue", "\\N")
      .option("inferSchema","true")
      .load("src/main/resources/ticket_info.csv")

    ticket_info.createTempView("ticket")

    println("People registered in Hubei province: ")
    reader.sql("select id_no, name from civic where province = '湖北'").show()

    println("People from the Wuhan outbreak area: ")
    reader.sql("select id_no, name from civic where city = '武汉'").show()

    println("Companies whose employees need a 14-day quarantine: ")
    reader.sql("select distinct working_company from civic where province = '湖北'").show()

    println("Carriages with infection risk: ")
    reader.sql("select distinct carriage_no from ticket where departure = '武汉'").show()

    println("People who need to be quarantined: ")
    reader.sql("select passenger_name, passenger_id from ticket where carriage_no in (select distinct carriage_no from ticket where departure = '武汉')").show()

    // ps: With real big-data volumes you would never print everything; count() the result instead to check how many rows matched.
  }
}
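The count()-based check mentioned in the closing comment could look like this (a sketch reusing the `civic` temp view registered above):

```scala
// Instead of show(), count the matches to sanity-check the result size.
val hubeiCount: Long = reader.sql(
  "select id_no, name from civic where province = '湖北'"
).count()
println(s"Rows matched for Hubei: $hubeiCount")
```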

Original post: https://www.cnblogs.com/ronnieyuan/p/12249118.html
