I have to map a table that stores the utilization history of an app. The table has these tuples:
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
AppId is always different, because it refers to many different apps. date is expressed in the format dd/mm/yyyy hh:mm. cpuUsage and memoryUsage are expressed in %. So, for example:
<3ghffh3t482age20304,230720142245,0.2,3.5>
I retrieved the data from Cassandra in this way (a little snippet):
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public static void main(String[] args) {
    Cluster cluster;
    Session session;
    cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    session = cluster.connect();
    session.execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication "
            + "= {'class':'SimpleStrategy', 'replication_factor':3};");
    // The column list needs a closing parenthesis after PRIMARY KEY, and the
    // clustering order must reference the clustering column date (there is no time column)
    String createTableAppUsage = "CREATE TABLE IF NOT EXISTS foo.appusage "
            + "(appid text, date text, cpuusage double, memoryusage double, "
            + "PRIMARY KEY(appid, date)) "
            + "WITH CLUSTERING ORDER BY (date ASC);";
    session.execute(createTableAppUsage);
    // Use select to get the appusage table's rows
    ResultSet resultForAppUsage = session.execute("SELECT appid,cpuusage FROM foo.appusage");
    // cpuusage is declared as double, so it must be read with getDouble, not getString
    for (Row row : resultForAppUsage)
        System.out.println("appid: " + row.getString("appid") + " cpuusage: " + row.getDouble("cpuusage"));
    // Clean up the connection by closing it
    cluster.close();
}
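For completeness, a row like the example above could be inserted through the same session (a sketch; the values are the ones from the example, reading 3.5 as the memory usage):

    session.execute("INSERT INTO foo.appusage (appid, date, cpuusage, memoryusage) "
            + "VALUES ('3ghffh3t482age20304', '230720142245', 0.2, 3.5);");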
So, my problem now is to map the data by key/value and create tuples of the form <AppId,cpuusage>, integrating this code (a snippet that doesn't work):
JavaPairRDD<String, Integer> saveTupleKeyValue = someStructureFromTakeData.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String x) {
                return new Tuple2(x, y); // y is never defined, and cpuusage is a double, not an Integer
            }
        });
How can I map appId and cpuusage using an RDD, and then reduce, e.g. keeping only cpuusage > 50?
Any help?
Thanks in advance.
Assuming that you have a valid SparkContext sparkContext already created, have added the spark-cassandra-connector dependencies to your project, and configured your Spark application to talk to your Cassandra cluster (see the docs for that), then we can load the data into an RDD. In Java, the idea is the same but it requires a bit more plumbing, described here.
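A minimal, self-contained Java sketch of the whole flow (loading the table, mapping to <appid, cpuusage> pairs, and filtering on cpuusage > 50) could look like the following. It assumes the connector's Java API (CassandraJavaUtil.javaFunctions) and the foo.appusage table from your snippet; the class name AppUsageJob and the threshold 50 are illustrative only.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class AppUsageJob {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AppUsageJob")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load foo.appusage as CassandraRow objects, projecting only
        // the two columns we need.
        JavaRDD<CassandraRow> rows = javaFunctions(sc)
                .cassandraTable("foo", "appusage")
                .select("appid", "cpuusage");

        // Map each row to an <appid, cpuusage> pair. The value type is
        // Double, not Integer, because cpuusage is a double column.
        JavaPairRDD<String, Double> appCpu = rows.mapToPair(
                row -> new Tuple2<>(row.getString("appid"), row.getDouble("cpuusage")));

        // Keep only the pairs whose cpuusage exceeds 50 (your example threshold).
        JavaPairRDD<String, Double> heavyUsage = appCpu.filter(pair -> pair._2() > 50);

        heavyUsage.foreach(pair -> System.out.println(pair._1() + " -> " + pair._2()));

        sc.stop();
    }
}

The fix for your non-working snippet is in the mapToPair call above: the value has to come from the row itself (your y was never defined), and its type should be Double rather than Integer, since cpuusage is a double column.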