dask

DASK

https://github.com/dask/dask

https://dask.org/

DASK提供并行计算和任务调度能力。

集成和很多数据科学工具。

堪称数据科学家的SPARK.

Dask provides advanced parallelism for analytics,

enabling performance at scale for the tools you love

Integrates with existing projects

Built with the broader community

Dask is open source and freely available. It is developed in coordination with other community projects like Numpy, Pandas, and Scikit-Learn.

Numpy

Dask arrays scale Numpy workflows, enabling multi-dimensional data analysis in earth science, satellite imagery, genomics, biomedical applications, and machine learning algorithms.

Pandas

Dask dataframes scale Pandas workflows, enabling applications in time series, business intelligence, and general data munging on big data.

Scikit-Learn

Dask-ML scales machine learning APIs like Scikit-Learn and XGBoost to enable scalable training and prediction on large models and large datasets.

和SPARK差别

https://docs.dask.org/en/latest/spark.html

spark是成熟的并且是包罗万象的。

dask是轻量和容易集成现有代码的。提供的灵活的并行到已有方案。

  • Spark is mature and all-inclusive. If you want a single project that does everything and you’re already on Big Data hardware, then Spark is a safe bet, especially if your use cases are typical ETL + SQL and you’re already using Scala.
  • Dask is lighter weight and is easier to integrate into existing code and hardware. If your problems vary beyond typical ETL + SQL and you want to add flexible parallelism to existing solutions, then Dask may be a good fit, especially if you are already using Python and associated libraries like NumPy and Pandas.

工作流接口

https://docs.dask.org/en/latest/delayed.html

工作流接口不是dask的提供的主要对象,其主要对象为数据结构,

如果数据结构不能被使用,可使用底层提供的接口,定制并行算法。

Sometimes problems don’t fit into one of the collections like dask.array or dask.dataframe.

In these cases, users can parallelize custom algorithms using the simpler dask.delayed interface.

This allows one to create graphs directly with a light annotation of normal python code:

>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()

DEMO

https://github.com/fanqingsong/machine_learning_workflow_on_dask/blob/master/kmeans_with_workflow.py

from csv import reader
from sklearn.cluster import KMeans
import joblib
from dask import delayed
import dask


# Load a CSV file
def load_csv(filename):
    file = open(filename, "rt")
    lines = reader(file)
    dataset = list(lines)
    return dataset

# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        row[column] = float(row[column].strip())

# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup

def getRawIrisData():
    # Load iris dataset
    filename = 'iris.csv'
    dataset = load_csv(filename)
    print('Loaded data file {0} with {1} rows and {2} columns'.format(filename, len(dataset), len(dataset[0])))
    print(dataset[0])
    # convert string columns to float
    for i in range(4):
        str_column_to_float(dataset, i)
    # convert class column to int
    lookup = str_column_to_int(dataset, 4)
    print(dataset[0])
    print(lookup)

    return dataset

@dask.delayed
def getTrainData():
    dataset = getRawIrisData()
    trainData = [ [one[0], one[1], one[2], one[3]] for one in dataset ]

    return trainData

@dask.delayed
def getNumClusters():
    return 3

@dask.delayed
def train(numClusters, trainData):
    print("numClusters=%d" % numClusters)

    model = KMeans(n_clusters=numClusters)

    model.fit(trainData)

    # save model for prediction
    joblib.dump(model, 'model.kmeans')

    return trainData

@dask.delayed
def predict(irisData):
    # test saved prediction
    model = joblib.load('model.kmeans')

    # cluster result
    labels = model.predict(irisData)

    print("cluster result")
    print(labels)


def machine_learning_workflow_pipeline():
    trainData = getTrainData()
    numClusters = getNumClusters()
    trainData = train(numClusters, trainData)
    total = predict(trainData)

    #total.visualize()

    total.compute()



if __name__ == "__main__":
    machine_learning_workflow_pipeline()
原文地址:https://www.cnblogs.com/lightsong/p/13826309.html