PySpark 入门

1.wordCount 

 1 from __future__ import print_function
 2 
 3 import sys
 4 from operator import add
 5 
 6 from pyspark import SparkContext
 7 
 8 
 9 if __name__ == "__main__":
10 if len(sys.argv) != 2:
11 print("Usage: wordcount <file>", file=sys.stderr)
12 exit(-1)
13 sc = SparkContext(appName="PythonWordCount")
14 lines = sc.textFile(sys.argv[1], 1)
15 counts = lines.flatMap(lambda x: x.split(' ')) 
16 .map(lambda x: (x, 1)) 
17 .reduceByKey(add)
18 output = counts.collect()
19 for (word, count) in output:
20 print("%s: %i" % (word, count))
21 
22 sc.stop()

 

2. Sql.py

Sql介绍了DataFrame的使用方法

from __future__ import print_function

import os
import sys


from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType


if __name__ == "__main__":
    sc = SparkContext(appName="PythonSQL")
    sqlContext = SQLContext(sc)

    # RDD is created from a list of rows
    some_rdd = sc.parallelize([Row(name="John", age=19),
                              Row(name="Smith", age=23),
                              Row(name="Sarah", age=18)])
    # Infer schema from the first row, create a DataFrame and print the schema
    some_df = sqlContext.createDataFrame(some_rdd)
    some_df.printSchema()

    # Another RDD is created from a list of tuples
    another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
    # Schema with two fields - person_name and person_age
    schema = StructType([StructField("person_name", StringType(), False),
                        StructField("person_age", IntegerType(), False)])
    # Create a DataFrame by applying the schema to the RDD and print the schema
    another_df = sqlContext.createDataFrame(another_rdd, schema)
    another_df.printSchema()
    # root
    #  |-- age: integer (nullable = true)
    #  |-- name: string (nullable = true)

    # A JSON dataset is pointed to by path.

3. Sort

sort实现了排序功能,主要通过sortByKey, 也可以使用SortWith, 注意如果数据量特别大,不要使用collect, 而是应该将rdd repatition为1个分区然后保存在hdfs上使用

from __future__ import print_function

import sys

from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: sort <file>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonSort")
    lines = sc.textFile(sys.argv[1], 1)
    sortedCount = lines.flatMap(lambda x: x.split(' ')) 
        .map(lambda x: (int(x), 1)) 
        .sortByKey(lambda x: x)
    # This is just a demo on how to bring all the sorted data back to a single node.
    # In reality, we wouldn't want to collect all the data to the driver node.
    output = sortedCount.collect()
    for (num, unitcount) in output:
        print(num)

    sc.stop()

4. LR回归

from __future__ import print_function

import sys

import numpy as np
from pyspark import SparkContext


D = 10  # Number of dimensions

# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
# make further computations faster.
# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
def readPointBatch(iterator):
    strs = list(iterator)
    matrix = np.zeros((len(strs), D + 1))
    for i, s in enumerate(strs):
        matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
    return [matrix]

if __name__ == "__main__":

    if len(sys.argv) != 3:
        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
        exit(-1)

    print("""WARN: This is a naive implementation of Logistic Regression and is
      given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
      to see how MLlib's implementation is used.""", file=sys.stderr)
    sc = SparkContext(appName="PythonLR")
    points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
    iterations = int(sys.argv[2])

    # Initialize w to a random value
    w = 2 * np.random.ranf(size=D) - 1
    print("Initial w: " + str(w))

    # Compute logistic regression gradient for a matrix of data points
    def gradient(matrix, w):
        Y = matrix[:, 0]    # point labels (first column of input file)
        X = matrix[:, 1:]   # point coordinates
        # For each point (x, y), compute gradient function, then sum these up
        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)

    def add(x, y):
        x += y
        return x

    for i in range(iterations):
        print("On iteration %i" % (i + 1))
        w -= points.map(lambda m: gradient(m, w)).reduce(add)

    print("Final w: " + str(w))

    sc.stop()

 

pyspark递交到yarn上运行

/home/hadoop/soft/spark/bin/spark-submit

--master yarn

--deploy-mode cluster  

--num-executors 1  

--executor-memory 1G  

wordCount.py

原文地址:https://www.cnblogs.com/energy1010/p/10161475.html