Prelude: Several months experience using both Gremlin "dialects" for FaunusGraph & TitanGraph, so well aware of the functional and syntactic diffs. Have successfully used Faunus script step (http://architects.dzone.com/articles/distributed-graph-computing , https://github.com/thinkaurelius/faunus/blob/master/src/main/java/com/thinkaurelius/faunus/mapreduce/sideeffect/ScriptMap.java) for relatively simple deletion & mutation of subgraphs.
Problem: Implemented a complex mutation script map to "move" edge properties to either the out-vertex or the in-vertex per a direction-oriented convention for naming properties. My TitanGraph Gremlin prototype works on small graphs, but I can't get the scaled-up implementation to work: the map completes successfully but the graph isn't changed (I am committing the changes). NOTE: my Logger object is only outputing the first INFO message that displays the prefix args, indicating I'm not satifying the edge namespace guard condition (I did a run without the condition, but no change). Following is my code (fat-fingering from an internal net, so typos are possible)
//faunus pipe driver - usage gremlin -e faunus.ns.set-props.grm
import java.io.Console
//get args
console=System.console()
arg=console.readLine('> type <namespace>;<faunus.ns.set-props.mapper_path>;<from_prefix>;<to_prefix>
inargs=arg.split(";")
//establish FaunusGraph connection
f=FaunusFactory.open('titan-client.properties')
f.getConf().set("faunus.graph.input.titan.storage.read-consistency-level", "ALL")
f.getConf().set("faunus.graph.input.titan.storage.write-consistency-level", "ALL")
//Faunus pipe incl. script step
f.V().has("_namespace", inargs[0]).script(inargs[1], inargs[2], inargs[3]
//script map - usage f.V().has("_namespace", <namespace_string>).script(<this_script_path>, <outV_key_prefix_string>, <inV_key_prefix_string>)
def g
def mylog
def setup(args) {
mylog=java.util.logging.Logger.getLogger("script_map")
println("configuring graph ...")
conf=new BaseConfiguration()
conf.setProperty("storage.backend", "cassandra")
conf.setProperty("storage.keyspace", "titan")
conf.setProperty("storage.index.index-name", "titan")
conf.setProperty("storage.hostname", "localhost")
g=TitanFactory.open(conf)
}
def map(v, args) {
mylog.info("*****READ***** args: "+args[0].toString()+", "+args[1].toString())
//fetch all edges incident on Titan vertex corresponding to incoming Faunus vertex
gv=g.v(v.id)
edges=gv.bothE();null
//iterate through incident edges
while(edges.hasNext()) {
e=edges.next()
if (e.hasProperty("_namespace")) { //_namespace removed from previously processed edges
/*fetch terminal vertices of current edge, add incidence & adjacency props
to support metrics and analytics
*/
from=e.getVertex(OUT)
from.setProperty("inV_degree", from.in().count())
from.setProperty("inE_degree", from.inE().count())
from.setProperty("outV_degree" from.out().count())
from.setProperty("outE_degree", from.outE().count())
to=e.getVertex(IN)
to.setProperty("inV_degree", from.in().count())
to.setProperty("inE_degree", from.inE().count())
to.setProperty("outV_degree" from.out().count())
to.setProperty("outE_degree", from.outE().count())
mylog.info("*****READ*****edge id: "+e.id)
mylog.info("*****READ*****edge vertices: from id"+fromid+"; to id: "+to.id)
//fetch property keys of current edge
ekeys=e.getPropertyKeys()
//iterate through edge property keys
for(String ekey:ekeys)
eprop=e.getProperty(ekey) //get value of current property key
goodprop=!(eprop == "" || eprop == null)
mylog.info("*****READ*****edge key/value: "+ekey+"="eprop)
/*determine placement of current key/value on one or neither of the
terminal vertices based on key prefix arges and property value,
remove prefix from re-assigned key/value
*/
if(ekey.startsWith(args[0]) && goodprop) {
vkey=ekey.split(args[0])[1]
if(!from.hasProperty(vkey)) from.setProperty(vkey, eprop)
else {
vprop=from.getProperty(vkey)
if(!vprop.equal(eprop) from.setProperty(vkey, vprop+";"+eprop)
}
mylog.info("*****READ*****from vertex key/value: "+vkey+"="+from.getProperty(vkey)
}
else if(ekey.startsWith(args[1]) && goodprop) {
vkey=ekey.split(args[1])[1]
if(!to.hasProperty(vkey)) to.setProperty(vkey, eprop)
else {
vprop=to.getProperty(vkey)
if(!vprop.equal(eprop) to.setProperty(vkey, vprop+";"+eprop)
}
mylog.info("*****READ*****tovertex key/value: "+vkey+"="+to.getProperty(vkey)
}
//if current edge property key is re-assigned, remove it from the edge
if(ekey.startsWith(args[0]) || ekey.startsWith(args[1])) {
e.removeProperty(ekey)
if(e.hasProperty(ekey) println(ekey+" NOT remvoded from edge")
else println(ekey+ "removed from edge")
}
e.removeProperty("_namespace") // marks edge as processed per outer loop guard
}
}
}
g.commit()
}
def cleanup(args) {
g.shutdown()
}
This line:
hasProperty
doesn't seem to be in the Blueprints API or the Titan API. Since that is the case, I'm not sure how this code worked in your smaller test db, as it will always resolve tofalse
and you will never see the inside of thatif
statement:I suppose you really want to try this: