© 2016 - Harald Schilly <[email protected]> - CC BY-SA 4.0
~/.zshrc:
export SPARK_HOME="/opt/spark/current/"
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
export PATH="$PATH":$SPARK_HOME/bin
export PYSPARK_PYTHON=/opt/anaconda/bin/python
$SPARK_HOME/conf/spark-env.sh:
PYSPARK_DRIVER_PYTHON=ipython
PYSPARK_DRIVER_PYTHON_OPTS=notebook
Start using: $ pyspark
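In the pyspark shell (and the notebook it launches), sc and sqlContext are created automatically. For a standalone script, a minimal sketch, assuming a local master, would construct the context explicitly ("demo" is an illustrative app name):
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[2]").setAppName("demo")
sc = SparkContext(conf=conf)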
import sys
import os
import re
import numpy as np
import pandas as pd
from glob import glob
import operator
print(sys.version)
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn; seaborn.set()  # alternatively: seaborn.set("talk")
sc, sqlContext
"Apache Spark Version %s" % sc.version
xx = sc.range(-200, 300)
xx
values_rdd = xx.map(lambda x : 2 * (x / 100)**2 - 10)
print(values_rdd.toDebugString().decode("utf8"))
yy = values_rdd.collect()
plt.plot(xx.collect(), yy)
def get_words(f):
    # find the line index of the Project Gutenberg footer marker
    x = f.zipWithIndex().filter(lambda x :
            "Ende dieses Projekt Gutenberg Etextes" in x[0] or
            "End of the Project Gutenberg EBook of Faust" in x[0]).first()[1]
    length = sc.broadcast(x)
    # keep only the lines between the header (first 40 lines) and the footer
    lines = f.zipWithIndex().filter(lambda x : x[1] > 40 and x[1] < length.value)
    return lines.flatMap(lambda x : x[0].split())
words = sc.emptyRDD()
for fn in glob("*.txt"):
    f = sc.textFile(fn, minPartitions=10)
    words = words.union(get_words(f))
print(words.toDebugString().decode("utf8"))
words.count()
# rough check, with header/footer:
! cat *.txt | wc -w
chars = sc.accumulator(0)
words.foreach(lambda w : chars.add(len(w)))
chars.value
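Note that accumulator updates are only guaranteed to be applied exactly once when they happen inside an action such as foreach. Combining the two values just computed gives the average word length:
chars.value / words.count()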
# rough check, with header/footer:
! cat *.txt | wc -c
Filter out all non-letter characters.
import re
pat = re.compile(r'[^a-zA-ZöäüÖÄÜß ]+', re.IGNORECASE)
wordtuples = words.map(lambda w : (pat.sub('', w), 1))
wordcounts = wordtuples.reduceByKey(lambda c1, c2 : c1 + c2)
wordcounts.takeOrdered(30, key = lambda x : -x[1])
! cat *.txt | grep -c "MEPHISTOPHELES"
Check: summing all word counts must reproduce the total number of words.
from operator import add
wordcounts.map(lambda x : x[1]).reduce(add)
words.map(lambda w : (len(w), w)).top(10)
x = 1000
words.cache()
words_indexed = words.zipWithIndex().cache()
# adaptive search: adjust x until the first x words have a total length of about 3000 characters
for i in range(20):
    d = 1 / (2 + i)   # shrinking step size
    v = words_indexed\
        .filter(lambda e : e[1] < x)\
        .map(lambda w : len(w[0])).reduce(operator.add)
    if v > 3000:
        x -= d * x
    else:
        x += d * x
    print("x[{:2d}] = {:9.2f} → v = {:d} (d = {:.2f})".format(i, x, v, d))
link_data = [
(0, 1),
(0, 2),
(0, 3),
(1, 3),
(1, 3),
(2, 3),
(3, 0),
(4, 0),
(2, 3),
(4, 1)
]
links = sc.parallelize(link_data).distinct().groupByKey().cache()
# init rank data
ranks = links.map(lambda l: (l[0], 1.0))
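groupByKey returns ResultIterable values, which do not print readably; to inspect the adjacency lists, one can materialize them (illustrative only, fine for this tiny graph):
links.mapValues(list).collect()
ranks.collect()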
from operator import add
def computeContribs(urls, rank):
    # distribute a node's rank evenly over its outgoing links
    num_urls = len(urls)
    for url in urls:
        yield url, rank / num_urls
for iteration in range(10):
    # calculate the contribution each link sends to its neighbors
    contribs = links.join(ranks)\
        .flatMap(lambda url_urls_rank: computeContribs(*url_urls_rank[1]))
    # re-calculate ranks from the neighbor contributions (damping factor 0.85)
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
# Collect all ranks and print them on the console.
for (link, rank) in sorted(ranks.collect()):
    print("%s has rank: %s." % (link, rank))
import string
U = string.ascii_uppercase
chars = words.flatMap(lambda x : list(x)) \
.map(lambda c : c.upper()) \
.filter(lambda c : c in string.ascii_uppercase)
chars.cache()
print(chars.take(100))  # take(100) avoids collecting the whole RDD to the driver
char_freq = chars.countByValue()  # count occurrences of each letter; returns a dict
char_freq
fig,ax = plt.subplots()
_ = ax.bar(range(len(U)), [char_freq[_] for _ in U])
_ = ax.set_xticks(np.arange(len(U)) + .35)
_ = ax.set_xticklabels(U)
plt.show()
Generate random data …
data = np.c_[
sorted(2 * np.random.rand(40)),
np.r_[np.random.randn(20) + np.linspace(0, 3, 20),
np.random.randn(20) + np.linspace(0, 2, 20) + 3]
]
plt.scatter(data[:,0], data[:,1])
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
Data points and their corresponding feature vectors (exogenous x → endogenous y)
sparkdata = sc.parallelize(LabeledPoint(y, [x]) for x, y in data)
lm = LinearRegressionWithSGD.train(sparkdata)
lm
valuesAndPreds = sparkdata.map(lambda p: (p.label, lm.predict(p.features)))
MSE = valuesAndPreds.map(lambda v_p: (v_p[0] - v_p[1])**2).sum() / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
xx = np.linspace(-1, 3, 100)
yy = [lm.predict([_]) for _ in xx]
fig, ax = plt.subplots()
_ = ax.scatter(data[:,0], data[:,1])
_ = ax.plot(xx, yy, color="green")
plt.show()
from pyspark.mllib.regression import IsotonicRegression
sparkiso = sc.parallelize((y, x, 1) for x, y in data)
ir = IsotonicRegression.train(sparkiso)
valuesAndPreds2 = sparkdata.map(lambda p: (p.label, ir.predict(p.features[0])))  # predict expects a scalar, not a vector
MSE = valuesAndPreds2.map(lambda v_p: (v_p[0] - v_p[1])**2).sum() / valuesAndPreds2.count()
print("Mean Squared Error = " + str(MSE))
yy2 = [ir.predict(_) for _ in xx]
fig, ax = plt.subplots()
_ = ax.scatter(data[:,0], data[:,1])
_ = ax.plot(xx, yy2, color="green")
plt.show()
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
entries = sc.textFile("elements.csv").map(lambda line : line.split(","))
print(', '.join(entries.first()))
Converting every row except the header (the first line) into a Row
def try_float(x):
    try:
        return float(x)
    except ValueError:
        return np.nan
elements = entries.zipWithIndex()\
    .filter(lambda x : x[1] >= 1).map(lambda x : x[0]) \
    .map(lambda e: Row(
        number=int(e[-3]),
        symbol=e[11],
        name=e[16],
        mass=try_float(e[19]),
        radius=try_float(e[-1])
    ))
Creating the DataFrame and registering it as a temporary table
elementsDF = sqlContext.createDataFrame(elements)
elementsDF.registerTempTable("elements")
Inferring types
elementsDF.printSchema()
Query DataFrame with SQL
heavy = sqlContext.sql("SELECT number, name, mass FROM elements WHERE mass >= 250 ORDER BY number")
heavy.collect()
radii = sqlContext.sql("SELECT radius FROM elements ORDER BY number")
plt.plot(radii.collect())
Query DataFrame
elementsDF.show()
df = elementsDF.select('number', 'name', 'radius', 'mass')\
.filter(elementsDF.mass < 20)\
.sort('number')
df.show()
df.fillna(0).show()
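For local inspection or plotting, a small Spark DataFrame can be converted to pandas; this collects all rows to the driver, which is fine for a table of this size:
elementsDF.toPandas().head()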
Future: DataSets