CS190.1x-ML_lab4_ctr_student

这次lab主要主要是研究click-through rate (CTR)。数据集来自于Kaggle的Criteo Labs dataset。相关ipynb文件见我github

作业分成5个部分:one-hot encoding处理特征;构造one-hot encoding dictionary;解析CTR数据并处理特征;用逻辑回归来预测CTR;通过feature hashing来减少特征维度。

Featurize categorical data using one-hot-encoding

One-hot-encoding

这部分我们要实现one-hot encoding。我们在用实际数据处理前,先用一个包含3个样本的数据集上练习一下。样本有三个特征:什么动物,什么颜色,吃什么。其中最后一个特征可选。第一个特征有三个值:bear, cat, mouse;第二个有两个:black, tabby;第三个有两个:mouse, salmon。我们首先第一步是把(featureID, category) 映射到从0开始的连续整数。

# Data for manual OHE
# Note: the first data point does not include any value for the optional third feature
sampleOne = [(0, 'mouse'), (1, 'black')]
sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]
sampleDataRDD = sc.parallelize([sampleOne, sampleTwo, sampleThree])


# TODO: Replace <FILL IN> with appropriate code
sampleOHEDictManual = {}
sampleOHEDictManual[(0,'bear')] = 0
sampleOHEDictManual[(0,'cat')] = 1
sampleOHEDictManual[(0,'mouse')] = 2
sampleOHEDictManual[(1,'black')] = 3
sampleOHEDictManual[(1,'tabby')] = 4
sampleOHEDictManual[(2,'mouse')] = 5
sampleOHEDictManual[(2,'salmon')] = 6

Sparse vectors

在处理稀疏矩阵的时候,我们常用SparseVector,而不是numpy,因为能减少计算量和存储空间。我们这里就验证SparseVector和numpy的计算结果是一样的。我们看看SparseVector的用法:

pyspark.mllib.linalg.SparseVector(size, *args)[source]

size是指向量的长度,后面的参数一般是两个list,前者是存值的索引,后者是具体的值。比如

a = SparseVector(4, [1, 3], [3.0, 4.0])

a的实际样子是[0, 3.0, 0, 4.0]

import numpy as np
from pyspark.mllib.linalg import SparseVector


# TODO: Replace <FILL IN> with appropriate code
aDense = np.array([0., 3., 0., 4.])
aSparse = SparseVector(4,[1,3],[3,4])

bDense = np.array([0., 0., 0., 1.])
bSparse = SparseVector(4,{3:1})

w = np.array([0.4, 3.1, -1.4, -.5])
print aDense.dot(w)
print aSparse.dot(w)
print bDense.dot(w)
print bSparse.dot(w)

OHE features as sparse vectors

这里是把上面的特征手动转换为SparseVector

# TODO: Replace <FILL IN> with appropriate code
sampleOneOHEFeatManual = SparseVector(7,[2,3],[1,1])
sampleTwoOHEFeatManual = SparseVector(7,[1,4,5],[1,1,1])
sampleThreeOHEFeatManual = SparseVector(7,[0,3,6],[1,1,1])

Define a OHE function

这一步是通过上面我们定义的字典,用代码对数据进行OHE转换

# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        You should ensure that the indices used to create a SparseVector are sorted.

    Args:
        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length numOHEFeats with indicies equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """
    sparseIndex = np.sort(list(OHEDict[i] for i in rawFeats))
    sparseValues = np.ones(len(rawFeats))
    return SparseVector(numOHEFeats,sparseIndex,sparseValues)

# Calculate the number of features in sampleOHEDictManual
numSampleOHEFeats = len(sampleOHEDictManual)

# Run oneHotEnoding on sampleOne
sampleOneOHEFeat = oneHotEncoding(sampleOne,sampleOHEDictManual,numSampleOHEFeats)

print sampleOneOHEFeat

Apply OHE to a dataset

# TODO: Replace <FILL IN> with appropriate code
sampleOHEData = sampleDataRDD.map(lambda x : oneHotEncoding(x,sampleOHEDictManual,numSampleOHEFeats))
print sampleOHEData.collect()

Part 2 Construct an OHE dictionary

Pair RDD of (featureID, category)

我们现在通过代码来构造OHE字典。首先把所以的特征值放到一个RDD里,然后去重。

# TODO: Replace <FILL IN> with appropriate code
sampleDistinctFeats = (sampleDataRDD
                       .flatMap(lambda x: x).distinct())

OHE Dictionary from distinct features

我们现在要构造出OHE字典。主要是通过zipWithIndex和collectAsMap。前者是对RDD里的元素增加索引,后者是把RDD转换为Map。

# TODO: Replace <FILL IN> with appropriate code
sampleOHEDict = (sampleDistinctFeats
                          .zipWithIndex().collectAsMap())
print sampleOHEDict

Automated creation of an OHE dictionary


# TODO: Replace <FILL IN> with appropriate code
def createOneHotDict(inputData):
    """Creates a one-hot-encoder dictionary based on the input data.

    Args:
        inputData (RDD of lists of (int, str)): An RDD of observations where each observation is
            made up of a list of (featureID, value) tuples.

    Returns:
        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
            unique integers.
    """
    inputOHEDict = (inputData.flatMap(lambda x:x).distinct().zipWithIndex().collectAsMap())
    return inputOHEDict
sampleOHEDictAuto = createOneHotDict(sampleDataRDD)
print sampleOHEDictAuto

Part 3 Parse CTR data and generate OHE features

Loading and splitting the data

首先要下载数据,这段代码在LAB1里面也看到过


# Run this code to view Criteo's agreement
from IPython.lib.display import IFrame

IFrame("http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/",
       600, 350)

# TODO: Replace <FILL IN> with appropriate code
# Just replace <FILL IN> with the url for dac_sample.tar.gz
import glob
import os.path
import tarfile
import urllib
import urlparse

# Paste url, url should end with: dac_sample.tar.gz
url = '<FILL IN>'

url = url.strip()
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)
inputDir = os.path.split(fileName)[0]

def extractTar(check = False):
    # Find the zipped archive and extract the dataset
    tars = glob.glob('dac_sample*.tar.gz*')
    if check and len(tars) == 0:
      return False

    if len(tars) > 0:
        try:
            tarFile = tarfile.open(tars[0])
        except tarfile.ReadError:
            if not check:
                print 'Unable to open tar.gz file.  Check your URL.'
            return False

        tarFile.extract('dac_sample.txt', path=inputDir)
        print 'Successfully extracted: dac_sample.txt'
        return True
    else:
        print 'You need to retry the download with the correct url.'
        print ('Alternatively, you can upload the dac_sample.tar.gz file to your Jupyter root ' +
              'directory')
        return False


if os.path.isfile(fileName):
    print 'File is already available. Nothing to do.'
elif extractTar(check = True):
    print 'tar.gz file was already available.'
elif not url.endswith('dac_sample.tar.gz'):
    print 'Check your download url.  Are you downloading the Sample dataset?'
else:
    # Download the file and store it in the same directory as this notebook
    try:
        urllib.urlretrieve(url, os.path.basename(urlparse.urlsplit(url).path))
    except IOError:
        print 'Unable to download and store: {0}'.format(url)

    extractTar()
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

if os.path.isfile(fileName):
    rawData = (sc
               .textFile(fileName, 2)
               .map(lambda x: x.replace('	', ',')))  # work with either ',' or '	' separated data
    print rawData.take(1)

读取完数据后,把数据集分成三份,然后缓存起来。

# TODO: Replace <FILL IN> with appropriate code
weights = [.8, .1, .1]
seed = 42
# Use randomSplit with weights and seed
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights,seed)
# Cache the data
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()

nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()
print nTrain, nVal, nTest, nTrain + nVal + nTest
print rawData.take(1)

Extract features

因为解析出来的特征并没有行列的信息,所以我们把特征处理成(列,值)的样子,其中列是这个特征在第几列,值是指本来的值,然后统计有多少个不同的(列,值)对。

# TODO: Replace <FILL IN> with appropriate code
def parsePoint(point):
    """Converts a comma separated string into a list of (featureID, value) tuples.

    Note:
        featureIDs should start at 0 and increase to the number of features - 1.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.

    Returns:
        list: A list of (featureID, value) tuples.
    """
    featuresList = point.split(',')
    return list((i,featuresList[i+1]) for i in range(len(featuresList)-1))

parsedTrainFeat = rawTrainData.map(parsePoint)

numCategories = (parsedTrainFeat
                 .flatMap(lambda x: x)
                 .distinct()
                 .map(lambda x: (x[0], 1))
                 .reduceByKey(lambda x, y: x + y)
                 .sortByKey()
                 .collect())

print numCategories[2][1]

Create an OHE dictionary from the dataset

我们现在处理成了和part 2里一样了,看看这个字典的大小。

# TODO: Replace <FILL IN> with appropriate code
ctrOHEDict = createOneHotDict(parsedTrainFeat)
numCtrOHEFeats = len(ctrOHEDict.keys())
print numCtrOHEFeats
print ctrOHEDict[(0, '')]

Apply OHE to the dataset

我们在上面的基础上,把特征的Label加进去,就是实现一个parsePoint加强版。

from pyspark.mllib.regression import LabeledPoint
# TODO: Replace <FILL IN> with appropriate code
def parseOHEPoint(point, OHEDict, numOHEFeats):
    """Obtain the label and feature vector for this raw observation.

    Note:
        You must use the function `oneHotEncoding` in this implementation or later portions
        of this lab may not function as expected.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.
        OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The number of unique features in the training dataset.

    Returns:
        LabeledPoint: Contains the label for the observation and the one-hot-encoding of the
            raw features based on the provided OHE dictionary.
    """
    pointList = point.split(',')
    pointLabel = pointList[0]
    pointFeaturesRaw = list((i,pointList[i+1]) for i in range(len(pointList)-1))
    pointFeatures = oneHotEncoding(pointFeaturesRaw,OHEDict,numOHEFeats)
    
    return LabeledPoint(pointLabel,pointFeatures)

OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()
print OHETrainData.take(1)

# Check that oneHotEncoding function was used in parseOHEPoint
backupOneHot = oneHotEncoding
oneHotEncoding = None
withOneHot = False
try: parseOHEPoint(rawTrainData.take(1)[0], ctrOHEDict, numCtrOHEFeats)
except TypeError: withOneHot = True
oneHotEncoding = backupOneHot

Handling unseen features

假如测试集和验证集里面有的特征没有出现在训练集里面,所以我们要更新oneHotEncoding()来对付之前没有出现过的特征值。

# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        If a (featureID, value) tuple doesn't have a corresponding key in OHEDict it should be
        ignored.

    Args:
        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length numOHEFeats with indicies equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """
    crossList = list(OHEDict.get(i,'-1') for i in rawFeats)
    sparseIndex = np.sort([elem for elem in crossList if elem != "-1"])
    sparseValues = np.ones(len(sparseIndex))
    return SparseVector(numOHEFeats,sparseIndex,sparseValues)

OHEValidationData = rawValidationData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHEValidationData.cache()
print OHEValidationData.take(1)

Part 4 CTR prediction and logloss evaluation

Logistic regression

前面把数据处理好了,现在需要训练我们的分类器了,这里用到的是逻辑回归。主要的思路是,用LogisticRegressionWithSGD训练,得到模型LogisticRegressionModel。

from pyspark.mllib.classification import LogisticRegressionWithSGD

# fixed hyperparameters
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True

model0 = LogisticRegressionWithSGD.train(OHETrainData, iterations=numIters, step=stepSize,regParam=regParam, regType=regType, intercept=includeIntercept)
sortedWeights = sorted(model0.weights)
print sortedWeights[:5], model0.intercept

Log loss

# TODO: Replace <FILL IN> with appropriate code
from math import log

def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probabilty and label.

    Note:
        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.

    Args:
        p (float): A probabilty between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.

    Returns:
        float: The log loss value.
    """
    epsilon = 10e-12
    if p == 0 :
        p += epsilon
    if p == 1 :
        p -= epsilon
    if y == 1 :
        return -log(p)
    if y == 0 :
        return -log(1-p)

Baseline log loss

现在我们要用上面写的loss function来计算训练集的Baseline Train Logloss。这里用标签的平均值。

# TODO: Replace <FILL IN> with appropriate code
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower
classOneFracTrain = OHETrainData.map(lambda lp: lp.label).sum()/OHETrainData.count()
print classOneFracTrain

logLossTrBase = OHETrainData.map(lambda lp : computeLogLoss(classOneFracTrain,lp.label)).sum()/OHETrainData.count()
print 'Baseline Train Logloss = {0:.3f}
'.format(logLossTrBase)

Predicted probability

通过上面训练好的模型来计算概率


# TODO: Replace <FILL IN> with appropriate code
from math import exp #  exp(-t) = e^-t

def getP(x, w, intercept):
    """Calculate the probability for an observation given a set of weights and intercept.

    Note:
        We'll bound our raw prediction between 20 and -20 for numerical purposes.

    Args:
        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    rawPrediction = 1 / (1 + exp(-x.dot(w)-intercept))

    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return rawPrediction

trainingPredictions = OHETrainData.map(lambda lp: getP(lp.features,model0.weights,model0.intercept))

print trainingPredictions.take(5)

valuate the model


# TODO: Replace <FILL IN> with appropriate code
def evaluateResults(model, data):
    """Calculates the log loss for the data given the model.

    Args:
        model (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: Log loss for the data.
    """
    dataPrediction = data.map(lambda lp : (getP(lp.features,model.weights,model.intercept),lp.label))
    logLoss = dataPrediction.map(lambda (x,y) : computeLogLoss(x,y)).sum() / dataPrediction.count()
    return logLoss

logLossTrLR0 = evaluateResults(model0, OHETrainData)
print ('OHE Features Train Logloss:
	Baseline = {0:.3f}
	LogReg = {1:.3f}'
       .format(logLossTrBase, logLossTrLR0))

Validation log loss

验证集上又计算一遍。。。


# TODO: Replace <FILL IN> with appropriate code
logLossValBase = OHEValidationData.map(lambda lp : computeLogLoss(classOneFracTrain,lp.label)).sum()/OHEValidationData.count()

logLossValLR0 = evaluateResults(model0, OHEValidationData)
print ('OHE Features Validation Logloss:
	Baseline = {0:.3f}
	LogReg = {1:.3f}'
       .format(logLossValBase, logLossValLR0))

我们在这里通过改变不同的阈值来获得ROC曲线。

Part 5 Reduce feature dimension via feature hashing

上面的例子告诉我们,通过OHE,我们可以获得一个不错的准确率,但是特征的个数太多了,多达233K个。所以我们需要feature hashing。

Hash function


from collections import defaultdict
import hashlib

def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if(printMapping): print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)

# Reminder of the sample values:
# sampleOne = [(0, 'mouse'), (1, 'black')]
# sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

# TODO: Replace <FILL IN> with appropriate code
# Use four buckets
sampOneFourBuckets = hashFunction(4, sampleOne, True)
sampTwoFourBuckets = hashFunction(4, sampleTwo, True)
sampThreeFourBuckets = hashFunction(4, sampleThree, True)

# Use one hundred buckets
sampOneHundredBuckets = hashFunction(100, sampleOne, True)
sampTwoHundredBuckets = hashFunction(100, sampleTwo, True)
sampThreeHundredBuckets = hashFunction(100, sampleThree, True)

print '		 4 Buckets 			 100 Buckets'
print 'SampleOne:	 {0}		 {1}'.format(sampOneFourBuckets, sampOneHundredBuckets)
print 'SampleTwo:	 {0}		 {1}'.format(sampTwoFourBuckets, sampTwoHundredBuckets)
print 'SampleThree:	 {0}	 {1}'.format(sampThreeFourBuckets, sampThreeHundredBuckets)

Creating hashed features

这几步和上面差不多

# TODO: Replace <FILL IN> with appropriate code
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A comma separated string where the first value is the label and the rest are
            features.
        numBuckets: The number of buckets to hash to.

    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
            features.
    """
    pointList = point.split(',')
    pointSize = len(pointList) -1
    pointLabel = pointList[0]
    pointFeatureRaw = list((i,pointList[i+1]) for i in range(pointSize ))
    pointFeature = SparseVector(numBuckets,hashFunction(numBuckets , pointFeatureRaw, True))
    return LabeledPoint(pointLabel,pointFeature)

numBucketsCTR = 2 ** 15
hashTrainData = rawTrainData.map(lambda x:parseHashPoint(x,numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda x:parseHashPoint(x,numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda x:parseHashPoint(x,numBucketsCTR))
hashTestData.cache()

print hashTrainData.take(1)

Sparsity

计算稀疏度。。定义在注释里面有

# TODO: Replace <FILL IN> with appropriate code
def computeSparsity(data, d, n):
    """Calculates the average sparsity for the features in an RDD of LabeledPoints.

    Args:
        data (RDD of LabeledPoint): The LabeledPoints to use in the sparsity calculation.
        d (int): The total number of features.
        n (int): The number of observations in the RDD.

    Returns:
        float: The average of the ratio of features in a point to total features.
    """
    return data.map(lambda x: len(x.features.values)).sum()/float(d*n)

averageSparsityHash = computeSparsity(hashTrainData, numBucketsCTR, nTrain)
averageSparsityOHE = computeSparsity(OHETrainData, numCtrOHEFeats, nTrain)

print 'Average OHE Sparsity: {0:.7e}'.format(averageSparsityOHE)
print 'Average Hash Sparsity: {0:.7e}'.format(averageSparsityHash)

Logistic model with hashed features


numIters = 500
regType = 'l2'
includeIntercept = True

# Initialize variables using values from initial model training
bestModel = None
bestLogLoss = 1e10

# TODO: Replace <FILL IN> with appropriate code
stepSizes = (1,10)
regParams = (1e-6,1e-3)
for stepSize in stepSizes:
    for regParam in regParams:
        model = (LogisticRegressionWithSGD
                 .train(hashTrainData, numIters, stepSize, regParam=regParam, regType=regType,
                        intercept=includeIntercept))
        logLossVa = evaluateResults(model, hashValidationData)
        print ('	stepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
               .format(stepSize, regParam, logLossVa))
        if (logLossVa < bestLogLoss):
            bestModel = model
            bestLogLoss = logLossVa

print ('Hashed Features Validation Logloss:
	Baseline = {0:.3f}
	LogReg = {1:.3f}'
       .format(logLossValBase, bestLogLoss))

Evaluate on the test set

和上面一样。。。

# TODO: Replace <FILL IN> with appropriate code
# Log loss for the best model from (5d)
logLossTest = evaluateResults(bestModel, hashTestData)

# Log loss for the baseline model
logLossTestBaseline = hashTestData.map(lambda lp : computeLogLoss(classOneFracTrain,lp.label )).sum() / hashTestData.count()

print ('Hashed Features Test Log Loss:
	Baseline = {0:.3f}
	LogReg = {1:.3f}'
       .format(logLossTestBaseline, logLossTest))
原文地址:https://www.cnblogs.com/-Sai-/p/6754586.html