本文共 4867 字,大约阅读时间需要 16 分钟。
配置环境
import osfrom pyspark import SparkContext,SparkConffrom pyspark.sql import SparkSessionimport pyspark.sql.functions as fnJAVA_HOME = '/root/bigdata/jdk'PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"os.environ["JAVA_HOME"] = JAVA_HOMEos.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONSPARK_APP_NAME = "dataframetest"SPARK_URL = "spark://192.168.19.137:7077"conf = SparkConf() # 创建spark config对象config = ( ("spark.app.name", SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称 ("spark.executor.memory", "6g"), # 设置该app启动时占用的内存用量,默认1g ("spark.master", SPARK_URL), # spark master的地址 ("spark.executor.cores", "4"), # 设置spark executor使用的CPU核心数conf.setAll(config)# 利用config对象,创建spark sessionspark = SparkSession.builder.config(conf=conf).getOrCreate()
''' 1.删除重复数据 1.1删除完全一样的数据 1.2删除某些字段值完全相同的记录 1.3删除无意义字段的重复值'''df = spark.createDataFrame([ (1, 144.5, 5.9, 33, 'M'), (2, 167.2, 5.4, 45, 'M'), (3, 124.1, 5.2, 23, 'F'), (4, 144.5, 5.9, 33, 'M'), (5, 133.2, 5.7, 54, 'F'), (3, 124.1, 5.2, 23, 'F'), (5, 129.2, 5.3, 42, 'M'),], ['id', 'weight', 'height', 'age', 'gender']) # 1.首先删除完全一样的记录 删除第3条和第6条中的一条 df2 = df.dropDuplicates().count() 结果:2 # 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样) # 第1条和第四条 # 删除某些字段值完全一样的重复记录,subset参数定义这些字段 df3 = df2.dropDuplicates(subset=[c for c in df.columns if c!='id']).count() 结果:5 #3.有意义的重复记录去重之后,再看某个无意义字段的值是否有重复(在这个例子中,是看id是否重复) df3.agg(fn.count('id').alias('id_dount'), fn.countDistinct('id').alias('distinct_id_count')).collect() 结果:[ROW(id_count=5,distinct_id_count=4)] # 4.对于id这种无意义的列重复,添加另外一列自增id new_id的值并比连续 且比较大 df3.withColumn('new_id',fn.monotonically_increasing_id()).show()
''' 2.处理缺失值 2.1 对缺失值进行删除操作(行,列) 2.2 对缺失值进行填充操作(列的均值) 2.3 对缺失值对应的行或列进行标记''' df_miss = spark.createDataFrame([ (1, 143.5, 5.6, 28,'M', 100000), (2, 167.2, 5.4, 45,'M', None), (3, None , 5.2, None, None, None), (4, 144.5, 5.9, 33, 'M', None), (5, 133.2, 5.7, 54, 'F', None), (6, 124.1, 5.2, None, 'F', None), (7, 129.2, 5.3, 42, 'M', 76000), ],['id', 'weight', 'height', 'age', 'gender', 'income']) # 1.计算每条记录的缺失值情况 df_miss.rdd.map(lambda row:(row['id'],sum([c==None for c in row]))).collect() 结果:[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)] # 2.计算各列的缺失情况百分比 df_miss.agg(*[(1 - fn.count(c) / fn.count(*)).alias(c + '_missing') for c in df_miss.columns]).show() # 3、按照缺失值删除行(threshold是根据一行记录中,缺失字段的百分比的定义 缺失三个就丢弃) df_miss_no_income.dropna(thresh=3).show() # 4、删除缺失值过于严重的列 其实是先建一个DF,不要缺失值的列 df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income']) # 5、填充缺失值,可以用fillna来填充缺失值, # 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing # 对于数值类型,可以用均值或者中位数等填充 # fillna可以接收两种类型的参数: # 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。 # 也可以接收一个字典{列名:值}这样 # 先计算均值,并组织成一个字典 means = df_miss_no_income.agg(*[fn.means(c).alias(c+'_mean') for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('record')[0] means['gender'] = 'missing' df_miss_no_income.fillna(means).show()
''' 3、异常值处理 异常值:不属于正常的值 包含:缺失值,超过正常范围内的较大值或较小值 分位数去极值 *** 中位数绝对偏差去极值 正态分布去极值 上述三种操作的核心都是:通过原始数据设定一个正常的范围,超过此范围的就是一个异常值''' df_outliers = spark.createDataFrame([ (1, 143.5, 5.3, 28), (2, 154.2, 5.5, 45), (3, 342.3, 5.1, 99), (4, 144.5, 5.5, 33), (5, 133.2, 5.4, 54), (6, 124.1, 5.1, 21), (7, 129.2, 5.3, 42), ], ['id', 'weight', 'height', 'age']) # 求出每个字段的边界 用4分位计算 cols = ['weight','height','age'] bounds = { } for col in cols: quantiles = df_outliers.approxQuantile(col,[0.25,0.75],0.05) IQR = quantiles[1] - quantiles[0] bounds[col] = [ quantiles[0] - 1.5*IQR, quantiles[1] + 1.5*IQR ] 结果:{ 'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]} #approxQuantile方法接收三个参数:参数1,列名;参数2:想要计算的分位点,可以是一个点,也可以是一个列表(0和1之间的小数),第三个参数是能容忍的误差,如果是0,代表百分百精确计算。 outliers = df_outliers.select(*['id'] + [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1])).alias(c + '_o')] for c in cols).show() # +---+--------+--------+-----+ # | id|weight_o|height_o|age_o| # +---+--------+--------+-----+ # | 1| false| false|false| # | 2| false| false|false| # | 3| true| false| true| # | 4| false| false|false| # | 5| false| false|false| # | 6| false| false|false| # | 7| false| false|false| # +---+--------+--------+-----+ # 与原始数据关联 df_outliers = df_outliers.join(outliers,on='id') # 取出有问题的值 df_outliers.filter('weight_o').select('id','weight').show() df_outliers.filter('age_o').select('id','age').show() # +---+------+ # | id|weight| # +---+------+ # | 3| 342.3| # +---+------+ # +---+---+ # | id|age| # +---+---+ # | 3| 99| # +---+---+
转载地址:http://jmdsn.baihongyu.com/