2013年12月15日星期日

使用python构建基于hadoop的mapreduce日志分析平台

本邮件内容由第三方提供,如果您不想继续收到该邮件,可 点此退订
使用python构建基于hadoop的mapreduce日志分析平台  阅读原文»

使用python构建基于hadoop的mapreduce日志分析平台

224356461.jpg

流量比较大的日志要是直接写入Hadoop对Namenode负载过大,所以入库前合并,可以把各个节点的日志凑并成一个文件写入HDFS。 根据情况定期合成,写入到hdfs里面。

咱们看看日志的大小,200G的dns日志文件,我压缩到了18G,要是用awk perl当然也可以,但是处理速度肯定没有分布式那样的给力。

230102727.jpg

Hadoop Streaming原理

mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。

任何语言,只要是方便接收标准输入输出就可以做mapreduce~

再搞之前我们先简单测试下shell模拟mapreduce的性能速度~

234955396.jpg

看下他的结果,350M的文件用时35秒左右。

235045406.jpg

这是2G的日志文件,居然用了3分钟。 当然和我写的脚本也有问题,我们是模拟mapreduce的方式,而不是调用shell下牛逼的awk,gawk处理。

001056805.jpg

awk的速度 !果然很霸道,处理日志的时候,我也很喜欢用awk,只是学习的难度有点大,不像别的shell组件那么灵活简单。

000946258.jpg

这是官方的提供的两个demo ~

map.py

  #!/usr/bin/env python  """A more advanced Mapper, using Python iterators and generators."""  import sys  def read_input(file):      for line in file:          # split the line into words          yield line.split()  def main(separator='\t'):      # input comes from STDIN (standard input)      data = read_input(sys.stdin)      for words in data:          # write the results to STDOUT (standard output);          # what we output here will be the input for the          # Reduce step, i.e. the input for reducer.py          #          # tab-delimited; the trivial word count is 1          for word in words:              print '%s%s%d' % (word, separator, 1)  if __name__ == "__main__":      main()  

reduce.py的修改方式

  #!/usr/bin/env python  """A more advanced Reducer, using Python iterators and generators."""  from itertools import groupby  from operator import itemgetter  import sys  def read_mapper_output(file, separator='\t'):      for line in file:          yield line.rstrip().split(separator, 1)  def main(separator='\t'):      # input comes from STDIN (standard input)      data = read_mapper_output(sys.stdin, separator=separator)      # groupby groups multiple word-count pairs by word,      # and creates an iterator that returns consecutive keys and their group:      #   current_word - string containing a word (the key)      #   group - iterator yielding all ["<current_word>", "<count>"] items      for current_word, group in groupby(data, itemgetter(0)):          try:              total_count = sum(int(count) for current_word, count in group)              print "%s%s%d" % (current_word, separator, total_count)          except ValueError:              # count was not a number, so silently discard this item              pass  if __name__ == "__main__":      main()  

咱们再简单点:

  #!/usr/bin/env python  import sys  for line in sys.stdin:      line = line.strip()      words = line.split()      for word in words:          print '%s\t%s' % (word, 1)  
  #!/usr/bin/env python  from operator import itemgetter  import sys  current_word = None  current_count = 0  word = None  for line in sys.stdin:      line = line.strip()      word, count = line.split('\t', 1)      try:          count = int(count)      except ValueError:          continue      if current_word == word:          current_count += count      else:          if current_word:              print '%s\t%s' % (current_word, current_count)          current_count = count          current_word = word  if current_word == word:      print '%s\t%s' % (current_word, current_count)  

咱们就简单模拟下数据,跑个测试

084336884.jpg

剩下就没啥了,在hadoop集群环境下,运行hadoop的steaming.jar组件,加入mapreduce的脚本,指定输出就行了. 下面的例子我用的是shell的成分。

  [root@101 cron]#$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \  -input myInputDirs \  -output myOutputDir \  -mapper cat \  -reducer wc  

详细的参数,对于咱们来说提供性能可以把tasks的任务数增加下,根据情况自己测试下,也别太高了,增加负担。

(1)-input:输入文件路径

(2)-output:输出文件路径

(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本

(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本

(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。

(6)-partitioner:用户自定义的partitioner程序

(7)-combiner:用户自定义的combiner程序(必须用java实现)

(8)-D:作业的一些属性(以前用的是-jonconf),具体有:
1)mapred.map.tasks:map task数目
2)mapred.reduce.tasks:reduce task数目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目

这里是统计dns的日志文件有多少行 ~

114512821.jpg

在mapreduce作为参数的时候,不能用太多太复杂的shell语言,他不懂的~

可以写成shell文件的模式;

  #! /bin

阅读更多内容

没有评论:

发表评论