`.gitignore`:
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.DS_Store
# gitbook
_book
# node.js
node_modules
# windows
Thumbs.db
# word
~$*.docx
~$*.doc
```
# 1\. Preface
## 1.1\. About
### 1.1.1\. About this note
This is a shared repository for [Learning Apache Spark Notes](https://github.com/runawayhorse001/LearningApacheSpark). The PDF version can be downloaded from [HERE](pyspark.pdf). The first version was posted on Github in [ChenFeng](https://mingchen0919.github.io/learning-apache-spark/index.html) ([[Feng2017]](reference.html#feng2017)). This shared repository mainly contains the self-learning and self-teaching notes from Wenqiang during his [IMA Data Science Fellowship](https://www.ima.umn.edu/2016-2017/SW1.23-3.10.17#). The reader is referred to the repository [https://github.com/runawayhorse001/LearningApacheSpark](https://github.com/runawayhorse001/LearningApacheSpark) for more details about the `dataset` and the `.ipynb` files.
In this repository, I try to use detailed demo code and examples to show how to use each main function. If you find that your work wasn't cited in this note, please feel free to let me know.
Although I am by no means a data mining or Big Data programming expert, I decided that it would be useful to share what I learned about PySpark programming in the form of easy tutorials with detailed examples. I hope these tutorials will be a valuable tool for your studies.
The tutorials assume that the reader has preliminary knowledge of programming and Linux. This document is generated automatically by using [sphinx](http://sphinx.pocoo.org).
### 1.1.2\. About the authors
* **Wenqiang Feng**
* Data Scientist and PhD in Mathematics
* University of Tennessee at Knoxville
* Email: [von198@gmail.com](mailto:von198%40gmail.com)
* **Biography**
Wenqiang Feng is a Data Scientist within DST’s Applied Analytics Group. Dr. Feng’s responsibilities include providing DST clients with access to cutting-edge skills and technologies, including Big Data analytic solutions, advanced analytic and data enhancement techniques and modeling.
Dr. Feng has deep analytic expertise in data mining, analytic systems, machine learning algorithms, business intelligence, and applying Big Data tools to strategically solve industry problems in a cross-functional business. Before joining DST, Dr. Feng was an IMA Data Science Fellow at The Institute for Mathematics and its Applications (IMA) at the University of Minnesota. While there, he helped startup companies make marketing decisions based on deep predictive analytics.
Dr. Feng graduated from the University of Tennessee, Knoxville, with a Ph.D. in Computational Mathematics and a Master’s degree in Statistics. He also holds a Master’s degree in Computational Mathematics from Missouri University of Science and Technology (MST) and a Master’s degree in Applied Mathematics from the University of Science and Technology of China (USTC).
* **Declaration**
The work of Wenqiang Feng was supported by the IMA while he was working at the IMA. However, any opinions, findings, and conclusions or recommendations expressed in this material are those of the author and do not necessarily reflect the views of the IMA, UTK or DST.
## 1.2\. Motivation for this tutorial
I was motivated by the [IMA Data Science Fellowship](https://www.ima.umn.edu/2016-2017/SW1.23-3.10.17#) project to learn PySpark. After that, I was impressed by and attracted to PySpark, and I found that:
> 1. It is no exaggeration to say that Spark is the most powerful Big Data tool.
> 2. However, I still found that learning Spark was a difficult process. I had to Google the questions I ran into and figure out which answers were correct, and it was hard to find detailed examples from which I could easily learn the full workflow in one file.
> 3. Good sources are expensive for a graduate student.
## 1.3\. Copyright notice and license info
This [Learning Apache Spark with Python](pyspark.pdf) PDF file is supposed to be a free and living document, which is why its source is available online at [https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf). But this document is licensed according to both [MIT License](https://github.com/runawayhorse001/LearningApacheSpark/blob/master/LICENSE) and [Creative Commons Attribution-NonCommercial 2.0 Generic (CC BY-NC 2.0) License](https://creativecommons.org/licenses/by-nc/2.0/legalcode).
**When you plan to use, copy, modify, merge, publish, distribute or sublicense this work, please see the terms of those licenses for more details and give the corresponding credit to the author**.
## 1.4\. Acknowledgement
Here I would like to thank Ming Chen, Jian Sun and Zhongbo Li at the University of Tennessee at Knoxville for the valuable discussions, and to thank the generous anonymous authors who provided detailed solutions and source code on the internet. Without that help, this repository would not have been possible. Wenqiang would also like to thank the [Institute for Mathematics and Its Applications (IMA)](https://www.ima.umn.edu/) at the [University of Minnesota, Twin Cities](https://twin-cities.umn.edu/) for its support during his IMA Data Science Fellowship visit.
A special thank you goes to [Dr. Haiping Lu](http://staffwww.dcs.shef.ac.uk/people/H.Lu/), Lecturer in Machine Learning in the Department of Computer Science, University of Sheffield, for recommending and heavily using my tutorial in his classes and for his valuable suggestions.
## 1.5\. Feedback and suggestions
Your comments and suggestions are highly appreciated. I am more than happy to receive corrections, suggestions or feedback through email ([von198@gmail.com](mailto:von198%40gmail.com)) for improvement.
# 11\. Clustering
Chinese proverb
**Sharpening the knife longer can make it easier to hack the firewood** – old Chinese proverb
![https://runawayhorse001.github.io/LearningApacheSpark/_images/clustering_logo.png](img/eacebbc96f1d97c47d903d7981ce1167.jpg)
The above figure was generated by the code from: [Python Data Science Handbook](https://jakevdp.github.io/PythonDataScienceHandbook/06.00-figure-code.html#Expectation-Maximization).
## 11.1\. K-Means Model
### 11.1.1\. Introduction
k-means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. The approach k-means follows to solve the problem is called **Expectation-Maximization**. It can be described as follows:
1. Assign some initial cluster centers
2. Repeat until converged
> * E-step: assign each point to the nearest center
> * M-step: set each cluster center to the mean of its assigned points
Given a set of observations ![(x_1, x_2, \cdots, x_m)](img/290e0b58c66f2b75c67fd1a15e3fe958.jpg), the objective function is
![J = \sum_{i=1}^{m}\sum_{k=1}^{K}w_{ik} ||x_i-c_k||^2](img/0d2c607e00ca608222b80fa6b61e780a.jpg)
where ![w_{ik}=1](img/a769c068095381d9207afe431343c95c.jpg) if ![x_i](img/82e0633e9121ff663a913eb95a3dd723.jpg) is in cluster ![k](img/739c6ec939fd446ba1cde4cf4620512a.jpg); otherwise ![w_{ik}=0](img/7a9506c9bd23ed8b08861cd51eaf5cc3.jpg), and ![c_k](img/cab981b993e03ab12309dd619da9e31d.jpg) is the centroid of the cluster that ![x_i](img/82e0633e9121ff663a913eb95a3dd723.jpg) belongs to.
Mathematically, k-means is a minimization problem with two parts: first, we minimize ![J](img/d6c0dcf5a8894d7495e320405295cc8f.jpg) w.r.t. ![w_{ik}](img/c647aced84d4783e96a244a8af78ddd2.jpg) with ![c_k](img/cab981b993e03ab12309dd619da9e31d.jpg) fixed; then we minimize ![J](img/d6c0dcf5a8894d7495e320405295cc8f.jpg) w.r.t. ![c_k](img/cab981b993e03ab12309dd619da9e31d.jpg) with ![w_{ik}](img/c647aced84d4783e96a244a8af78ddd2.jpg) fixed. That is:
**E-step**:
![\frac{\partial J}{\partial w_{ik}} = \sum_{i=1}^{m}\sum_{k=1}^{K} ||x_i-c_k||^2 \Rightarrow w_{ik} =\left\{ \begin{array}{ll} 1, & \text{ if } k = argmin_{j} ||x_i-c_j||^2 \\ 0, & \text{ otherwise } \end{array} \right.](img/c421a389906a45c77337a6a68fa78a0b.jpg)
**M-step**:
![\frac{\partial J}{\partial c_k} = 2\sum_{i=1}^{m} w_{ik}(x_i-c_k) = 0 \Rightarrow c_k = \frac{\sum_{i=1}^{m}w_{ik}x_i}{\sum_{i=1}^{m}w_{ik}}](img/62e9e7ead57d4e2a4be61668d6aff334.jpg)
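To make the E-step and M-step above concrete, here is a minimal NumPy sketch of this iteration on a toy data matrix (the function name, the random initialization and the convergence tolerance are illustrative choices, not part of the formulas above):
```
import numpy as np

def kmeans_numpy(X, K, max_iter=100, tol=1e-6, seed=1):
    """Plain NumPy k-means: alternate the E-step and M-step described above."""
    rng = np.random.default_rng(seed)
    centers = X[rng.choice(len(X), K, replace=False)]   # initial cluster centers
    for _ in range(max_iter):
        # E-step: assign each point to its nearest center
        dist = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
        labels = dist.argmin(axis=1)
        # M-step: move every (non-empty) cluster center to the mean of its points
        new_centers = np.array([X[labels == k].mean(axis=0) if np.any(labels == k)
                                else centers[k] for k in range(K)])
        if np.linalg.norm(new_centers - centers) < tol:  # converged
            return new_centers, labels
        centers = new_centers
    return centers, labels

# toy example: two well-separated Gaussian blobs
X = np.vstack([np.random.randn(50, 2), np.random.randn(50, 2) + 5])
centers, labels = kmeans_numpy(X, K=2)
```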
### 11.1.2\. Demo
1. Set up spark context and SparkSession
```
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark K-means example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
```
1. Load dataset
```
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("../data/iris.csv", header=True)
```
check the data set
```
df.show(5,True)
df.printSchema()
```
Then you will get
```
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
| 5.1| 3.5| 1.4| 0.2| setosa|
| 4.9| 3.0| 1.4| 0.2| setosa|
| 4.7| 3.2| 1.3| 0.2| setosa|
| 4.6| 3.1| 1.5| 0.2| setosa|
| 5.0| 3.6| 1.4| 0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows
root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- species: string (nullable = true)
```
You can also get the statistical results from the data frame (unfortunately, it only works for numerical columns).
```
df.describe().show()
```
Then you will get
```
+-------+------------------+-------------------+------------------+------------------+---------+
|summary| sepal_length| sepal_width| petal_length| petal_width| species|
+-------+------------------+-------------------+------------------+------------------+---------+
| count| 150| 150| 150| 150| 150|
| mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672| null|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414| null|
| min| 4.3| 2.0| 1.0| 0.1| setosa|
| max| 7.9| 4.4| 6.9| 2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+
```
1. Convert the data to dense vector (**features**)
```
# convert the data to dense vector
from pyspark.ml.linalg import Vectors

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1])]).toDF(['features'])
```
Note
You are strongly encouraged to try my `get_dummy` function for dealing with the categorical data in complex dataset.
Supervised learning version:
> ```
> def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
> 
>     from pyspark.ml import Pipeline
>     from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
>     from pyspark.sql.functions import col
> 
>     indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
>                  for c in categoricalCols ]
> 
>     # default setting: dropLast=True
>     encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
>                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
>                  for indexer in indexers ]
> 
>     assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
>                                 + continuousCols, outputCol="features")
> 
>     pipeline = Pipeline(stages=indexers + encoders + [assembler])
> 
>     model = pipeline.fit(df)
>     data = model.transform(df)
> 
>     data = data.withColumn('label', col(labelCol))
> 
>     return data.select(indexCol, 'features', 'label')
> 
> ```
Unsupervised learning version:
> ```
> def get_dummy(df,indexCol,categoricalCols,continuousCols):
>     '''
>     Get dummy variables and concat with continuous variables for unsupervised learning.
>     :param df: the dataframe
>     :param categoricalCols: the name list of the categorical data
>     :param continuousCols: the name list of the numerical data
>     :return k: feature matrix
> 
>     :author: Wenqiang Feng
>     :email: von198@gmail.com
>     '''
>     from pyspark.ml import Pipeline
>     from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
> 
>     indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
>                  for c in categoricalCols ]
> 
>     # default setting: dropLast=True
>     encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
>                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
>                  for indexer in indexers ]
> 
>     assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
>                                 + continuousCols, outputCol="features")
> 
>     pipeline = Pipeline(stages=indexers + encoders + [assembler])
> 
>     model = pipeline.fit(df)
>     data = model.transform(df)
> 
>     return data.select(indexCol, 'features')
> 
> ```
1. Transform the dataset to DataFrame
```
transformed= transData(df)
transformed.show(5, False)
```
```
+-----------------+
|features |
+-----------------+
|[5.1,3.5,1.4,0.2]|
|[4.9,3.0,1.4,0.2]|
|[4.7,3.2,1.3,0.2]|
|[4.6,3.1,1.5,0.2]|
|[5.0,3.6,1.4,0.2]|
+-----------------+
only showing top 5 rows
```
1. Deal With Categorical Variables
```
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", \
outputCol="indexedFeatures",\
maxCategories=4).fit(transformed)
data = featureIndexer.transform(transformed)
```
Now you can check your dataset with
```
data.show(5,True)
```
you will get
```
+-----------------+-----------------+
| features| indexedFeatures|
+-----------------+-----------------+
|[5.1,3.5,1.4,0.2]|[5.1,3.5,1.4,0.2]|
|[4.9,3.0,1.4,0.2]|[4.9,3.0,1.4,0.2]|
|[4.7,3.2,1.3,0.2]|[4.7,3.2,1.3,0.2]|
|[4.6,3.1,1.5,0.2]|[4.6,3.1,1.5,0.2]|
|[5.0,3.6,1.4,0.2]|[5.0,3.6,1.4,0.2]|
+-----------------+-----------------+
only showing top 5 rows
```
Note
Since clustering algorithms, including k-means, use distance-based measurements to determine the similarity between data points, it's strongly recommended to standardize the data to have a mean of zero and a standard deviation of one.
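A minimal sketch of such standardization with `pyspark.ml.feature.StandardScaler` is given below; it assumes the dense `features` column built by `transData` above, and the variable name `scaledData` is the one the silhouette analysis further down expects:
```
from pyspark.ml.feature import StandardScaler

# scale the assembled feature vectors to zero mean and unit variance
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withMean=True, withStd=True)
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)
scaledData.show(5, False)
```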
1. Elbow method to determine the optimal number of clusters for k-means clustering
```
import numpy as np
from pyspark.ml.clustering import KMeans

cost = np.zeros(20)
for k in range(2, 20):
    kmeans = KMeans()\
        .setK(k)\
        .setSeed(1) \
        .setFeaturesCol("indexedFeatures")\
        .setPredictionCol("cluster")
    model = kmeans.fit(data)
    cost[k] = model.computeCost(data) # requires Spark 2.0 or later
```
```
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()
```
![https://runawayhorse001.github.io/LearningApacheSpark/_images/elbow.png](img/92df4afaf5010b135936512a39fb87d8.jpg)
In my opinion, it is sometimes hard to choose the optimal number of clusters with the `elbow method`. As shown in the following figure, you could choose 3, 5 or even 8\. I will choose `3` in this demo.
![https://runawayhorse001.github.io/LearningApacheSpark/_images/elbow_rfm.png](img/d4b213f9046b3ed8b898fac4d4aeec34.jpg)
* Silhouette analysis
```
import time
import numpy as np
import pandas as pd

# PySpark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce # For Python 3.x
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

def optimal_k(df_in, index_col, k_min, k_max, num_runs):
    '''
    Determine the optimal number of clusters by using Silhouette Score Analysis.
    :param df_in: the input dataframe
    :param index_col: the name of the index column
    :param k_min: the minimum number of clusters
    :param k_max: the maximum number of clusters
    :param num_runs: the number of runs for each fixed number of clusters
    :return k: optimal number of clusters
    :return silh_lst: Silhouette score
    :return r_table: the running results table

    :author: Wenqiang Feng
    :email: von198@gmail.com
    '''
    start = time.time()
    silh_lst = []
    k_lst = np.arange(k_min, k_max+1)

    r_table = df_in.select(index_col).toPandas()
    r_table = r_table.set_index(index_col)
    centers = pd.DataFrame()

    for k in k_lst:
        silh_val = []
        for run in np.arange(1, num_runs+1):
            # Trains a k-means model.
            kmeans = KMeans()\
                .setK(k)\
                .setSeed(int(np.random.randint(100, size=1)))
            model = kmeans.fit(df_in)
            # Make predictions
            predictions = model.transform(df_in)
            r_table['cluster_{k}_{run}'.format(k=k, run=run)] = predictions.select('prediction').toPandas()
            # Evaluate clustering by computing the Silhouette score
            evaluator = ClusteringEvaluator()
            silhouette = evaluator.evaluate(predictions)
            silh_val.append(silhouette)

        silh_array = np.asanyarray(silh_val)
        silh_lst.append(silh_array.mean())

    elapsed = time.time() - start

    silhouette = pd.DataFrame(list(zip(k_lst, silh_lst)), columns=['k', 'silhouette'])

    print('+------------------------------------------------------------+')
    print("|         The finding optimal k phase took %8.0f s.         |" % (elapsed))
    print('+------------------------------------------------------------+')

    return k_lst[np.argmax(silh_lst, axis=0)], silhouette, r_table
```
```
k, silh_lst, r_table = optimal_k(scaledData,index_col,k_min, k_max,num_runs)
+------------------------------------------------------------+
| The finding optimal k phase took 1783 s. |
+------------------------------------------------------------+
```
```
spark.createDataFrame(silh_lst).show()
+---+------------------+
| k| silhouette|
+---+------------------+
| 3|0.8045154385557953|
| 4|0.6993528775512052|
| 5|0.6689286654221447|
| 6|0.6356184024841809|
| 7|0.7174102265711756|
| 8|0.6720861758298997|
| 9| 0.601771359881241|
| 10|0.6292447334578428|
+---+------------------+
```
From the silhouette list, we can choose `3` as the optimal number of clusters.
Warning
`ClusteringEvaluator` in `pyspark.ml.evaluation` requires Spark 2.4 or later!!
1. Pipeline Architecture
```
from pyspark.ml.clustering import KMeans, KMeansModel
kmeans = KMeans() \
.setK(3) \
.setFeaturesCol("indexedFeatures")\
.setPredictionCol("cluster")
# Chain featureIndexer and kmeans in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, kmeans])
model = pipeline.fit(transformed)
cluster = model.transform(transformed)
```
1. k-means clusters
```
cluster = model.transform(transformed)
cluster.show()
```
```
+-----------------+-----------------+-------+
| features| indexedFeatures|cluster|
+-----------------+-----------------+-------+
|[5.1,3.5,1.4,0.2]|[5.1,3.5,1.4,0.2]| 1|
|[4.9,3.0,1.4,0.2]|[4.9,3.0,1.4,0.2]| 1|
|[4.7,3.2,1.3,0.2]|[4.7,3.2,1.3,0.2]| 1|
|[4.6,3.1,1.5,0.2]|[4.6,3.1,1.5,0.2]| 1|
|[5.0,3.6,1.4,0.2]|[5.0,3.6,1.4,0.2]| 1|
|[5.4,3.9,1.7,0.4]|[5.4,3.9,1.7,0.4]| 1|
|[4.6,3.4,1.4,0.3]|[4.6,3.4,1.4,0.3]| 1|
|[5.0,3.4,1.5,0.2]|[5.0,3.4,1.5,0.2]| 1|
|[4.4,2.9,1.4,0.2]|[4.4,2.9,1.4,0.2]| 1|
|[4.9,3.1,1.5,0.1]|[4.9,3.1,1.5,0.1]| 1|
|[5.4,3.7,1.5,0.2]|[5.4,3.7,1.5,0.2]| 1|
|[4.8,3.4,1.6,0.2]|[4.8,3.4,1.6,0.2]| 1|
|[4.8,3.0,1.4,0.1]|[4.8,3.0,1.4,0.1]| 1|
|[4.3,3.0,1.1,0.1]|[4.3,3.0,1.1,0.1]| 1|
|[5.8,4.0,1.2,0.2]|[5.8,4.0,1.2,0.2]| 1|
|[5.7,4.4,1.5,0.4]|[5.7,4.4,1.5,0.4]| 1|
|[5.4,3.9,1.3,0.4]|[5.4,3.9,1.3,0.4]| 1|
|[5.1,3.5,1.4,0.3]|[5.1,3.5,1.4,0.3]| 1|
|[5.7,3.8,1.7,0.3]|[5.7,3.8,1.7,0.3]| 1|
|[5.1,3.8,1.5,0.3]|[5.1,3.8,1.5,0.3]| 1|
+-----------------+-----------------+-------+
only showing top 20 rows
```
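If you also want to inspect the fitted centroids, the `KMeansModel` is the last stage of the pipeline model fitted above; a brief sketch:
```
# the KMeansModel is the last stage of the fitted PipelineModel
kmeans_model = model.stages[-1]
for center in kmeans_model.clusterCenters():
    print(center)
```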
# 14\. Social Network Analysis
Chinese proverb
**A Touch of Cloth, linked in countless ways.** – old Chinese proverb
![https://runawayhorse001.github.io/LearningApacheSpark/_images/net_work.png](img/cb0b50e4410efd78416163f37eaf1262.jpg)
## 14.1\. Introduction
A video introduction is available at [https://www.youtube.com/embed/xT3EpF2EsbQ](https://www.youtube.com/embed/xT3EpF2EsbQ).
## 14.2\. Co-occurrence Network
[Co-occurrence networks](https://en.wikipedia.org/wiki/Co-occurrence_networks) are generally used to provide a graphic visualization of potential relationships between people, organizations, concepts or other entities represented within written material. The generation and visualization of co-occurrence networks has become practical with the advent of electronically stored text amenable to text mining.
### 14.2.1\. Methodology
* Build Corpus C
* Build Document-Term matrix D based on Corpus C
* Compute Term-Document matrix ![D^T](img/315be0f70cd0effa6c8682f2a949a46c.jpg)
* Adjacency Matrix ![A =D^T\cdot D](img/e97f8315ce721d1417bc7bb3b4a9d332.jpg)
There are four main components in this algorithm: Corpus C, Document-Term matrix D, Term-Document matrix ![D^T](img/315be0f70cd0effa6c8682f2a949a46c.jpg) and Adjacency Matrix A. In this demo, I will show how to build those four main components.
Given that we have three groups of friends, they are
> ```
> +-------------------------------------+
> |words |
> +-------------------------------------+
> |[[george] [jimmy] [john] [peter]] |
> |[[vincent] [george] [stefan] [james]]|
> |[[emma] [james] [olivia] [george]] |
> +-------------------------------------+
>
> ```
1. Corpus C
Then we can build the following corpus based on the unique elements in the given group data:
> ```
> [u'george', u'james', u'jimmy', u'peter', u'stefan', u'vincent', u'olivia', u'john', u'emma']
>
> ```
The corresponding element frequencies:
> ![https://runawayhorse001.github.io/LearningApacheSpark/_images/demo_freq.png](img/cdcdbf84e640274f429780824ccf99ae.jpg)
1. Document-Term matrix D based on Corpus C (CountVectorizer)
> ```
> from pyspark.ml.feature import CountVectorizer
> count_vectorizer_wo = CountVectorizer(inputCol='term', outputCol='features')
> # with total unique vocabulary
> countVectorizer_mod_wo = count_vectorizer_wo.fit(df)
> countVectorizer_twitter_wo = countVectorizer_mod_wo.transform(df)
> # with truncated unique vocabulary (99%)
> count_vectorizer = CountVectorizer(vocabSize=48,inputCol='term',outputCol='features')
> countVectorizer_mod = count_vectorizer.fit(df)
> countVectorizer_twitter = countVectorizer_mod.transform(df)
>
> ```
>
> ```
> +-------------------------------+
> |features |
> +-------------------------------+
> |(9,[0,2,3,7],[1.0,1.0,1.0,1.0])|
> |(9,[0,1,4,5],[1.0,1.0,1.0,1.0])|
> |(9,[0,1,6,8],[1.0,1.0,1.0,1.0])|
> +-------------------------------+
>
> ```
1. Term-Document matrix ![D^T](img/315be0f70cd0effa6c8682f2a949a46c.jpg)
> RDD:
>
> ```
> [array([ 1., 1., 1.]), array([ 0., 1., 1.]), array([ 1., 0., 0.]),
> array([ 1., 0., 0.]), array([ 0., 1., 0.]), array([ 0., 1., 0.]),
> array([ 0., 0., 1.]), array([ 1., 0., 0.]), array([ 0., 0., 1.])]
>
> ```
>
> Matrix:
>
> ```
> array([[ 1., 1., 1.],
> [ 0., 1., 1.],
> [ 1., 0., 0.],
> [ 1., 0., 0.],
> [ 0., 1., 0.],
> [ 0., 1., 0.],
> [ 0., 0., 1.],
> [ 1., 0., 0.],
> [ 0., 0., 1.]])
>
> ```
1. Adjacency Matrix ![A =D^T\cdot D](img/e97f8315ce721d1417bc7bb3b4a9d332.jpg)
> RDD:
>
> ```
> [array([ 1., 1., 1.]), array([ 0., 1., 1.]), array([ 1., 0., 0.]),
> array([ 1., 0., 0.]), array([ 0., 1., 0.]), array([ 0., 1., 0.]),
> array([ 0., 0., 1.]), array([ 1., 0., 0.]), array([ 0., 0., 1.])]
>
> ```
>
> Matrix:
>
> ```
> array([[ 3., 2., 1., 1., 1., 1., 1., 1., 1.],
> [ 2., 2., 0., 0., 1., 1., 1., 0., 1.],
> [ 1., 0., 1., 1., 0., 0., 0., 1., 0.],
> [ 1., 0., 1., 1., 0., 0., 0., 1., 0.],
> [ 1., 1., 0., 0., 1., 1., 0., 0., 0.],
> [ 1., 1., 0., 0., 1., 1., 0., 0., 0.],
> [ 1., 1., 0., 0., 0., 0., 1., 0., 1.],
> [ 1., 0., 1., 1., 0., 0., 0., 1., 0.],
> [ 1., 1., 0., 0., 0., 0., 1., 0., 1.]])
>
> ```
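For this small demo, one way to reproduce the Term-Document and Adjacency matrices shown above from the `CountVectorizer` output is to collect the sparse feature vectors into NumPy on the driver (only reasonable here because the example is tiny; the variable names below are illustrative):
```
import numpy as np

# Document-Term matrix D: one dense row per document (group of friends)
D = np.array([row.features.toArray()
              for row in countVectorizer_twitter_wo.select('features').collect()])
Dt = D.T            # Term-Document matrix D^T
A = Dt.dot(D)       # Adjacency matrix A = D^T . D (term co-occurrence counts)
print(A)
```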
### 14.2.2\. Coding Puzzle from my interview
* Problem
The attached UTF-8 encoded text file contains the tags associated with online biomedical scientific articles, formatted as follows (size: 100000). Each scientific article is represented by a line in the file, delimited by a carriage return.
> ```
> +--------------------+
> | words|
> +--------------------+
> |[ACTH Syndrome, E...|
> |[Antibody Formati...|
> |[Adaptation, Phys...|
> |[Aerosol Propella...|
> +--------------------+
> only showing top 4 rows
>
> ```
Write a program that, using this file as input, produces a list of pairs of tags which appear TOGETHER in any order and position in at least fifty different scientific articles. For example, in the above sample, [Female] and [Humans] appear together twice, but every other pair appears only once. Your program should output the pair list to stdout in the same form as the input (e.g. tag 1, tag 2).
* My solution
> The corresponding word frequencies:
>
> > ![https://runawayhorse001.github.io/LearningApacheSpark/_images/freq_word_ze.png](img/f5832d90e75d18f501ede7acb0b6ce74.jpg)
> >
> > Word frequency
>
> Output:
>
> ```
> +----------+------+-------+
> | term.x|term.y| freq|
> +----------+------+-------+
> | Female|Humans|16741.0|
> | Male|Humans|13883.0|
> | Adult|Humans|10391.0|
> | Male|Female| 9806.0|
> |MiddleAged|Humans| 8181.0|
> | Adult|Female| 7411.0|
> | Adult| Male| 7240.0|
> |MiddleAged| Male| 6328.0|
> |MiddleAged|Female| 6002.0|
> |MiddleAged| Adult| 5944.0|
> +----------+------+-------+
> only showing top 10 rows
>
> ```
The corresponding Co-occurrence network:
> ![https://runawayhorse001.github.io/LearningApacheSpark/_images/netfreq.png](img/8c3fdcf6adcc472c7cd7a4598f96caac.jpg)
>
> Co-occurrence network
Then you will get Figure [Co-occurrence network](#fig-netfreq)
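The output above only shows the final pair counts; a minimal sketch of one possible pair-counting approach in PySpark is given below. It assumes the article tags sit in an array column named `words` as in the sample, uses underscore column names (`term_x`, `term_y`) instead of the dotted ones shown above to keep Spark column references simple, and takes the threshold `50` from the problem statement:
```
from itertools import combinations
import pyspark.sql.functions as F

# one row per article; emit every unordered tag pair exactly once per article
pairs = df.rdd.flatMap(
    lambda row: [((a, b), 1) for a, b in combinations(sorted(set(row['words'])), 2)])
pair_counts = pairs.reduceByKey(lambda x, y: x + y)

# keep pairs that co-occur in at least 50 articles and show the most frequent ones
result = (pair_counts.filter(lambda kv: kv[1] >= 50)
          .map(lambda kv: (kv[0][0], kv[0][1], float(kv[1])))
          .toDF(['term_x', 'term_y', 'freq']))
result.sort(F.col('freq'), ascending=False).show(10)
```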
## 14.3\. Appendix: matrix multiplication in PySpark
1. load test matrix
```
df = spark.read.csv("matrix1.txt",sep=",",inferSchema=True)
df.show()
```
```
+---+---+---+---+
|_c0|_c1|_c2|_c3|
+---+---+---+---+
|1.2|3.4|2.3|1.1|
|2.3|1.1|1.5|2.2|
|3.3|1.8|4.5|3.3|
|5.3|2.2|4.5|4.4|
|9.3|8.1|0.3|5.5|
|4.5|4.3|2.1|6.6|
+---+---+---+---+
```
1. main function for matrix multiplication in PySpark
```
from pyspark.sql import functions as F
from functools import reduce

# reference: https://stackoverflow.com/questions/44348527/matrix-multiplication-at-a-in-pyspark
# do the sum of the multiplication that we want, and get
# one data frame for each column
colDFs = []
for c2 in df.columns:
    colDFs.append( df.select( [ F.sum(df[c1]*df[c2]).alias("op_{0}".format(i)) for i,c1 in enumerate(df.columns) ] ) )

# now union those separate data frames to build the "matrix"
mtxDF = reduce(lambda a,b: a.select(a.columns).union(b.select(a.columns)), colDFs )
mtxDF.show()
```
```
+------------------+------------------+------------------+------------------+
| op_0| op_1| op_2| op_3|
+------------------+------------------+------------------+------------------+
| 152.45|118.88999999999999| 57.15|121.44000000000001|
|118.88999999999999|104.94999999999999| 38.93| 94.71|
| 57.15| 38.93|52.540000000000006| 55.99|
|121.44000000000001| 94.71| 55.99|110.10999999999999|
+------------------+------------------+------------------+------------------+
```
1. Validation with python version
```
import numpy as np
a = np.genfromtxt("matrix1.txt",delimiter=",")
np.dot(a.T, a)
```
```
array([[152.45, 118.89, 57.15, 121.44],
[118.89, 104.95, 38.93, 94.71],
[ 57.15, 38.93, 52.54, 55.99],
[121.44, 94.71, 55.99, 110.11]])
```
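For larger matrices where collecting to the driver is not an option, the same product can be computed distributedly: `pyspark.mllib.linalg.distributed.RowMatrix` has a `computeGramianMatrix()` method that returns exactly A^T·A. A brief sketch reusing the `df` loaded above:
```
from pyspark.mllib.linalg.distributed import RowMatrix

# build a distributed row matrix from the numeric DataFrame rows
mat = RowMatrix(df.rdd.map(list))
gram = mat.computeGramianMatrix()   # computes A^T * A without collecting A
print(gram.toArray())
```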
## 14.4\. Correlation Network
TODO ..
# 15\. ALS: Stock Portfolio Recommendations
Chinese proverb
**Don’t put all your eggs in one basket.**
![https://runawayhorse001.github.io/LearningApacheSpark/_images/stock_portfolio.png](img/b4a297ef2185e28694b366bde4069858.jpg)
Code for the above figure:
```
import numpy as np
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(10, 8), subplot_kw=dict(aspect="equal"))
recipe = ["375 k U.S. Large Cap Blend",
"300 k U.S. Large Cap Value",
"75 k U.S. Short-Term Bonds",
"50 k U.S. Small Cap Blend",
"55 k U.S. Small Cap Value",
"95 k U.S. Real Estate",
"250 k Intermediate-Term Bonds"]
data = [float(x.split()[0]) for x in recipe]
ingredients = [' '.join(x.split()[2:]) for x in recipe]
print(data)
print(ingredients)
def func(pct, allvals):
    absolute = int(pct/100.*np.sum(allvals))
    return "{:.1f}%\n({:d} k)".format(pct, absolute)

explode = np.empty(len(data))  # e.g. (0.1, 0.1, 0.1, 0.1, 0.1, 0.1): explode each slice
explode.fill(0.1)
wedges, texts, autotexts = ax.pie(data, explode=explode, autopct=lambda pct: func(pct, data),
textprops=dict(color="w"))
ax.legend(wedges, ingredients,
#title="Stock portfolio",
loc="center left",
bbox_to_anchor=(1, 0, 0.5, 1))
plt.setp(autotexts, size=8, weight="bold")
#ax.set_title("Stock portfolio")
plt.show()
```
## 15.1\. Recommender systems
Recommender systems or recommendation systems (sometimes replacing “system” with a synonym such as platform or engine) are a subclass of information filtering systems that seek to predict the “rating” or “preference” that a user would give to an item.
The main idea is to build a user × item matrix `R` of rating values and try to factorize it, in order to recommend items highly rated by similar users. A popular matrix factorization approach for this is Alternating Least Squares (ALS).
## 15.2\. Alternating Least Squares
Apache Spark ML implements ALS for collaborative filtering, a very popular algorithm for making recommendations.
The ALS recommender is a matrix factorization algorithm that uses Alternating Least Squares with Weighted-Lambda-Regularization (ALS-WR). It factors the user-to-item matrix `A` into the user-to-feature matrix `U` and the item-to-feature matrix `M`, and it runs the ALS algorithm in a parallel fashion. The ALS algorithm uncovers the latent factors that explain the observed user-to-item ratings and tries to find optimal factor weights that minimize the least squares error between predicted and actual ratings.
[https://www.elenacuoco.com/2016/12/22/alternating-least-squares-als-spark-ml/](https://www.elenacuoco.com/2016/12/22/alternating-least-squares-als-spark-ml/)
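For reference, a common way to write the ALS-WR objective (one standard formulation, not taken from the link above) is
```
\min_{U,M} \sum_{(u,i)\in \mathcal{K}} \left( r_{ui} - u_u^{T} m_i \right)^2
          + \lambda \left( \sum_{u} n_u \|u_u\|^2 + \sum_{i} n_i \|m_i\|^2 \right)
```
where \mathcal{K} is the set of observed ratings, u_u and m_i are the user and item factor vectors, and n_u and n_i count the ratings of user u and item i. ALS alternates between solving for U with M fixed and for M with U fixed; each half-step is an ordinary regularized least-squares problem, which is why the updates parallelize well.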
## 15.3\. Demo
* The Jupyter notebook can be downloaded from [ALS Recommender systems](_static/ALS.ipynb).
* The data can be downloaded from [Online Retail](_static/OnlineRetail.csv).
### 15.3.1\. Load and clean data
1. Set up spark context and SparkSession
```
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark RFM example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
```
1. Load dataset
```
df_raw = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load("Online Retail.csv", header=True)
```
check the data set
```
df_raw.show(5)
df_raw.printSchema()
```
Then you will get
```
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/10 8:26| 2.55| 17850|United Kingdom|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/10 8:26| 2.75| 17850|United Kingdom|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 5 rows
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: integer (nullable = true)
|-- Country: string (nullable = true)
```
1. Data clean and data manipulation
* check and remove the `null` values
```
from pyspark.sql.functions import count
def my_count(df_in):
    df_in.agg( *[ count(c).alias(c) for c in df_in.columns ] ).show()
```
```
import pyspark.sql.functions as F
from pyspark.sql.functions import round
df_raw = df_raw.withColumn('Asset',round( F.col('Quantity') * F.col('UnitPrice'), 2 ))
df = df_raw.withColumnRenamed('StockCode', 'Cusip')\
.select('CustomerID','Cusip','Quantity','UnitPrice','Asset')
```
```
my_count(df)
```
```
+----------+------+--------+---------+------+
|CustomerID| Cusip|Quantity|UnitPrice| Asset|
+----------+------+--------+---------+------+
| 406829|541909| 541909| 541909|541909|
+----------+------+--------+---------+------+
```
Since the counts are not all the same, we have some null values in the `CustomerID` column. We can drop those records from the dataset.
```
df = df.filter(F.col('Asset')>=0)
df = df.dropna(how='any')
my_count(df)
```
```
+----------+------+--------+---------+------+
|CustomerID| Cusip|Quantity|UnitPrice| Asset|
+----------+------+--------+---------+------+
| 397924|397924| 397924| 397924|397924|
+----------+------+--------+---------+------+
```
```
df.show(3)
+----------+------+--------+---------+-----+
|CustomerID| Cusip|Quantity|UnitPrice|Asset|
+----------+------+--------+---------+-----+
| 17850|85123A| 6| 2.55| 15.3|
| 17850| 71053| 6| 3.39|20.34|
| 17850|84406B| 8| 2.75| 22.0|
+----------+------+--------+---------+-----+
only showing top 3 rows
```
* Convert `Cusip` to a consistent format
```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

def toUpper(s):
    return s.upper()

upper_udf = udf(lambda x: toUpper(x), StringType())
# apply it with, e.g.: df = df.withColumn('Cusip', upper_udf(F.col('Cusip')))
```
* Find the top `n` stocks
```
pop = df.groupBy('Cusip')\
.agg(F.count('CustomerID').alias('Customers'),F.round(F.sum('Asset'),2).alias('TotalAsset'))\
.sort([F.col('Customers'),F.col('TotalAsset')],ascending=[0,0])
pop.show(5)
```
```
+------+---------+----------+
| Cusip|Customers|TotalAsset|
+------+---------+----------+
|85123A| 2035| 100603.5|
| 22423| 1724| 142592.95|
|85099B| 1618| 85220.78|
| 84879| 1408| 56580.34|
| 47566| 1397| 68844.33|
+------+---------+----------+
only showing top 5 rows
```
### 15.3.2\. Build feature matrix
* Fetch the top `n` cusip list
```
import pandas as pd

top = 10
cusip_lst = pd.DataFrame(pop.select('Cusip').head(top)).astype('str').iloc[:, 0].tolist()
cusip_lst.insert(0, 'CustomerID')
```
* Create the portfolio table for each customer
```
pivot_tab = df.groupBy('CustomerID').pivot('Cusip').sum('Asset')
pivot_tab = pivot_tab.fillna(0)
```
* Fetch the top `n` stocks' portfolio table for each customer
```
selected_tab = pivot_tab.select(cusip_lst)
selected_tab.show(4)
```
```
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
|CustomerID|85123A|22423|85099B|84879|47566|20725|22720|20727|POST|23203|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
| 16503| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 33.0| 0.0| 0.0|
| 15727| 123.9| 25.5| 0.0| 0.0| 0.0| 33.0| 99.0| 0.0| 0.0| 0.0|
| 14570| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 14450| 0.0| 0.0| 8.32| 0.0| 0.0| 0.0| 49.5| 0.0| 0.0| 0.0|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
only showing top 4 rows
```
* Build the `rating` matrix
```
from pandas.core.common import flatten  # one way to get a flatten helper for the nested list below

def elemwiseDiv(df_in):
    num = len(df_in.columns)
    temp = df_in.rdd.map(lambda x: list(flatten([x[0], [x[i]/float(sum(x[1:]))
                                                  if sum(x[1:]) > 0 else x[i]
                                                  for i in range(1, num)]])))
    return spark.createDataFrame(temp, df_in.columns)

ratings = elemwiseDiv(selected_tab)
```
```
ratings.show(4)
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
|CustomerID|85123A|22423|85099B|84879|47566|20725|22720|20727|POST|23203|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
| 16503| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|
| 15727| 0.44| 0.09| 0.0| 0.0| 0.0| 0.12| 0.35| 0.0| 0.0| 0.0|
| 14570| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 14450| 0.0| 0.0| 0.14| 0.0| 0.0| 0.0| 0.86| 0.0| 0.0| 0.0|
+----------+------+-----+------+-----+-----+-----+-----+-----+----+-----+
```
* Convert `rating` matrix to long table
```
from pyspark.sql.functions import array, col, explode, struct, lit
def to_long(df, by):
    """
    reference: https://stackoverflow.com/questions/37864222/transpose-column-to-row-with-spark
    """
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("Cusip"), col(c).alias("rating")) for c in cols
    ])).alias("kvs")
    # keep the id columns and unpack the exploded structs into regular columns
    return df.select(by + [kvs]).select(by + ["kvs.Cusip", "kvs.rating"])
```
```
df_all = to_long(ratings,['CustomerID'])
df_all.show(5)
```
```
+----------+------+------+
|CustomerID| Cusip|rating|
+----------+------+------+
| 16503|85123A| 0.0|
| 16503| 22423| 0.0|
| 16503|85099B| 0.0|
| 16503| 84879| 0.0|
| 16503| 47566| 0.0|
+----------+------+------+
only showing top 5 rows
```
* Convert the string `Cusip` to numerical index
```
from pyspark.ml.feature import StringIndexer
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='Cusip',
outputCol='indexedCusip').fit(df_all)
df_all = labelIndexer.transform(df_all)
df_all.show(5, True)
df_all.printSchema()
```
```
+----------+------+------+------------+
|CustomerID| Cusip|rating|indexedCusip|
+----------+------+------+------------+
| 16503|85123A| 0.0| 6.0|
| 16503| 22423| 0.0| 9.0|
| 16503|85099B| 0.0| 5.0|
| 16503| 84879| 0.0| 1.0|
| 16503| 47566| 0.0| 0.0|
+----------+------+------+------------+
only showing top 5 rows
root
|-- CustomerID: long (nullable = true)
|-- Cusip: string (nullable = false)
|-- rating: double (nullable = true)
|-- indexedCusip: double (nullable = true)
```
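If you later need to map `indexedCusip` back to the original ticker string (for example, when presenting recommendations), `IndexToString` with the fitted indexer's labels performs the reverse transformation; a small sketch (the output column name is an arbitrary choice):
```
from pyspark.ml.feature import IndexToString

converter = IndexToString(inputCol='indexedCusip', outputCol='Cusip_orig',
                          labels=labelIndexer.labels)
converter.transform(df_all).select('indexedCusip', 'Cusip_orig').show(5)
```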
### 15.3.3\. Train model
* build `train` and `test` dataset
```
train, test = df_all.randomSplit([0.8,0.2])
train.show(5)
test.show(5)
```
```
+----------+-----+------------+-------------------+
|CustomerID|Cusip|indexedCusip| rating|
+----------+-----+------------+-------------------+
| 12940|20725| 2.0| 0.0|
| 12940|20727| 4.0| 0.0|
| 12940|22423| 9.0|0.49990198000392083|
| 12940|22720| 3.0| 0.0|
| 12940|23203| 7.0| 0.0|
+----------+-----+------------+-------------------+
only showing top 5 rows
+----------+-----+------------+------------------+
|CustomerID|Cusip|indexedCusip| rating|
+----------+-----+------------+------------------+
| 12940|84879| 1.0|0.1325230346990786|
| 13285|20725| 2.0|0.2054154995331466|
| 13285|20727| 4.0|0.2054154995331466|
| 13285|47566| 0.0| 0.0|
| 13623|23203| 7.0| 0.0|
+----------+-----+------------+------------------+
only showing top 5 rows
```
* train model
```
import itertools
from math import sqrt
from operator import add
import sys
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

def computeRmse(model, data):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.transform(data)
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse))
    return rmse

# train models and evaluate them on the validation set
ranks = [4, 5]
lambdas = [0.05]
numIters = [30]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

val = test.na.drop()
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    als = ALS(rank=rank, maxIter=numIter, regParam=lmbda, numUserBlocks=10, numItemBlocks=10,
              implicitPrefs=False, alpha=1.0,
              userCol="CustomerID", itemCol="indexedCusip", seed=1, ratingCol="rating", nonnegative=True)
    model = als.fit(train)

    validationRmse = computeRmse(model, val)
    print("RMSE (validation) = %f for the model trained with " % validationRmse +
          "rank = %d, lambda = %.2f, and numIter = %d." % (rank, lmbda, numIter))
    if validationRmse < bestValidationRmse:
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

model = bestModel
```
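The manual grid search above can also be expressed with Spark's built-in tuning utilities; a minimal sketch with `ParamGridBuilder` and `CrossValidator`, reusing the `evaluator` defined above (the fold count is an arbitrary illustrative choice, and `coldStartStrategy="drop"` needs Spark 2.2+):
```
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

als = ALS(userCol="CustomerID", itemCol="indexedCusip", ratingCol="rating",
          nonnegative=True, seed=1,
          coldStartStrategy="drop")   # drop NaN predictions for unseen users/items during evaluation
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [4, 5]) \
    .addGrid(als.regParam, [0.05]) \
    .addGrid(als.maxIter, [30]) \
    .build()
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=3)
cvModel = cv.fit(train)
model = cvModel.bestModel
```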
### 15.3.4\. Make prediction
* make prediction
```
topredict=test[test['rating']==0]
predictions=model.transform(topredict)
predictions.filter(predictions.prediction>0)\
.sort([F.col('CustomerID'),F.col('Cusip')],ascending=[0,0]).show(5)
```
```
+----------+------+------------+------+------------+
|CustomerID| Cusip|indexedCusip|rating| prediction|
+----------+------+------------+------+------------+
| 18283| 47566| 0.0| 0.0| 0.01625076|
| 18282|85123A| 6.0| 0.0| 0.057172246|
| 18282| 84879| 1.0| 0.0| 0.059531752|
| 18282| 23203| 7.0| 0.0| 0.010502596|
| 18282| 22720| 3.0| 0.0| 0.053893942|
+----------+------+------------+------+------------+
only showing top 5 rows
```
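With the trained `ALSModel` you can also ask Spark directly for the top-N items per customer via `recommendForAllUsers` (available in Spark 2.2+); a brief sketch:
```
# top 3 (indexedCusip, predicted rating) recommendations for each customer
userRecs = model.recommendForAllUsers(3)
userRecs.show(5, False)
```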
# 2\. Why Spark with Python ?
Chinese proverb
**Sharpening the knife longer can make it easier to hack the firewood** – old Chinese proverb
I will answer this question in the following two parts:
## 2.1\. Why Spark?
I think the following four main reasons from [Apache Spark™](http://spark.apache.org/) official website are good enough to convince you to use Spark.
1. Speed
Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
Apache Spark has an advanced DAG execution engine that supports acyclic data flow and in-memory computing.
> ![https://runawayhorse001.github.io/LearningApacheSpark/_images/logistic-regression.png](img/72748fa31cb48a5062a2fc7949bd0b45.jpg)
>
> Logistic regression in Hadoop and Spark
2. Ease of Use
Write applications quickly in Java, Scala, Python, R.
Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala, Python and R shells.
3. Generality
Combine SQL, streaming, and complex analytics.
Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.
> [![https://runawayhorse001.github.io/LearningApacheSpark/_images/stack.png](img/d3b112475692c0421480c01cd029cf09.jpg)](https://runawayhorse001.github.io/LearningApacheSpark/_images/stack.png)
>
> The Spark stack
4. Runs Everywhere
Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.
> [![https://runawayhorse001.github.io/LearningApacheSpark/_images/spark-runs-everywhere.png](img/b9eb842264e6a48a42ecf5f142e32414.jpg)](https://runawayhorse001.github.io/LearningApacheSpark/_images/spark-runs-everywhere.png)
>
> The Spark platform
## 2.2\. Why Spark with Python (PySpark)?
Whether you like it or not, Python has become one of the most popular programming languages.
> ![https://runawayhorse001.github.io/LearningApacheSpark/_images/languages.jpg](img/348c0d7bc8db0d630042e5faffd2d647.jpg)
>
> KDnuggets Analytics/Data Science 2017 Software Poll from [kdnuggets](http://www.kdnuggets.com/2017/05/poll-analytics-data-science-machine-learning-software-leaders.html).
# 20\. My Cheat Sheet
You can download the PDF version: [PySpark Cheat Sheet](https://github.com/runawayhorse001/CheatSheet/blob/master/cheatSheet_pyspark.pdf) and [pdDataFrame vs rddDataFrame](https://github.com/runawayhorse001/CheatSheet/blob/master/cheatSheet_pdrdd.pdf).
![https://runawayhorse001.github.io/LearningApacheSpark/_images/cheatSheet_pyspark1.jpg](img/c24065c33e1cca422d1ae92f57cd77c1.jpg) ![https://runawayhorse001.github.io/LearningApacheSpark/_images/cheatSheet_pyspark2.jpg](img/88d05071bd3700af0ba08bab16c423be.jpg) ![https://runawayhorse001.github.io/LearningApacheSpark/_images/cheatSheet_pdrdd.jpg](img/dd0fad3141f468ebc29678d3ff86055d.jpg)