PageRank in Apache Spark

This is one of the basic examples of how Apache Spark works and what the code looks like.

In [1]:
# Adjacency list of the example web graph:
# node id -> list of node ids that this node links to.
link_data = {
    0: [1, 2],
    1: [2, 6],
    2: [1, 0],
    3: [1, 0],
    4: [1],
    5: [0, 1],
    6: [0, 7],
    7: [0, 1, 2, 3, 9],
    8: [5, 9],
    9: [7]
}
In [2]:
import networkx as nx

# Build a directed graph from the adjacency dict and draw it on a circle,
# labelling every node with its own id.
link_graph = nx.DiGraph(link_data)
labels = {node: str(node) for node in link_graph.nodes()}
nx.draw_circular(link_graph, labels=labels)
In [3]:
ranks = sc.range(len(link_data)).map(lambda x : (x, 1.))
In [4]:
links = sc.parallelize(link_data.items()).cache()
In [5]:
links.join(ranks).collect()
Out[5]:
[(0, ([1, 2], 1.0)),
 (8, ([5, 9], 1.0)),
 (1, ([2, 6], 1.0)),
 (9, ([7], 1.0)),
 (2, ([1, 0], 1.0)),
 (3, ([1, 0], 1.0)),
 (4, ([1], 1.0)),
 (5, ([0, 1], 1.0)),
 (6, ([0, 7], 1.0)),
 (7, ([0, 1, 2, 3, 9], 1.0))]
In [6]:
def computeContribs(node_urls_rank):
    """Split a node's rank evenly among its outgoing links.

    Takes a joined record ``(node, (urls, rank))`` and yields one
    ``(url, contribution)`` pair per outgoing link, where each
    contribution is ``rank / len(urls)``. Yields nothing when the
    node has no outgoing links.
    """
    _, (urls, rank) = node_urls_rank  # the source node id itself is unused
    out_degree = len(urls)
    for target in urls:
        yield target, rank / out_degree

This takes a while to execute. Do

tail -f ~/.smc/jupyter/jupyter-notebook.*

in a Terminal to see what's going on behind the scenes!

In [7]:
from operator import add

# Ten power-iteration passes of PageRank with damping factor 0.85.
# Note: `ranks` is rebound each pass, so every iteration extends the
# (lazy) RDD lineage; nothing is computed until collect() below.
for iteration in range(10):
    # Compute each node's contributions to the nodes it links to.
    contribs = links.join(ranks).flatMap(computeContribs)

    # Full outer join with `links` so that nodes receiving no contributions
    # (poorly connected nodes) are kept with a contribution of 0.0 instead
    # of being dropped from the ranking.
    contribs = links.fullOuterJoin(contribs).mapValues(lambda x : x[1] or 0.0)

    # Sum up all contributions per node.
    ranks = contribs.reduceByKey(add)

    # Apply the damping: rank = 0.85 * summed contributions + 0.15.
    ranks = ranks.mapValues(lambda rank: rank * 0.85 + 0.15)

# Collect all ranks and print them normalised by the number of nodes,
# so they are comparable with NetworkX's (probability-normalised) output.
for (link, rank) in sorted(ranks.collect()):
    print("%s has rank: %s." % (link, rank / len(link_data)))
0 has rank: 0.19797667891987486.
1 has rank: 0.24446656065777836.
2 has rank: 0.2198061047952103.
3 has rank: 0.031639754843834245.
4 has rank: 0.015.
5 has rank: 0.021374999999999998.
6 has rank: 0.11889764243922467.
7 has rank: 0.09782350350024308.
8 has rank: 0.015.
9 has rank: 0.03801475484383424.

Comparison with NetworkX

In [8]:
import networkx as nx
# Cross-check: NetworkX's reference PageRank on the same graph should
# closely match the normalised Spark results above.
g = nx.DiGraph(link_data)
nx.pagerank(g)
Out[8]:
{0: 0.19806899390752195,
 1: 0.24446742505051136,
 2: 0.21971157320786078,
 3: 0.031632549471844296,
 4: 0.015000000000000003,
 5: 0.021375000000000005,
 6: 0.11889867401777406,
 7: 0.0978382348726434,
 8: 0.015000000000000003,
 9: 0.038007549471844294}
In [ ]: