Spark Basic Functions
Check SCHEMA
// Print the DataFrame's schema to the console.
data.printSchema()
Check COLUMNS
// List the column names of the DataFrame.
data.columns
SELECT
// Keep only the three columns used by the examples that follow.
val pracData = data.select($"fecha_dato", $"ncodpers", $"ind_empleado")

// Change the column types and names in a single projection.
// The result is bound to a val so the renames below have a DataFrame
// that actually contains the "Date" and "employed" columns.
val typedPracData = pracData.select(
  $"fecha_dato".cast("timestamp").as("Date"),
  $"ncodpers".cast("string").as("CustomerNum"),
  $"ind_empleado".cast("string").as("employed")
)

// Rename a single column.
// withColumnRenamed does nothing when the source column is missing, so it
// must be applied to typedPracData (which has "Date"), not to pracData
// (which still has "fecha_dato").
typedPracData.withColumnRenamed("Date", "DateTime")

// Rename two columns by chaining the calls.
typedPracData.withColumnRenamed("Date", "DateTime").withColumnRenamed("employed", "Employed")
GROUP BY
// Other aggregate functions can be applied to the grouped data in the same way, e.g.:
//   pracData.groupBy("ncodpers").sum()
//   pracData.groupBy("ncodpers").avg()
// Count the rows in each ncodpers group and print the result.
pracData.groupBy("ncodpers").count().show()
FILTER
// filter is the counterpart of WHERE in SQL: keep rows whose ncodpers equals the given value.
pracData.filter($"ncodpers" === "1050599.0").show()
// A filter can also follow an aggregation, e.g.:
//   pracData.groupBy("ncodpers").count().filter($"count" <= 14)
// A slightly different notation: the symbol literal 'columnName stands for $"columnName",
// so the next line is the same as pracData.filter($"ncodpers".like("%105%")).show()
pracData.filter('ncodpers.like("%105%")).show()
// The condition can also be written as a SQL expression string.
pracData.filter("ind_empleado is null").show()
새로운 COLUMN 생성하기
// General form: pracData.withColumn(<new column name>, <column expression>)
// Add a column of generated ids.
pracData.withColumn("row_number", monotonically_increasing_id()).show()
// Add a column that holds the same literal value on every row.
pracData.withColumn("new", lit("new")).show()
// Add a column computed from an existing column.
pracData.withColumn("100TimesOfNcodpers", $"ncodpers" * 100).show()
Value들을 array로 묶기 / 풀기
// Create a row_number column.
// NOTE: monotonically_increasing_id() produces unique, increasing ids that are
// not guaranteed to be consecutive, so this is not a gap-free row number.
val newData = pracData.withColumn("row_number", monotonically_increasing_id())

// Gather the row_number values of each ncodpers group into a single array column.
val ArrayNew = newData.groupBy("ncodpers").agg(collect_list("row_number").as("row_list"))
// truncate = false prints the full cell contents instead of cutting them off.
ArrayNew.show(truncate = false)
ArrayNew.show()

// Unnest the arrays again: explode produces one output row per array element.
val exploding = ArrayNew.withColumn("row_number", explode($"row_list"))
exploding.show(truncate = false)
exploding.drop("row_list").show()
Key 없는 데이터 합치기
// Build two DataFrames that share no join key.
val noKeyData1 = data.select($"fecha_dato", $"ncodpers", $"ind_empleado")
val noKeyData2 = data.select($"pais_residencia", $"sexo", $"age")

// Give each side a synthetic key.
// NOTE(review): monotonically_increasing_id() derives its ids from the partition
// and the row's position within it, so the ids line up across the two DataFrames
// only when both have the same partitioning and row order. Both are plain
// projections of `data`, so this presumably holds here; confirm before reusing
// the pattern on unrelated DataFrames.
val noKeyData1New = noKeyData1.withColumn("id", monotonically_increasing_id())
val noKeyData2New = noKeyData2.withColumn("id", monotonically_increasing_id())

// Join on the synthetic key, then drop it.
val noKeyDataJoin = noKeyData1New.join(noKeyData2New, "id").drop("id")
noKeyDataJoin.show()
조건문
// Conditional expression: when/otherwise yields "Z" where ind_empleado is null
// and the original value everywhere else.
pracData.withColumn(
  "nonull",
  when($"ind_empleado".isNull, "Z").otherwise($"ind_empleado")
).filter($"nonull" === "Z").show()

// Null replacement with the built-in filler, restricted to the ind_empleado column.
pracData.na.fill("Z", Seq("ind_empleado")).filter($"ind_empleado" === "Z").show()

// Null handling can also be done with a UDF.
// A null String column value reaches a Scala UDF as a null reference, so it is
// wrapped in Option before any String method is called. Both null and the empty
// string are replaced by "Z".
val replaceNull = udf((value: String) => Option(value).filter(_.nonEmpty).getOrElse("Z"))
pracData.select($"fecha_dato", $"ncodpers", replaceNull($"ind_empleado").as("nonull")).show()