pyspark数据处理学习笔记

PYSPARK学习笔记

Defining a schema

# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

展示数据

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2018.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport'])) #增加一列名为airport的,并置为小写

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport']) # 删掉Destination Airport这一列

# Show the DataFrame
aa_dfw_df.show() # show等价于head

parquet文件读写

为了节省内存

pyspark 写文件到hdfs (一般都存为parquet读写都比json、csv快,还节约约75%存储空间)
其他hadoop生态圈内容在这

# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one 
df3 = df1.union(df2) # 等价于r里面的rbind,就是按行拼接

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights') # 创建一个可替换的临时表

# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)

DataFrame column operations

对数据框列的操作
筛选操作

# Show the distinct VOTER_NAME entries
voter_df.select(voter_df['VOTER_NAME']).distinct().show(40, truncate=False) 去除重复值

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)
# Show the distinct VOTER_NAME entries
voter_df.select(voter_df['VOTER_NAME']).distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20').  # 筛选操作,同r

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

withcolumn

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, 's+'))  增加一列

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0)) # getItem是查一个字典的映射

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')  # 删除一列

# Show the voter_df DataFrame
voter_df.show()  #等价于head()

select


# Add a column to voter_df for any voter with the title **Councilmember**
voter_df = voter_df.withColumn('random_val',
                               when(voter_df.TITLE == 'Councilmember', F.rand())) #新增一列random_val ,当title=='Councilmember'时,产生随机数

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()


# 查看一下结果
<script.py> output:
    +----------+-------------+-------------------+--------------------+
    |      DATE|        TITLE|         VOTER_NAME|          random_val|
    +----------+-------------+-------------------+--------------------+
    |02/08/2017|Councilmember|  Jennifer S. Gates|  0.6072090670654174|
    |02/08/2017|Councilmember| Philip T. Kingston|  0.8779894137938334|
    |02/08/2017|        Mayor|Michael S. Rawlings|                null|
    |02/08/2017|Councilmember|       Adam Medrano|  0.2496996705882797|
    |02/08/2017|Councilmember|       Casey Thomas| 0.20338678125255483|
    |02/08/2017|Councilmember|Carolyn King Arnold|   0.911553073855913|
    |02/08/2017|Councilmember|       Scott Griggs|  0.1134459593298831|
    |02/08/2017|Councilmember|   B. Adam  McGough| 0.42041407481646487|
    |02/08/2017|Councilmember|       Lee Kleinman|  0.9109217573924748|
    |02/08/2017|Councilmember|      Sandy Greyson|0.055814633336865205|
    |02/08/2017|Councilmember|  Jennifer S. Gates|  0.9429223451510873|
    |02/08/2017|Councilmember| Philip T. Kingston|0.022915415927586502|
    |02/08/2017|        Mayor|Michael S. Rawlings|                null|
    |02/08/2017|Councilmember|       Adam Medrano|  0.9833216773540682|
    |02/08/2017|Councilmember|       Casey Thomas|  0.2944981610876857|
    |02/08/2017|Councilmember|Carolyn King Arnold|    0.67447246683049|
    |02/08/2017|Councilmember| Rickey D. Callahan| 0.20480391619888827|
    |01/11/2017|Councilmember|  Jennifer S. Gates| 0.14057384767559866|
    |04/25/2018|Councilmember|     Sandy  Greyson|  0.6598564900991037|
    |04/25/2018|Councilmember| Jennifer S.  Gates|  0.9412719200394332|
    +----------+-------------+-------------------+--------------------+
    only showing top 20 rows

when

# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               when(voter_df.TITLE == 'Councilmember', F.rand()) #这个when完美的取代了if else。。
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()


### user defined functions

udf  自定义函数

```r
def getFirstAndMiddle(names):
  # Return a space separated string of names
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType()). #函数的返回结果是字符串类型,这里需要定义一下函数,有点类似于声明一个函数

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits)) #第一个参数是新增一列的名字,第二个参数就是调用这个函数并给函数赋值

# Show the DataFrame
voter_df.show()


<script.py> output:
    +----------+-------------+-------------------+--------------------+----------+---------+---------------------+
    |      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|first_and_middle_name|
    +----------+-------------+-------------------+--------------------+----------+---------+---------------------+
    |02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|          Jennifer S.|
    |02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|            Philip T.|
    |02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|           Michael S.|
    |02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|                 Adam|
    |02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|                Casey|
    |02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|   Arnold|         Carolyn King|
    |02/08/2017|Councilmember|       Scott Griggs|     [Scott, Griggs]|     Scott|   Griggs|                Scott|
    |02/08/2017|Councilmember|   B. Adam  McGough| [B., Adam, McGough]|        B.|  McGough|              B. Adam|
    |02/08/2017|Councilmember|       Lee Kleinman|     [Lee, Kleinman]|       Lee| Kleinman|                  Lee|
    |02/08/2017|Councilmember|      Sandy Greyson|    [Sandy, Greyson]|     Sandy|  Greyson|                Sandy|
    |02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|          Jennifer S.|
    |02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|            Philip T.|
    |02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|           Michael S.|
    |02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|                 Adam|
    |02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|                Casey|
    |02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|   Arnold|         Carolyn King|
    |02/08/2017|Councilmember| Rickey D. Callahan|[Rickey, D., Call...|    Rickey| Callahan|            Rickey D.|
    |01/11/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|          Jennifer S.|
    |04/25/2018|Councilmember|     Sandy  Greyson|    [Sandy, Greyson]|     Sandy|  Greyson|                Sandy|
    |04/25/2018|Councilmember| Jennifer S.  Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|          Jennifer S.|
    +----------+-------------+-------------------+--------------------+----------+---------+---------------------+
    only showing top 20 rows

Partitioning and lazy processing

分区和延时处理

spark 不是实时处理的,而是延时处理任务,也就是lazy

I’m # Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct() #查找值并去重,上次有个任务做了多次的groupby现在可以用这个
# Count the rows in voter_df
print("
There are %d rows in the voter_df DataFrame.
" % voter_df.count()) #计数

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id()) #增加一列

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10) #排序任务

cache

缓存机制

start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

计算时间

# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_0*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time(). #开始时间
print("Total rows in full DataFrame:	%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))  # 算运行时间
 
start_time_b = time.time()
print("Total rows in split DataFrame:	%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

集群配置


# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

json

输出json格式

# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite').  #输出json格式

~ 这个符号表示去取反

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '	')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df["colcount"] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d
Final count: %d" % (initial_count, final_count))

原文地址:https://www.cnblogs.com/gaowenxingxing/p/14511179.html