Welcome to Spark Preliminaries: RDD Creation
user@b23bdf6y221s:/projects/challenge$ spark-shell
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
2025-08-05 18:57:09 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4e773b8a
scala> spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4e773b8a
scala> val sc = spark.sparkContext
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3280a79a
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:28
scala> rdd.coalesce(1).saveAsTextFile("rdd")
scala>
user@b23bdf6y221s:/projects/challenge$
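With coalesce(1), saveAsTextFile writes the RDD into a single part file inside the rdd folder. A quick way to verify the result from the terminal (assuming Spark's default part-file name):

user@b23bdf6y221s:/projects/challenge$ cat rdd/part-00000
1
2
3
4
5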
Welcome to Spark Preliminaries: Create DataFrames and SQL Operations
user@vf342bt532zc:/projects/challenge$ cat Students.json
{"name":"Ben","age":"23"}
{"name":"Alen","age":"22"}
{"name":"Jonny","age":"25"}
{"name":"Steve","age":"27"}
{"name":"Stark","age":"21"}
user@vf342bt532zc:/projects/challenge$
user@vf342bt532zc:/projects/challenge$ spark-shell
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("Students JSON Processing").master("local[*]").getOrCreate()
2025-08-06 06:32:36 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@cbd3871
scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2f53e07d
scala> val studentsDF = sqlContext.read.json("Students.json")
studentsDF: org.apache.spark.sql.DataFrame = [age: string, name: string]
scala> studentsDF.show()
+---+-----+
|age| name|
+---+-----+
| 23| Ben|
| 22| Alen|
| 25|Jonny|
| 27|Steve|
| 21|Stark|
+---+-----+
scala> studentsDF.createOrReplaceTempView("students")
scala> val youngStudentsDF = spark.sql("SELECT age, name FROM students WHERE CAST(age AS INT) < 25")
2025-08-06 06:35:40 WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
2025-08-06 06:35:41 WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
2025-08-06 06:35:42 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
youngStudentsDF: org.apache.spark.sql.DataFrame = [age: string, name: string]
scala> youngStudentsDF.show()
+---+-----+
|age| name|
+---+-----+
| 23| Ben|
| 22| Alen|
| 21|Stark|
+---+-----+
scala> youngStudentsDF.coalesce(1).write.option("header", "true").csv("result")
scala>
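The same filter can also be written with the DataFrame API instead of SQL. A minimal sketch in the same session (youngStudentsDF2 is an illustrative name; age was inferred as a string column, hence the cast):

scala> import spark.implicits._
scala> val youngStudentsDF2 = studentsDF.filter($"age".cast("int") < 25).select("age", "name")
scala> youngStudentsDF2.show()  // expected rows: Ben, Alen, Stark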
Run the test cases.
Welcome to Spark Preliminaries: Broadcast Variable and Accumulator
// Step 1: import packages
scala> import org.apache.spark.sql.SparkSession
// Step 2: Create SparkSession
scala> val spark = SparkSession.builder().appName("AccumulatorExample").master("local[*]").getOrCreate()
scala> import spark.implicits._
// Step 3: Broadcast variable
scala> val broadcastVar = spark.sparkContext.broadcast(Array(1,2,3,4,5))
// Step 4: Numeric accumulator
scala> val accum = spark.sparkContext.longAccumulator("Fresco Accumulator")
// Step 5: Create RDD
scala> val data = spark.sparkContext.parallelize(Seq(1,2,3,4,5))
// Step 6: Add values to accumulator
scala> data.foreach(x => accum.add(x))
// Step 7: Store accumulator value
scala> val accumulatorVal = accum.value
// Step 8: Create RDD from accumulator value
scala> val accumulatorRdd = spark.sparkContext.parallelize(Seq(accumulatorVal))
// Step 9: Save to text file
scala> accumulatorRdd.coalesce(1).saveAsTextFile("Sum")
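Note that broadcastVar is created in Step 3 but never read afterwards. As a minimal sketch of typical usage (the val name shifted is illustrative), tasks read a broadcast variable with .value inside the closure, so each executor fetches the array once instead of shipping it with every task:

scala> val shifted = data.map(x => x + broadcastVar.value.sum)  // .value is read on the executors
scala> shifted.collect()  // Array(16, 17, 18, 19, 20), since the broadcast array sums to 15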
Welcome to Spark Preliminaries: RDD and RDD Operations
user@64cd53bh7bhb:/projects/challenge$ echo "Hi Friends Welcome to Fresco Play the digital learning platform" > fresco.txt
user@64cd53bh7bhb:/projects/challenge$ cat fresco.txt
Hi Friends Welcome to Fresco Play the digital learning platform
user@64cd53bh7bhb:/projects/challenge$ spark-shell
// Step 1: import packages
scala> import org.apache.spark.sql.SparkSession
// Step 2: create SparkSession
scala> val spark = SparkSession.builder.appName("WordCountExample").master("local[*]").getOrCreate()
scala> println("SparkSession created")
// Step 3: read file into RDD
scala> val newrdd = spark.sparkContext.textFile("/projects/challenge/fresco.txt")
scala> println(s"RDD created from '/projects/challenge/fresco.txt' with ${newrdd.count()} line(s)")
// Step 4: compute word counts (word, count)
scala> val wordCounts = newrdd.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
scala> println("Computed word counts: " + wordCounts.collect().mkString(", "))
// Step 5: save into a single-partition folder
scala> wordCounts.coalesce(1).saveAsTextFile("/projects/challenge/Wordcount")
scala> println("Saved word counts to '/projects/challenge/Wordcount'")
Welcome to Spark Preliminaries: Spark Submit Example
// Step 1: Import required Spark packages
import org.apache.spark.sql.SparkSession
println("SparkSession package imported successfully")
// Step 2: Create SparkSession object
val spark = SparkSession.builder().appName("Task4").getOrCreate()
println("SparkSession object 'spark' created successfully")
Step 3: After running the above in spark-shell, exit with Ctrl + D.
Step 4: In the terminal, run the provided Python file:
$ spark-submit sample.py
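For reference, the same two steps can be packaged as a standalone Scala application and run through spark-submit. This is a hypothetical sketch; the object name, file name, and jar name are assumptions, not part of the provided exercise:

// Task4App.scala -- hypothetical standalone equivalent of the shell steps above
import org.apache.spark.sql.SparkSession

object Task4App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Task4").getOrCreate()
    println("SparkSession object 'spark' created successfully")
    spark.stop()  // release resources when the job is done
  }
}

After compiling it into a jar (for example with sbt package), it would be submitted as:

$ spark-submit --class Task4App task4.jar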
Welcome to Spark Preliminaries: Final Hands-On
scala> val data = spark.sparkContext.textFile("/projects/challenge/data.csv")
// Remove header
scala> val header = data.first()
scala> val body = data.filter(row => row != header)
// Extract player_of_match (index 13, i.e. the 14th column)
scala> val manOfTheMatch = body.map(line => line.split(",")(13))
// Count occurrences
scala> val motmCount = manOfTheMatch.map(name => (name, 1)).reduceByKey(_ + _)
// Get top 5 players
scala> val top5 = motmCount.sortBy(_._2, ascending = false).take(5)
scala> top5.foreach(println)
(CH Gayle,17)
(YK Pathan,16)
(AB de Villiers,15)
(DA Warner,14)
(SK Raina,13)
// Save in the required (name,count) format
scala> spark.sparkContext.parallelize(top5).coalesce(1).saveAsTextFile("/projects/challenge/IPLData")
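One caveat: line.split(",") assumes no field in data.csv contains a quoted comma. If the file does have such fields, the DataFrame CSV reader handles quoting correctly; a sketch of the same computation (the header name player_of_match is an assumption based on the comment above):

scala> import org.apache.spark.sql.functions.desc
scala> val matches = spark.read.option("header", "true").csv("/projects/challenge/data.csv")
scala> matches.groupBy("player_of_match").count().orderBy(desc("count")).show(5)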