A Custom Map/Reduce Example in Hive, in Python

Hive supports custom map and reduce scripts. Below I walk through a simple wordcount example, developed in Python (if you'd rather use Java, see here).

Development environment:

- python: 2.7.5
- hive: 2.3.0
- hadoop: 2.8.1

###1. The map and reduce scripts
The map script (mapper.py):

```python
#!/usr/bin/python
import sys
import re

# split lines on runs of non-word characters
p = re.compile(r'\W+')

while True:
    line = sys.stdin.readline().strip()
    if not line:
        break
    words = p.split(line)
    # write the tuples to stdout
    for word in words:
        print '%s\t%s' % (word, "1")
```
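
One quirk of splitting on `\W+`: when a line starts or ends with punctuation, `re.split` produces empty strings at the edges, so the mapper can emit empty "words". A quick interactive check (Python 2, made-up input):

```python
>>> import re
>>> re.split(r'\W+', 'hello, world!')
['hello', 'world', '']
```

Such an empty token reaches the reducer as a line with nothing before the tab; the reducer below happens to drop it silently when the tuple unpacking fails, but skipping empty words in the mapper would be cleaner.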

The reduce script (reducer.py):

```python
#!/usr/bin/python
import sys

# maps words to their counts
word2count = {}

while True:
    line = sys.stdin.readline().strip()
    if not line:
        break
    # parse the input we got from mapper.py
    try:
        word, count = line.split('\t', 1)
    except ValueError:
        continue

    # convert count (currently a string) to int;
    # filter() first drops any non-digit characters
    try:
        count = int(filter(str.isdigit, count))
    except ValueError:
        continue

    word2count[word] = word2count.get(word, 0) + count

# write the tuples to stdout
# Note: they are unsorted
for word in word2count:
    print '%s\t%s' % (word, word2count[word])
```
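
Because the reducer accumulates counts in a dict rather than relying on sorted input, you can sanity-check the pair locally before involving Hive at all, e.g. with a one-liner like `echo "hello world hello" | python mapper.py | python reducer.py` (insert a `| sort` in the middle if you want to mimic what `cluster by` will feed the reducer).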

One thing to note: the scripts read input with `sys.stdin.readline()` in a `while` loop rather than `for line in sys.stdin`, because Python 2's file iterator uses an internal read-ahead buffer instead of yielding each line as it arrives, which can cause trouble in streaming scripts like these. Also, when splitting the mapper output into `word, count`, make sure to strip any non-digit characters from the `count` part, so the conversion to `int` doesn't fail.
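
To see what that digit-stripping buys, here is the `filter` trick from reducer.py in an interactive Python 2 session (the sample strings are made up):

```python
>>> filter(str.isdigit, '7\r')    # on a str, filter() returns a str in Python 2
'7'
>>> int(filter(str.isdigit, '7\r'))
7
>>> int(filter(str.isdigit, 'abc'))  # no digits survive, so int('') raises
Traceback (most recent call last):
  ...
ValueError: invalid literal for int() with base 10: ''
```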

###2. Writing the Hive HQL

```sql
drop table if exists raw_lines;

-- create the input table raw_lines over '/user/inputs' (a path on your HDFS)
create external table if not exists raw_lines(line string)
row format delimited
stored as textfile
location '/user/inputs';

drop table if exists word_count;

-- create the output table word_count, written as text files under '/user/outputs' on HDFS
create external table if not exists word_count(word string, count int)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as textfile
location '/user/outputs/';

-- add the mapper & reducer scripts as resources; change /home/yanggy to your local path
add file /home/yanggy/mapper.py;
add file /home/yanggy/reducer.py;

from (
  from raw_lines
  map raw_lines.line
  -- call the mapper here
  using 'mapper.py'
  as word, count
  cluster by word) map_output
insert overwrite table word_count
reduce map_output.word, map_output.count
-- call the reducer here
using 'reducer.py'
as word, count;
```
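
To run it, put one or more text files under /user/inputs on HDFS, save the statements above as a script (I'll call it wordcount.hql, but the name is up to you), and execute `hive -f wordcount.hql`. When the job finishes, the counts are available both through the table (`select * from word_count;`) and as tab-separated text files under /user/outputs. If Hive complains that it cannot execute a script, the usual fix is to write `using 'python mapper.py'` instead of relying on the shebang line.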