Spark useful code snippets


Filling Gaps in a Time Series

from pyspark.sql.functions import col, min as min_, max as max_

# 15-minute step, in seconds
step = 15 * 60
# min/max of the series as epoch seconds
minp, maxp = df.select(
    min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()
# one row per 15-minute slot between min and max (integer division keeps the boundaries aligned)
reference = spark.range(
    (minp // step) * step, ((maxp // step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))
reference.join(df, ["periodstart"], "leftouter")

https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark
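The left join leaves nulls in the slots that were missing; if the goal is to carry the previous reading forward, a minimal forward-fill sketch (assuming a single series and a hypothetical value column; add partitionBy for multiple series) is:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# carry the last non-null reading forward over the gap rows
w = Window.orderBy("periodstart").rowsBetween(Window.unboundedPreceding, 0)
filled = reference.join(df, ["periodstart"], "leftouter") \
    .withColumn("value", F.last("value", ignorenulls=True).over(w))
filled.show(10)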

Kafka Direct Streaming

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[2]", "myConsumer")
ssc = StreamingContext(sc, 10)  # 10-second batch interval
kafkaParams = {"metadata.broker.list": "kafka_server_ip:9092",
               "auto.offset.reset": "smallest",
               "group.id": "group1",
               "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
               "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
               }
topic = "topic_name"
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

# at least one output operation must be registered before start()
directKafkaStream.pprint()

ssc.start()
ssc.awaitTermination()

https://stackoverflow.com/questions/55594244/java-kafka-consumer-and-avro-deserialzier

Important URL: https://github.com/confluentinc/schema-registry/issues/755

https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala

Spark Solr Integration

# launch the shell with the spark-solr shaded jar on the classpath
pyspark --jars spark-solr-3.4.5-shaded.jar

zkUrl = "zookeeper_server:2181/solr"
data = sqlContext.read.format("solr").option("collection", "collection_name").option("zkhost", zkUrl).option("max_rows", "100").load()
newData = data.select("*")
newData.write.format("solr").option("zkhost", zkUrl).option("collection", "collection_name").option("soft_commit_secs", "10").save()

Spark window with row_number

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.json("/user/spark/gcare_alerts_less.json")
df = df.filter(df.id.isNotNull())
df.show(10)

# number rows within each LABEL_Id, ordered by alert time, starting at val1 + 1
val1 = 100
df2 = df.select("*", (val1 + F.row_number().over(Window.partitionBy(df.LABEL_Id).orderBy(df.GavelAlertTime_UTC))).alias("ROWNUM"))
df2.select(df2.ROWNUM, df2.id, df2.LABEL_Id, df2.GavelAlertTime).show(10, False)

Reading Avro from a Kafka topic

pyspark --master local[*] --deploy-mode client --driver-memory 1G --num-executors 4 --conf spark.ui.enabled=false --conf spark.driver.maxResultSize=3g --jars /home/spark/spark-solr-3.4.5-shaded.jar,/home/spark/kafka-clients-0.10.0.0.jar,/home/spark/phoenix-spark-5.0.0.3.1.0.0-78.jar,/home/spark/phoenix-5.0.0.3.1.0.0-78-client.jar --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,org.apache.kafka:kafka-clients:0.10.0.0,io.confluent:kafka-avro-serializer:3.0.0,com.databricks:spark-avro_2.11:4.0.0,za.co.absa:abris_2.11:3.1.0 --repositories https://repo1.maven.org/maven2/,http://packages.confluent.io/maven/
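The command above only sets up the classpath (spark-sql-kafka plus the Confluent and ABRiS Avro packages); a minimal sketch of subscribing to the topic with Structured Streaming might look like the following (broker address and topic name are placeholders, and the binary Avro value still has to be deserialized, e.g. with ABRiS and the schema registry):

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "kafka_server_ip:9092")
       .option("subscribe", "topic_name")
       .option("startingOffsets", "earliest")
       .load())

# key/value arrive as binary columns; dump them to the console for a quick check
query = (raw.selectExpr("CAST(key AS STRING)", "value")
         .writeStream
         .format("console")
         .start())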

Spark Redis example

pyspark --packages com.redislabs:spark-redis:2.4.0 --conf "spark.redis.host=hostname" --conf "spark.redis.port=6379" --conf "spark.redis.auth=password"
 
full_df = spark.read.csv("/user/spark/redis-try/pantheon.tsv", sep="\t", quote="", header=True, inferSchema=True)
data = full_df.select("en_curid", "countryCode", "occupation")
data.write.format("org.apache.spark.sql.redis").option("table", "people").option("key.column", "en_curid").save()

Spark Incremental ID

https://stackoverflow.com/questions/45228325/how-to-add-new-column-not-based-on-exist-column-in-dataframe-with-scala-spark?noredirect=1&lq=1

https://stackoverflow.com/questions/55341313/how-to-create-an-unique-autogenerated-id-column-in-a-spark-dataframe?noredirect=1&lq=1

https://stackoverflow.com/questions/45228325/how-to-add-new-column-not-based-on-exist-column-in-dataframe-with-scala-spark?noredirect=1&lq=1


https://community.hortonworks.com/questions/199012/how-can-i-load-the-next-value-of-a-phoenix-db-sequ.html
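The links above cover several approaches; a minimal sketch of the two most common ones in PySpark (monotonically_increasing_id for a unique but non-contiguous ID, row_number over a window for a gap-free sequence; toy data made up here) is:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame([("a",), ("b",), ("c",)], ["name"])

# unique but not consecutive (values depend on partition layout)
df_uid = df.withColumn("uid", F.monotonically_increasing_id())

# consecutive 1..N, at the cost of pulling all rows into a single window partition
w = Window.orderBy("name")
df_seq = df.withColumn("seq", F.row_number().over(w))
df_seq.show()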

Spark Reading/Writing MySQL

import pyspark.sql.functions as f

df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost/dbname",
    driver="com.mysql.jdbc.Driver",
    dbtable="employee",
    user="root",
    password=""
).load()

# materialize the rows locally so the overwrite below does not read from the table it is replacing
df2 = spark.createDataFrame(df.collect())
df2 = df2.withColumn("emp_name", f.concat(f.col("emp_name"), f.lit("Updated")))
df2.show(10)

df2.write.format("jdbc").mode("overwrite").options(
    url="jdbc:mysql://localhost/dbname",
    driver="com.mysql.jdbc.Driver",
    dbtable="employee",
    user="root",
    password="").save()

Reading from Parquet

from pyspark.sql.functions import *
df = spark.read.parquet("/user/spark/prediction/Solr/DailyRun/WinDiskStage2")
df.select("NAME","minTime","maxTime").filter(df.NAME == "NAME").show(100)

Reading from JSON

from pyspark.sql.types import IntegerType

df = spark.createDataFrame([1], IntegerType())
df.show(10)
df.write.json("/user/spark/appconfig/case.json")
df = spark.read.json("/user/spark/appconfig/case.json")

Create dataframe from list

from pyspark.sql.types import IntegerType
mylist = [1, 2, 3, 4]
mydf= spark.createDataFrame(mylist, IntegerType())

Less than current_time

from pyspark.sql.functions import unix_timestamp, current_timestamp

values = [(1, "2019/07/01 23:00:01", "2019/07/10 23:00:01"),
          (2, "2019/08/01 23:00:01", "2019/07/10 23:00:01")]
columns = ['Key', 'LATEST_POLL_TIME', 'F_START_TIME']
mydf = spark.createDataFrame(values, columns)

# parse the string column into a timestamp, then compare against the current time
fmt = "yyyy/MM/dd HH:mm:ss"
mydf = mydf.withColumn('LATEST_POLL_TIME', unix_timestamp(mydf.LATEST_POLL_TIME, fmt).cast('timestamp'))
mydf.where(mydf.LATEST_POLL_TIME < current_timestamp()).show(10)
mydf.select(current_timestamp()).show(10)

Spark generate interval data between two dates

from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import ArrayType, TimestampType
import datetime

values = [(1, "2019/07/01 23:00:01", "2019/07/10 23:00:01")]
columns = ['Key', 'LATEST_POLL_TIME', 'F_START_TIME']
mydf = spark.createDataFrame(values, columns)

fmt = "yyyy/MM/dd HH:mm:ss"
mydf = mydf.withColumn('start', unix_timestamp(mydf.LATEST_POLL_TIME, fmt).cast('timestamp'))
mydf = mydf.withColumn('stop', unix_timestamp(mydf.F_START_TIME, fmt).cast('timestamp'))
mydf.show(10)

# one timestamp per hour between start and stop (inclusive)
def generate_date_series(start, stop):
    hours = int((stop - start).total_seconds() // 3600)
    return [start + datetime.timedelta(hours=x) for x in range(0, hours + 1)]

spark.udf.register("generate_date_series", generate_date_series, ArrayType(TimestampType()))
mydf.createOrReplaceTempView("mydf")
df = spark.sql("SELECT explode(generate_date_series(start, stop)) FROM mydf")
df.count()

Spark dataframe handling null values

todo https://datascience.stackexchange.com/questions/38021/replacing-null-with-average-in-pyspark
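Until that is written up, a minimal sketch of replacing nulls with the column mean (made-up data; compute the mean with agg, then fill with na.fill):

from pyspark.sql import functions as F

df = spark.createDataFrame([(1, 10.0), (2, None), (3, 30.0)], ["id", "value"])

# compute the column mean, then substitute it for the nulls
mean_value = df.agg(F.mean("value")).first()[0]
df_filled = df.na.fill({"value": mean_value})
df_filled.show()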

Spark Get hours between two dates

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType,TimestampType,DoubleType,DateType,ArrayType
values = [(1,"2019/07/01 23:00:01","2019/07/02 23:00:01"),
(2,"2019/07/01 12:40:32", "2019/07/08 23:00:01"),
(3,"2019/07/01 09:54:00", "2019/07/08 23:00:01"),
(4,"2019/07/01 10:12:43", "2019/07/08 23:00:01")]
columns = ['Key', 'LATEST_POLL_TIME', 'F_START_TIME']		  
df = spark.createDataFrame(values, columns)
format = "yyyy/MM/dd HH:mm:ss"
df = df.withColumn('LATEST_POLL_TIME',unix_timestamp(df.LATEST_POLL_TIME, format).cast('timestamp'))
df = df.withColumn('F_START_TIME',unix_timestamp(df.F_START_TIME, format).cast('timestamp'))
df = df.withColumn("seconds",  unix_timestamp(df.F_START_TIME, format) - unix_timestamp(df.LATEST_POLL_TIME, format))
df = df.withColumn("minutes",  df.seconds/60)
df = df.withColumn("hours",  df.minutes/60)
df.show(10)

Useful codebase

Good Code base links:

https://www.programcreek.com/scala/org.apache.spark.sql.functions
https://github.com/ippontech/spark-bbl-prez/blob/master/src/main/scala/fr/ippon/spark/ml/Titanic.scala

https://spark.apache.org/docs/latest/ml-pipeline.html#example-pipeline
https://community.hortonworks.com/articles/53903/spark-machine-learning-pipeline-by-example.html
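As a quick companion to the pipeline links above, a minimal sketch of a Spark ML Pipeline (Tokenizer -> HashingTF -> LogisticRegression, on made-up toy data):

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

training = spark.createDataFrame([
    (0, "spark streaming rocks", 1.0),
    (1, "bad data everywhere", 0.0),
], ["id", "text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)

# fit the whole chain as a single estimator, then score the same data
model = Pipeline(stages=[tokenizer, hashingTF, lr]).fit(training)
model.transform(training).select("text", "prediction").show()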

Spark Holt-Winters

import com.cloudera.sparkts.models._
import org.apache.spark.mllib.linalg.DenseVector

// the classic monthly AirPassengers series
val tsAirPassengers = new DenseVector(Array(112.0,118.0,132.0,129.0,121.0,135.0,148.0,148.0,136.0,119.0,104.0,118.0,115.0,126.0,
141.0,135.0,125.0,149.0,170.0,170.0,158.0,133.0,114.0,140.0,145.0,150.0,178.0,163.0,
172.0,178.0,199.0,199.0,184.0,162.0,146.0,166.0,171.0,180.0,193.0,181.0,183.0,218.0,
230.0,242.0,209.0,191.0,172.0,194.0,196.0,196.0,236.0,235.0,229.0,243.0,264.0,272.0,
237.0,211.0,180.0,201.0,204.0,188.0,235.0,227.0,234.0,264.0,302.0,293.0,259.0,229.0,
203.0,229.0,242.0,233.0,267.0,269.0,270.0,315.0,364.0,347.0,312.0,274.0,237.0,278.0,
284.0,277.0,317.0,313.0,318.0,374.0,413.0,405.0,355.0,306.0,271.0,306.0,315.0,301.0,
356.0,348.0,355.0,422.0,465.0,467.0,404.0,347.0,305.0,336.0,340.0,318.0,362.0,348.0,
363.0,435.0,491.0,505.0,404.0,359.0,310.0,337.0,360.0,342.0,406.0,396.0,420.0,472.0,
548.0,559.0,463.0,407.0,362.0,405.0,417.0,391.0,419.0,461.0,472.0,535.0,622.0,606.0,
508.0,461.0,390.0,432.0
))
val period = 12
val model = HoltWinters.fitModel(tsAirPassengers, period, "additive", "BOBYQA")
val forecasted = new DenseVector(new Array[Double](period))
model.forecast(tsAirPassengers, forecasted)

Spark: 5 important methods for complex data

Five useful Spark SQL utility methods for processing complex/nested data types:

https://databricks.com/blog/2017/06/13/five-spark-sql-utility-functions-extract-explore-complex-data-types.html
https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html
https://sonra.io/2017/11/27/advanced-spark-structured-streaming-aggregations-joins-checkpointing/
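As a tiny illustration of a few of those utilities (from_json, nested field access, explode) on made-up JSON data:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

df = spark.createDataFrame([(1, '{"city": "Pune", "tags": ["a", "b"]}')], ["id", "payload"])

schema = StructType([
    StructField("city", StringType()),
    StructField("tags", ArrayType(StringType())),
])

# parse the JSON string into a struct, reach into it, and flatten the array
parsed = df.withColumn("data", F.from_json("payload", schema))
parsed.select("id", "data.city", F.explode("data.tags").alias("tag")).show()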

Spark Streaming with Complex Data https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

Machine Learning

http://www.learnbymarketing.com/about-learn-by-marketing/
http://discuss.itversity.com/t/parse-json-schema-from-kafka-message-in-spark/17101

Examples of Avro on the web

Please find below some resources to learn how to create Avro schemas and view existing ones:

Avro documentation: http://avro.apache.org/docs/current/spec.html
Oracle Avro getting started: https://docs.oracle.com/cd/E57769_01/html/GettingStartedGuide/avroschemas.html
Avro schemas used by Rabo Bank: https://github.com/Axual/rabo-alerts-blog-post/tree/master/src/main/avro
Avro examples by Gwen Shapira: https://github.com/gwenshap/kafka-examples

From https://www.udemy.com/confluent-schema-registry/learn/v4/t/lecture/8624278?start=0

GenericRecord is used to create an Avro object from a schema; the schema can be referenced either as a file or as a String.
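The GenericRecord API itself is Java; as a rough Python analogue (a sketch using fastavro with a made-up schema), creating records from a schema held as a dict/string and serializing them looks like:

import io
import fastavro

# made-up schema, equivalent to what a GenericRecord would be built against
schema = {
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}
parsed = fastavro.parse_schema(schema)

# records are plain dicts validated against the schema when they are written
buf = io.BytesIO()
fastavro.writer(buf, parsed, [{"name": "alice", "age": 30}])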