What is the correct way to send an Erlang module and function to the MapReduce phase in the python-riak client?


Can anybody show, with an example, the correct way of passing an Erlang module and function to

query.map()

in the Python Riak client? The documentation says:

function (string, list) – Either a named Javascript function (i.e. ‘Riak.mapValues’), an anonymous Javascript function (i.e. ‘function(...) ... ‘), or an array [‘erlang_module’, ‘function’].
options (dict) – phase options, containing ‘language’, ‘keep’ flag, and/or ‘arg’.

but it is not clear what exactly I have to send. I have been calling the query.map() phase as

query.map(['maps','fun']) # maps is the maps.erl and fun is the function in the maps.erl file

I have set the beam file path in app.config as described in the documentation, and placed the compiled beam files there. After doing all of that, I still get an error when running these commands:

 >>> query.map(['maps','funs'])
 >>> query.run()
 Traceback (most recent call last):
 File "<input>", line 1, in <module>
 File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/mapreduce.py", line 234, in run
 result = t.mapred(self._inputs, query, timeout)
 File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/transports/http.py", line 322, in mapred
(repr(response[0]), repr(response[1])))
 Exception: Error running MapReduce operation. Headers: {'date': 'Mon, 26 May 2014 11:24:04 GMT', 'content-length': '1121', 'content-type': 'application/json'
, 'http_code': 500, 'server': 'MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)'} Body: '{"phase":0,"error":"undef","input":"{ok,{r_object,<<\\"tst\
\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<\\"X-Riak-VTag\\
">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,65,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]
],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10>>,{1,63567559559}}],   {dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],
[],[],[],[],[],[],[],[],[],...}}},...},...}","type":"error","stack":"[{maps,funs,  [{r_object,<<\\"tst\\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{
[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[], [[<<\\"X-Riak-VTag\\">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,6
5,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10
>>,{1,63567559559}}],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],...}}},...},...],...},...]"}'

What is wrong, or what have I missed? Please suggest a fix.

1 Answer

Accepted answer by Joe:

There are 3 parts to using an Erlang map function with the Python client:

  • writing and compiling the Erlang module
  • preparing the Riak cluster
  • invoking the function from the Python client

The Erlang module should be fairly straightforward; for this example I will have the map function return the number of values (siblings) for each key:

-module(custom_mr).

-export([mapcount/3]).

mapcount(Obj,_Keydata,_Arg) ->
  [length(riak_object:get_values(Obj))].

Erlang versions vary in subtle ways, so it is safest to compile with Riak's bundled Erlang, or with the same Erlang you used to build Riak if you built it from source. The resulting .beam file must be placed in a directory readable by the user Riak runs as (riak by default for a package install). The .beam file needs to be deployed, and app.config modified, on every node in the cluster.

# /usr/lib/riak/erts-5.9.1/bin/erlc custom_mr.erl
# mkdir /var/lib/riak/custom_code
# mv custom_mr.beam /var/lib/riak/custom_code
# chown -R riak:riak /var/lib/riak/custom_code

Then edit app.config and add {add_paths,["/var/lib/riak/custom_code"]} to the riak_kv section, and restart the node.
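
For reference, the riak_kv section would then look roughly like this (a minimal sketch: the comment stands in for whatever settings your config already contains; only the add_paths tuple is new):

{riak_kv, [
    %% ... your existing riak_kv settings stay as they are ...

    %% tell the node where to find the compiled custom_mr.beam
    {add_paths, ["/var/lib/riak/custom_code"]}
]},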

Test from riak attach to make sure the new module has been loaded - in this example, nodes 1-4 have loaded the module, but node5 is down:

# riak attach
1> riak_core_util:rpc_every_member_ann(code,which,[custom_mr]).
{[{'riak@node1',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'riak@node2',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'riak@node3',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'riak@node4',"/var/lib/riak/custom_code/custom_mr.beam"}],
 ['riak@node5']}
2> custom_mr:mapcount(riak_object:new(<<"test">>,<<"test">>,<<"test">>),keydata,arg).
[1]

(Detach from the Riak console with Ctrl-D if you are running a pre-1.4 version; otherwise use Ctrl-C followed by a.)

Lastly, the Python code (I used the filename test.py):

import riak

client = riak.RiakClient()

test_bucket = client.bucket('test')

data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1})
data1.store()

data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2})
data2.store()

data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3})
data3.store()

query = riak.RiakMapReduce(client).add('test')
query.map(['custom_mr','mapcount'])

for result in query.run():
    print "%s" % (result)

Running this code returns a 1 for each key in the bucket:

# python test.py
1
1
1

NOTE: The script does not do a get before storing the values, so if your default bucket properties include allow_mult=true, running it a second time will create a sibling for each value and you will get 2s instead of 1s.
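
If you want to be able to re-run the script without accumulating siblings, fetch each key first and store through the fetched object so the write carries the existing vclock. A minimal sketch (assuming the same 'test' bucket; with the older 1.x client use get_data()/set_data() instead of the .data attribute):

import riak

client = riak.RiakClient()
test_bucket = client.bucket('test')

# Fetch before storing so the write carries the existing vclock
# instead of creating a sibling when allow_mult is true.
obj = test_bucket.get('key1')
obj.data = {'field1': '1data1', 'field2': '1data2', 'field3': 1}
obj.store()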


Adding further examples


New module, compile and install as above

-module(custom_mr).

-export([mapcount/3,
         mapvalue/3,
         mapfield/3,
         mapfieldwithid/3,
         reducecount/2,
         reducepropsort/2,
         reducedoublesort/2,
         reducesort/2]).

mapcount(Obj,_Kd,_Arg) ->
  [length(riak_object:get_values(Obj))].

mapvalue(Obj,_Kd,_Arg) ->
  [hd(riak_object:get_values(Obj))].

mapfield(Obj,_Kd,Arg) ->
  Val = case catch mochijson2:decode(hd(riak_object:get_values(Obj))) of
    {struct, Data} ->
        case Arg =:= null of
          true -> Data;
          false -> [{Arg,proplists:get_value(Arg,Data)}]
        end;
    _ -> 
        [{Arg,{error,notjson}}]
  end,
  [list_to_binary(mochijson2:encode(Val))]. 

mapfieldwithid(Obj,_Kd,Arg) ->
  Val = case catch mochijson2:decode(hd(riak_object:get_values(Obj))) of
    {struct, Data} ->
        case Arg =:= null of
          true -> Data;
          false -> [{Arg,proplists:get_value(Arg,Data)}]
        end;
    _ -> 
        [{Arg,{error,notjson}}]
  end,
  V = [{bucket,riak_object:bucket(Obj)},{key,riak_object:key(Obj)}|Val],
  [list_to_binary(mochijson2:encode(V))].

reducecount(L,_Arg) ->
  [lists:sum([ N || N <- L, is_integer(N) ])].

sortfun(F) ->
  fun(A,B) ->
    proplists:get_value(F,A,<<"zzzz">>) =< proplists:get_value(F,B,<<"zzzz">>)
  end. 

reducepropsort(L,Arg) ->
  Decoded = [ I || {struct,I} <- [ mochijson2:decode(E) || E <- L], is_list(I)],
  Sorted = lists:sort(sortfun(Arg), Decoded),
  [ list_to_binary(mochijson2:encode(I)) || I <- Sorted ].

reducesort(L,_Arg) ->
  lists:sort(L).

reducedoublesort(L,Arg) ->
  Decoded = [ lists:sort(I) || {struct,I} <- [ mochijson2:decode(E) || E <- L], is_list(I)],
  Sorted = lists:sort(sortfun(Arg), Decoded),
  [ list_to_binary(mochijson2:encode(I)) || I <- Sorted ].

Python code

import riak

client = riak.RiakClient(pb_port=8087, host="172.31.0.1", protocol='pbc')

test_bucket = client.bucket('test_bucket')

data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1, 'zone':'D'})
data1.store()

data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2, 'zone':'A'})
data2.store()

data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3, 'zone':'C'})
data3.store()


def printresult(q):
  for result in q.run():
    print "%s" % (result)

print "\nCount the number of values in the bucket"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapcount'])
query.reduce(['custom_mr','reducecount'])
printresult(query)

print "\nList all values in natual sort order"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapvalue'])
query.reduce(['custom_mr','reducesort'])
printresult(query)

print "\nList all values sorted by 'zone'"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'])
query.reduce(['custom_mr','reducepropsort'],{'arg':'zone'})
printresult(query)

print "\nList all values sorted by 'zone', also sort the fields in each object"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'])
query.reduce(['custom_mr','reducedoublesort'],{'arg':'zone'})
printresult(query)

print "\nList just field3, sorted"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'],{'arg':'field3'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'field3'})
printresult(query)

print "\nList just bucket,key,field3, sorted by field3"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfieldwithid'],{'arg':'field3'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'field3'})
printresult(query)

print "\nReturn just the zone for key2"
query = riak.RiakMapReduce(client).add('test_bucket','key2')
query.map(['custom_mr','mapfield'],{'arg':'zone'})
printresult(query)

print "\nReturn the bucket,key,zone for key1 and key3"
query = riak.RiakMapReduce(client).add('test_bucket',['key1','key3'])
query.map(['custom_mr','mapfieldwithid'],{'arg':'zone'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'zone'})
printresult(query)

Please note: many of these examples use full-bucket MapReduce, which is very heavy and will likely affect performance if used on a non-trivial amount of data. The last two examples show how to select a specific key or a list of keys as input. A secondary index or Riak Search could also be used as input if the cluster is set up for them (see the riak-python-client documentation on query inputs); a sketch of the secondary index input follows after the output below.

And the output:

# python ~/test.py

Count the number of values in the bucket
3

List all values in natural sort order
{"field2": "1data2", "field3": 1, "field1": "1data1", "zone": "D"}
{"field2": "2data2", "field3": 2, "field1": "2data1", "zone": "A"}
{"field2": "3data2", "field3": 3, "field1": "3data1", "zone": "C"}

List all values sorted by 'zone'
{"field2":"2data2","field3":2,"field1":"2data1","zone":"A"}
{"field2":"3data2","field3":3,"field1":"3data1","zone":"C"}
{"field2":"1data2","field3":1,"field1":"1data1","zone":"D"}

List all values sorted by 'zone', also sort the fields in each object
{"field1":"2data1","field2":"2data2","field3":2,"zone":"A"}
{"field1":"3data1","field2":"3data2","field3":3,"zone":"C"}
{"field1":"1data1","field2":"1data2","field3":1,"zone":"D"}

List just field3, sorted
{"field3":1}
{"field3":2}
{"field3":3}

List just bucket,key,field3, sorted by field3
{"bucket":"test_bucket","key":"key1","field3":1}
{"bucket":"test_bucket","key":"key2","field3":2}
{"bucket":"test_bucket","key":"key3","field3":3}

Return just the zone for key2
{"zone":"A"}

Return the bucket,key,zone for key1 and key3
{"bucket":"test_bucket","key":"key3","zone":"C"}
{"bucket":"test_bucket","key":"key1","zone":"D"}