{"id":1291,"date":"2016-08-27T15:55:04","date_gmt":"2016-08-27T15:55:04","guid":{"rendered":"http:\/\/62.131.51.129\/?p=1291"},"modified":"2016-08-27T15:55:04","modified_gmt":"2016-08-27T15:55:04","slug":"python-in-a-map-reduce-environment","status":"publish","type":"post","link":"http:\/\/archief.van-maanen.com\/?p=1291","title":{"rendered":"Python in a map reduce environment"},"content":{"rendered":"<p>I have written a very small Python programme that follows the mapper \/ reducer sequence. It replaces the more complicated set of Java programmes that one might otherwise write to implement a mapper \/ reducer sequence. The idea is relatively simple. We create a stream from an input file. That stream is processed by a mapper programme (written in Python) that produces a series of name, value pairs. These pairs are written to disk. They must then be sorted by name and subsequently streamed to a reducer programme (also written in Python). That programme produces the final outcome, which is also written to disk.<br \/>\nSuch logic is used on a Hadoop platform. There, streaming data to a mapper becomes something that can run concurrently on different nodes. The intermediate output is then sorted and sent as a stream of name, value pairs to a reducer, where the final calculations are made. The results are written to a file of name, value pairs.<br \/>\nLet us first look at the input file that is to be analysed. 
It looks like:<\/p>\n<pre>\n2013-10-09\t13:22\tGouda\tWafels\t2.98\tVisa\n2013-10-09\t13:22\tNew York\tIphone\t455.76\tMasterCard\n2013-10-09\t13:22\tNew York\tRommel\t354.76\tMasterCard\n2016-10-09\tI\/O error\n<\/pre>\n<p>The last line is corrupt: it has only two fields instead of six. This input set is processed by this Python programme:<\/p>\n<pre>\n#!\/usr\/bin\/python\n# Mapper: emits one store, cost pair per valid input line.\n# Corrupt lines are ignored, so the mapper keeps working instead of failing.\nimport sys\n\ndef mapper():\n    # read standard input line by line\n    for line in sys.stdin:\n        # strip off surrounding whitespace, split on tab and put the fields in a list\n        data = line.strip().split(\"\\t\")\n\n        # defensive programming: skip any line that does not have exactly 6 fields\n        if len(data) != 6:\n            continue\n\n        # multiple assignment: not strictly necessary, since we could use\n        # data[2] and data[4] directly, but it makes the code easier to read\n        date, time, store, item, cost, payment = data\n\n        # print the store, cost pair that will be passed to the reducer\n        # (Python 2 print statement)\n        print \"{0}\\t{1}\".format(store, cost)\n\n\ndef main():\n    mapper()\n\nmain()\n<\/pre>\n<p>One can test whether it actually works with the command cat test | .\/mapper.py. This should produce a set of name, value pairs. 
I got this outcome:<\/p>\n<pre>\n[training@localhost ~]$ cat test | .\/mapper.py\nGouda\t2.98\nNew York\t455.76\nNew York\t354.76\n<\/pre>\n<p>The next programme is the reducer, which looks like:<\/p>\n<pre>\n#!\/usr\/bin\/python\n# Reducer: sums the sales per store.\n\nimport sys\n\n\nsalesTotal = 0\noldKey = None\n\n# Loop over the data\n# It will be in the format key\\tval\n# Where key is the store name, val is the sale amount\n#\n# All the sales for a particular store will be presented,\n# then the key will change and we'll be dealing with the next store\n\nfor line in sys.stdin:\n    data_mapped = line.strip().split(\"\\t\")\n    if len(data_mapped) != 2:\n        # Something has gone wrong. Skip this line.\n        continue\n\n    thisKey, thisSale = data_mapped\n\n    if oldKey and oldKey != thisKey:\n        # the key has changed: emit the total for the previous store\n        print oldKey, \"\\t\", salesTotal\n        salesTotal = 0\n\n    oldKey = thisKey\n    salesTotal += float(thisSale)\n\n# emit the total for the last store\nif oldKey is not None:\n    print oldKey, \"\\t\", salesTotal\n<\/pre>\n<p>This can also be run on Linux. The test input is already grouped by store, so no sort step is needed between mapper and reducer here:<\/p>\n<pre>\n[training@localhost ~]$ cat test | .\/mapper.py | .\/reducer.py\nGouda \t2.98\nNew York \t810.52\n<\/pre>\n<p>The next step is to run everything on Hadoop as a streaming job:<\/p>\n<pre>\nhadoop jar \/usr\/lib\/hadoop-0.20-mapreduce\/contrib\/streaming\/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input \/myinput\/test -output joboutput\n<\/pre>\n<p>For a run with joboutput10 as the output directory, this generates:<\/p>\n<pre>\npackageJobJar: [mapper.py, reducer.py, \/tmp\/hadoop-training\/hadoop-unjar5879319460019186346\/] [] \/tmp\/streamjob1136107035137838419.jar tmpDir=null\n16\/08\/27 12:01:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. 
Applications should implement Tool for the same.\n16\/08\/27 12:01:12 WARN snappy.LoadSnappy: Snappy native library is available\n16\/08\/27 12:01:12 INFO snappy.LoadSnappy: Snappy native library loaded\n16\/08\/27 12:01:12 INFO mapred.FileInputFormat: Total input paths to process : 1\n16\/08\/27 12:01:12 INFO streaming.StreamJob: getLocalDirs(): [\/var\/lib\/hadoop-hdfs\/cache\/training\/mapred\/local]\n16\/08\/27 12:01:12 INFO streaming.StreamJob: Running job: job_201608271023_0015\n16\/08\/27 12:01:12 INFO streaming.StreamJob: To kill this job, run:\n16\/08\/27 12:01:12 INFO streaming.StreamJob: UNDEF\/bin\/hadoop job  -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201608271023_0015\n16\/08\/27 12:01:12 INFO streaming.StreamJob: Tracking URL: http:\/\/0.0.0.0:50030\/jobdetails.jsp?jobid=job_201608271023_0015\n16\/08\/27 12:01:13 INFO streaming.StreamJob:  map 0%  reduce 0%\n16\/08\/27 12:01:16 INFO streaming.StreamJob:  map 100%  reduce 0%\n16\/08\/27 12:01:19 INFO streaming.StreamJob:  map 100%  reduce 100%\n16\/08\/27 12:01:21 INFO streaming.StreamJob: Job complete: job_201608271023_0015\n16\/08\/27 12:01:21 INFO streaming.StreamJob: Output: joboutput10\n<\/pre>\n<p>The output can be inspected as<\/p>\n<pre>\n[training@localhost ~]$ hadoop fs -cat \/user\/training\/joboutput10\/part-00000\nGouda \t2.98\nNew York \t810.52\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>I have written a very small python programme that follows the mapper \/ reducer sequence. This works as a replacement of a more complicated set of Java programmes that might be created to generate a mapper \/ reducer sequence. The idea is relatively simple. We create a stream from an input file. 
That stream is [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":1293,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-1291","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-uncategorized"],"_links":{"self":[{"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=\/wp\/v2\/posts\/1291","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1291"}],"version-history":[{"count":0,"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=\/wp\/v2\/posts\/1291\/revisions"}],"wp:attachment":[{"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1291"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1291"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/archief.van-maanen.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1291"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}