59859cc威尼斯官网-威尼斯欢乐娱人成app

教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

传智教育|传智播客

一样的教育,不一样的品质

全国校区

 

  • JavaEE
  • HTML&JS+前端
  • Python+大数据开发
  • 人工智能开发
  • UI/UE设计
  • App测试
  • 新媒体+短视频
    直播运营
  • 59859cc威尼斯官网-威尼斯欢乐娱人成app

编程的方法定义Schema信息【Python大数据技术文章】

更新时间:2021年09月08日18时13分 来源:传智教育 浏览次数:

好口碑IT培训

当case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:

(1)创建一个Row对象结构的RDD;

(2)基于StructType类型创建Schema;

(3)通过SparkSession提供的createDataFrame()方法来拼接Schema。

根据上述步骤,创建SparkSqlSchema. scala文件,使用编程方式定义Schema信息的具体代码如文件4-3所示。

文件4-3 SparkSqlSchema.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sq1.types.
{IntegerType,StringType,StructField,StructType}
import org.apache.spark.sql.(DataFrame,Row,Sparkession)
object SparkSqlSchema {
def main(args: Array[string]): Unit=(
//1.创建SparkSession
val spark: sparkSession=Sparksession.bullder()
.appName ("SparkSq1Schema")
.master ("1oca1[2]")
.getOrCreate ()
//2.获取sparkConttext对象
val sc: SparkContext=spark.sparkContext
//设置日志打印级别
sc.setLogLevel ( "WARN")
//3.加载数据
val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt")
//4.切分每一行
val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" "))
//5.加载数据到Row对象中
val personRDD:RDD[Row]=
dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
//6.创建Schema
val schema:StructType=StructType(Seq(
StructField("id",IntegerType,false),
StructField("name",StringType,false),
StructField("age", IntegerType, false)
))
//7.利用personRDD与Schema创建DataFrame
val personDF:DataFrame=spark.createDataFrame(personRDD,schema)
//8.DSL操作显示DataFrame的数据结果
personDF . show ()
//9.将DataFrame注册成表
personDF.createOrReplaceTempView ("t_person")
//10.sq1语句操作
spark.sq1 ("select¥from t_ person") .show()
//11.关闭资源
sc.stop()
spark.stop ()

在文件4-3中,第9~23行代码表示将文件转换成为RDD的基本步骤,第25~29行代码即为编程方式定义Schema的核心代码,Spark SQL提供了Class StructType( val fields:Array[StructField])类来表示模式信息,生成一个StructType对象,需要提供fields作为输入参数,fields是个集合类型,StructField(name,dataTypenullable)参数分别表示为字段名称、字段数据类型、字段值是否允许为空值,根据person.txt文本数据文件分别设置id、name、age字段作为Schema,第31行代码表示通过调用spark.createDataFrame()方法将RDD和Schema进行合并转换为DataFrame,第33~40行代码即为操作DataFrame进行数据查询。

猜你喜欢:

Kerberos是什么?Kerberos怎样做身份认证?

如何对序列实行切片操作?【Python切片教程】

怎样使用CLI调动Hive的一些功能?

MySQL表数据导入到Hive文件【图文详解】

传智教育python大数据开发培训

0 分享到:

59859cc威尼斯官网|威尼斯欢乐娱人成app

XML 地图 | Sitemap 地图