MapReduce代码本地可以跑通:

但是上Hadoop集群就报错:

这是我的map.py、reduce.py和run.sh
import os
import sys
import re
def get_white_list_word(white_list_dir):
white_list_word = set()
if os.path.isdir(white_list_dir):
for cachefile in os.listdir(white_list_dir):
with open(white_list_dir + '/' + cachefile) as cachefile:
for word in cachefile:
word = word.strip()
white_list_word.add(word)
return white_list_word
def mapper_func(white_list_dir):
white_list_word = get_white_list_word(white_list_dir)
for line in sys.stdin:
word_list = line.strip().split(' ')
for word in word_list:
if len(re.findall(r'\w+', word)) < 1:
continue
word = re.findall(r'\w+', word)[0].lower()
if word in white_list_word:
print('\t'.join([word, '1']))
if __name__ == '__main__':
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)
import sys
def reducer_func():
cur_word = None
cur_cnt = 0
for line in sys.stdin:
word, cnt = line.strip().split('\t')
if cur_word == None:
cur_word = word
if cur_word != word:
print('\t'.join([cur_word, str(cur_cnt)]))
cur_word = word
cur_cnt = 0
cur_cnt += int(cnt)
print('\t'.join([cur_word, str(cur_cnt)]))
if __name__ == '__main__':
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
INPUT_PATH="/input_mr_cachearchive_broadcast/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_cachearchive_broadcast"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_PATH \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func WLD" \
-reducer "python reduce.py reducer_func" \
-jobconf "mapred.reduce.tasks=2" \
-jobconf "mapred.job.name=mr_cachearchive_broadcast" \
-jobconf "stream.non.zero.exit.is.failure=false" \
-cacheArchive "hdfs://master:9000/input_mr_cachearchive_broadcast/white_list_dir.tgz#WLD" \
-file "./map.py" \
-file "./reduce.py"