注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

mmicky 的博客

追逐刹那的惊艳

 
 
 

日志

 
 

Spark1.0.0 多语言编程之python实现  

2014-04-14 22:04:31|  分类: spark |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
Spark公开了pyhton的编程模型-PySpark,开发者通过PySpark可以很容易开发Spark application。
但是Python API和Scala API略有不同:
  • Python是动态语言,RDD可以持有不同类型的对象
  • PySpark目前并没有支持全部的API,但核心部分已经全部支持
在PySpark里,RDD支持scala一样的方法,只不过这些方法是Python函数来实现的,返回的也是Python的集合类型;对于RDD方法中使用的短函数可以使用Python的lambda语法实现。
不过python开发Spark Application拥有很多优势:
  • 不需要编译,使用方便
  • 可以与许多系统集成,特别是NoSQL大部分都提供了python开发包

1:开发环境搭建
在使用PySpark开发spark application前,要先搭建好python开发环境:
  • 安装2.6及以上的Python,强烈建议2.7,注意别使用Python3
  • 在环境变量中做如下设置export PYTHONPATH=$SPARK_HOME/python
  • spark提供了$SPARK_HOME/bin/pyspark,通过该shell,开发者也可以做开发和调试应用程序。
  • 启动伪分布式Hadoop2.2.0集群和Spark 1.0.0

2:sogou日志数据分析python实现
sogou日志需求详见Spark1.0.0 多语言编程
A:用户在00:00:00到12:00:00之间的查询数
hadoop@wyy:/app/hadoop/spark100$ vi sogouA.py
hadoop@wyy:/app/hadoop/spark100$ cat sogouA.py

from pyspark import SparkContext

sc = SparkContext("spark://wyy:7077", "sogouA")
sgRDD = sc.textFile("hdfs://wyy:8000/test/minisogou/mini/mini.txt")
print sgRDD.filter(lambda line : line.split('\t')[0] >= '00:00:00' and line.split('\t')[0] <= '12:00:00').count()

hadoop@wyy:/app/hadoop/spark100$ python sogouA.py
运行结果:527300

B:搜索结果排名第1,但是点击次序排在第2的数据有多少?
hadoop@wyy:/app/hadoop/spark100$ vi sogouA.py
hadoop@wyy:/app/hadoop/spark100$ cat sogouA.py

from pyspark import SparkContext

sc = SparkContext("spark://wyy:7077", "sogouB")
sgRDD = sc.textFile("hdfs://wyy:8000/test/minisogou/mini/mini.txt")
print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : line.split('\t')[3]).filter(lambda line :
int(line.split(' ')[0])==1 and int(line.split(' ')[1])==2).count()

hadoop@wyy:/app/hadoop/spark100$ python sogouA.py
运行结果:79765

C:一个session内查询次数最多的用户的session与相应的查询次数
hadoop@wyy:/app/hadoop/spark100$ vi sogouA.py
hadoop@wyy:/app/hadoop/spark100$ cat sogouA.py

from pyspark import SparkContext

sc = SparkContext("spark://wyy:7077", "sogouC")
sgRDD = sc.textFile("hdfs://wyy:8000/test/minisogou/mini/mini.txt")
print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10)

hadoop@wyy:/app/hadoop/spark100$ python sogouA.py
运行结果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]

  评论这张
 
阅读(1768)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017