博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
用spark中DataFrame对数据进行去重、缺失值处理、异常值处理
阅读量:3749 次
发布时间:2019-05-22

本文共 4867 字,大约阅读时间需要 16 分钟。

用spark中DataFrame对数据进行清洗

1. 准备工作

配置环境

import osfrom pyspark import SparkContext,SparkConffrom pyspark.sql import SparkSessionimport pyspark.sql.functions as fnJAVA_HOME = '/root/bigdata/jdk'PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"os.environ["JAVA_HOME"] = JAVA_HOMEos.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONSPARK_APP_NAME = "dataframetest"SPARK_URL = "spark://192.168.19.137:7077"conf = SparkConf()    # 创建spark config对象config = (	("spark.app.name", SPARK_APP_NAME),    # 设置启动的spark的app名称,没有提供,将随机产生一个名称	("spark.executor.memory", "6g"),    # 设置该app启动时占用的内存用量,默认1g	("spark.master", SPARK_URL),    # spark master的地址  ("spark.executor.cores", "4"),    # 设置spark executor使用的CPU核心数conf.setAll(config)# 利用config对象,创建spark sessionspark = SparkSession.builder.config(conf=conf).getOrCreate()

2. 数据去重

'''  1.删除重复数据  1.1删除完全一样的数据  1.2删除某些字段值完全相同的记录  1.3删除无意义字段的重复值'''df = spark.createDataFrame([  (1, 144.5, 5.9, 33, 'M'),  (2, 167.2, 5.4, 45, 'M'),  (3, 124.1, 5.2, 23, 'F'),  (4, 144.5, 5.9, 33, 'M'),  (5, 133.2, 5.7, 54, 'F'),  (3, 124.1, 5.2, 23, 'F'),  (5, 129.2, 5.3, 42, 'M'),], ['id', 'weight', 'height', 'age', 'gender'])	# 1.首先删除完全一样的记录 删除第3条和第6条中的一条  df2 = df.dropDuplicates().count()  结果:2  	# 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)	# 第1条和第四条	# 删除某些字段值完全一样的重复记录,subset参数定义这些字段  df3 = df2.dropDuplicates(subset=[c for c in df.columns if c!='id']).count()  结果:5  	#3.有意义的重复记录去重之后,再看某个无意义字段的值是否有重复(在这个例子中,是看id是否重复)  df3.agg(fn.count('id').alias('id_dount'),         fn.countDistinct('id').alias('distinct_id_count')).collect()  结果:[ROW(id_count=5,distinct_id_count=4)]	# 4.对于id这种无意义的列重复,添加另外一列自增id  new_id的值并比连续 且比较大  df3.withColumn('new_id',fn.monotonically_increasing_id()).show()

3. 缺失值处理

'''  2.处理缺失值  2.1 对缺失值进行删除操作(行,列)  2.2 对缺失值进行填充操作(列的均值)  2.3 对缺失值对应的行或列进行标记'''  df_miss = spark.createDataFrame([    (1, 143.5, 5.6, 28,'M', 100000),    (2, 167.2, 5.4, 45,'M', None),    (3, None , 5.2, None, None, None),    (4, 144.5, 5.9, 33, 'M', None),    (5, 133.2, 5.7, 54, 'F', None),    (6, 124.1, 5.2, None, 'F', None),    (7, 129.2, 5.3, 42, 'M', 76000),    ],['id', 'weight', 'height', 'age', 'gender', 'income'])	# 1.计算每条记录的缺失值情况  df_miss.rdd.map(lambda row:(row['id'],sum([c==None for c in row]))).collect()  结果:[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]  	# 2.计算各列的缺失情况百分比  df_miss.agg(*[(1 - fn.count(c) / fn.count(*)).alias(c + '_missing')                 for c in df_miss.columns]).show()	# 3、按照缺失值删除行(threshold是根据一行记录中,缺失字段的百分比的定义 缺失三个就丢弃)  df_miss_no_income.dropna(thresh=3).show()	# 4、删除缺失值过于严重的列 其实是先建一个DF,不要缺失值的列   df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])  # 5、填充缺失值,可以用fillna来填充缺失值,  # 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing  # 对于数值类型,可以用均值或者中位数等填充  # fillna可以接收两种类型的参数:  # 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。  # 也可以接收一个字典{列名:值}这样  # 先计算均值,并组织成一个字典  means = df_miss_no_income.agg(*[fn.means(c).alias(c+'_mean') for c in          df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('record')[0]  means['gender'] = 'missing'  df_miss_no_income.fillna(means).show()

4. 异常值处理

'''  3、异常值处理  异常值:不属于正常的值 包含:缺失值,超过正常范围内的较大值或较小值  分位数去极值 ***  中位数绝对偏差去极值  正态分布去极值  上述三种操作的核心都是:通过原始数据设定一个正常的范围,超过此范围的就是一个异常值'''  df_outliers = spark.createDataFrame([    (1, 143.5, 5.3, 28),    (2, 154.2, 5.5, 45),    (3, 342.3, 5.1, 99),    (4, 144.5, 5.5, 33),    (5, 133.2, 5.4, 54),    (6, 124.1, 5.1, 21),    (7, 129.2, 5.3, 42),    ], ['id', 'weight', 'height', 'age'])  # 求出每个字段的边界 用4分位计算  cols = ['weight','height','age']  bounds = {
} for col in cols: quantiles = df_outliers.approxQuantile(col,[0.25,0.75],0.05) IQR = quantiles[1] - quantiles[0] bounds[col] = [ quantiles[0] - 1.5*IQR, quantiles[1] + 1.5*IQR ] 结果:{
'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]} #approxQuantile方法接收三个参数:参数1,列名;参数2:想要计算的分位点,可以是一个点,也可以是一个列表(0和1之间的小数),第三个参数是能容忍的误差,如果是0,代表百分百精确计算。 outliers = df_outliers.select(*['id'] + [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1])).alias(c + '_o')] for c in cols).show() # +---+--------+--------+-----+ # | id|weight_o|height_o|age_o| # +---+--------+--------+-----+ # | 1| false| false|false| # | 2| false| false|false| # | 3| true| false| true| # | 4| false| false|false| # | 5| false| false|false| # | 6| false| false|false| # | 7| false| false|false| # +---+--------+--------+-----+ # 与原始数据关联 df_outliers = df_outliers.join(outliers,on='id') # 取出有问题的值 df_outliers.filter('weight_o').select('id','weight').show() df_outliers.filter('age_o').select('id','age').show() # +---+------+ # | id|weight| # +---+------+ # | 3| 342.3| # +---+------+ # +---+---+ # | id|age| # +---+---+ # | 3| 99| # +---+---+

转载地址:http://jmdsn.baihongyu.com/

你可能感兴趣的文章
C语言数组旋转问题(C笔记)
查看>>
Keras软件安装
查看>>
cuda安装
查看>>
Zmapv6源码安装
查看>>
Anaconda3换源配置
查看>>
操作中划线-开头的文件
查看>>
Unsafe.putOrderedXXX系列方法详解(数组赋值的第二种方式)
查看>>
Netty对象池
查看>>
Netty写数据(动画)
查看>>
JVM(一)
查看>>
Java之枚举、注解、反射
查看>>
常见排序算法的优化
查看>>
java简单制作简单压缩文件gzip工具
查看>>
gzip代码
查看>>
个人理解的a++和++a的区别和联系
查看>>
知道两数之和,然后在数组中找到,输出数组下标
查看>>
c或者c++的随机数理解
查看>>
小甲鱼python视频xxoo爬虫代码改进--煎蛋网
查看>>
leetcode用户分组C
查看>>
leetcode历史时
查看>>