Hortonworks Data Platform: PySpark on Python 3


In this post you will learn how to set up and configure HDP to work with Python 3.

Docker repository for big-data and ML

harisekhon docker hub Spark 2.4 and Python Report

Python Basics

String interpolation

print ("My name is {}, my number is {}".format('Siva', '10'))
print ("My name is {x}, my number is {y}".format(x='Siva', y='10'))
y = 4 ** 2 # Power operator; y is 16
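The two format() styles above can be compared with an f-string, which is available from Python 3.6 onward. This is a small sketch; the variable names are illustrative only.

```python
name, number = "Siva", 10

# Positional and keyword placeholders with str.format
positional = "My name is {}, my number is {}".format(name, number)
keyword = "My name is {x}, my number is {y}".format(x=name, y=number)

# f-string: the expressions are evaluated inline (Python 3.6+)
fstring = f"My name is {name}, my number is {number}"

print(fstring)
```

All three expressions build the same string, so the choice is mostly a matter of readability.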


myList = [1, "Sivakumar", 1.0]
myList[0] # Will return the first element
myList[-1] # Will retrieve the last element in the list
myList[0] = "New" # Replaces the existing element
myList = [0, 1, 2, [200, 300, 400]] # Nested list items
myList[3] # Will return the inner list
myList[3][1] # Will return 300, which is the second element of the inner list

Dictionaries are similar to hash tables in other languages

myDictionary = {"key1":"Value1", "key2": "Value2"}
myDictionary["key1"] # will return Value1


0 # is also treated as False
1 # is treated as True


  • Tuples are immutable; you can't change the values once assigned.
    myTuples = (1, 2, 3)
    myTuples[0] = "new" # Will throw a TypeError, because tuples do not support item assignment


  • Sets are unordered collections of unique elements
    {1,1,1, 5, 3, 4, 5, 2}
    {1, 2, 3, 4, 5} # Output: duplicates are removed; the display order is not guaranteed
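The tuple and set behaviour described above can be checked with a short sketch. The variable names are illustrative; the point is that item assignment on a tuple raises TypeError, and that a set keeps one copy of each value.

```python
my_tuple = (1, 2, 3)

# Item assignment on a tuple raises TypeError
try:
    my_tuple[0] = "new"
except TypeError as exc:
    tuple_error = str(exc)
else:
    tuple_error = None

# To "change" a tuple, build a new one instead
updated = ("new",) + my_tuple[1:]

# A set drops duplicates; sort it when a stable order is needed
values = [1, 1, 1, 5, 3, 4, 5, 2]
unique = set(values)
ordered = sorted(unique)

print(tuple_error)
print(updated, ordered)
```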

    Logical operators

    (1 == 2) and (1 == 1) # False, because the first comparison is False
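A short sketch of the logical operators next to the truthiness of 0 and 1 noted earlier. The variable names are illustrative.

```python
# and: both sides must be true
both = (1 == 2) and (1 == 1)

# or: at least one side must be true
either = (1 == 2) or (1 == 1)

# not: inverts the result
negated = not (1 == 2)

# 0 is treated as False and 1 as True
zero_is_true = bool(0)
one_is_true = bool(1)

print(both, either, negated, zero_is_true, one_is_true)
```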


if (1 == 2):
    print("Equal")
elif (2 == 2):
    print("second condition true")
else:
    print("Not equal")

Loop

seq = [1, 2, 3, 4, 5, 6, 6]
for jelly in seq:
    print(jelly)

Output: 1 2 3 4 5 6 6

i = 1
while (i < 5):
    print("i value is {}".format(i))
    i = i + 1

Output:
i value is 1
i value is 2
i value is 3
i value is 4

Range function and List comprehension

list(range(5)) # Will give the list [0, 1, 2, 3, 4]
list(range(1, 10, 2)) # [1, 3, 5, 7, 9]
list(range(1, 5)) # Will return 1 to 4; the end value is excluded

List Comprehension

x = [1, 2, 3, 4, 5, 6]
out = []
for num in x:
    out.append(num ** 2)
print(out) # Output: [1, 4, 9, 16, 25, 36]

Equivalent operation with a list comprehension:

out1 = [num ** 2 for num in x]
print(out1) # Output: [1, 4, 9, 16, 25, 36]

Functions in Python

def my_fun(name="Default value"):
    """This method prints a greeting for the given name."""
    print("Hello " + name)

my_fun()

  • lambda is an anonymous function.
  • In Jupyter you can use Shift + Tab to get help about a method.
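To illustrate the note on lambda, here is a minimal sketch that uses a default argument together with anonymous functions passed to map and filter. The names greet, square, squares and evens are illustrative.

```python
def greet(name="Default value"):
    # The default is used when the caller passes no argument
    return "Hello " + name

# A lambda is a function without a name; here it is bound to a variable
square = lambda num: num ** 2

x = [1, 2, 3, 4, 5, 6]
squares = list(map(square, x))
evens = list(filter(lambda num: num % 2 == 0, x))

print(greet(), greet("Siva"))
print(squares, evens)
```

For simple transformations like these, a list comprehension is usually easier to read than map with a lambda.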

Python, PySpark and YARN key notes

import findspark
findspark.init() # Locate the Spark installation before importing pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("firstProgram").getOrCreate()
df = spark.sql('''select 'spark' as hello''')

df = spark.read.json("people.json")
df.select(['age']).show() # Pass a list of column names

Create New Column

df.withColumn('double_age', df['age'] * 2).show()
df.withColumnRenamed('age', 'my_new_age').show()
df.createOrReplaceTempView("people") # Register the DataFrame so SQL can query it by name
results = spark.sql("SELECT * FROM people")
results = spark.sql("SELECT * FROM people WHERE age = 30")

Basic DataFrame operations

import findspark
findspark.init() # Locate the Spark installation before importing pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('dfExample').getOrCreate()
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)
df.filter("close < 200").select(['open', 'close']).show()
df.filter(df["Close"] < 500).select("Volume").show()
# Filtering based on multiple conditions
# Use '&' for 'and', '|' for 'or', '~' for 'not', and wrap each condition in parentheses.
# Python's and/or/not will not work here; they throw "Cannot convert column into bool".
df.filter((df["close"] > 200) & ~(df["close"] < 203)).show()

result = df.filter(df['Low'] == 197.16).collect() # collect() returns the matching rows as a list of Row objects

GroupBy Operation

df.groupBy("Company").mean().show()  # Will return the average of each numeric column per company
df.groupBy("Company").count().show() # Will return the number of records for each company
df.groupBy("Company").max().show()  # Will return the maximum of each numeric column per company
  • The code below groups by company and finds the maximum sale in each company
    group_data = df.groupBy("Company")
    group_data.agg({'Sales': 'max'}).show() # Equivalent to group_data.max('Sales').show()
    from pyspark.sql.functions import countDistinct, avg, stddev, format_number
    df.select(avg('Sales').alias("Average Sales")).show()
    sales_std = df.select(stddev("Sales").alias("Std"))
    sales_std.select(format_number('Std', 2).alias("Std Formatted")).show()
    df.orderBy("Sales").show() # Ascending order
    df.orderBy(df["Sales"].desc()).show() # Descending order

Handling Missing Data

import findspark
findspark.init() # Locate the Spark installation before importing pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("misss").getOrCreate()
df = spark.read.csv("ContainsNull.csv", inferSchema=True, header=True)

Dropping a value

df.na.drop(thresh=2).show() # Keep only rows that have at least 2 non-null values
df.na.drop(how='any').show()  # Default: drop a row if any of its values is null
df.na.drop(subset=['Sales']).show() # Only check the listed columns for null values

Filling missing values

df.na.fill('Fill Value') # A string value only fills nulls in string columns
df.na.fill('No Name', subset=['Name']).show()
from pyspark.sql.functions import mean
mean_val = df.select(mean(df['Sales'])).collect()
mean_sales = mean_val[0][0]
df.na.fill(mean_sales, ['Sales']).show()

Achieve the above code with a single line

Date Functions

from pyspark.sql.functions import dayofmonth,hour,dayofyear,month,year, weekofyear,format_number,date_format
df = spark.read.csv('appl_stock.csv', header=True, inferSchema=True)
#df.select(["date", "open"]).show()
newDF = df.withColumn("Year", year(df["Date"]))
results = newDF.groupBy("Year").mean().select(["Year", "avg(Close)"])
new = results.withColumnRenamed("avg(Close)", "Avg Closing Price")
new.select(["Year", format_number("Avg Closing Price", 2)]).show()

Step 3: Set up conda

sudo yum install epel-release
sudo yum install R -y
export LD_LIBRARY_PATH=/usr/local/lib
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -P /tmp/
bash /tmp/Miniconda3-latest-Linux-x86_64.sh
Add the line (source /opt/miniconda3/etc/profile.d/conda.sh) to the /etc/bashrc file, then run: source /etc/bashrc
conda install python=3.6.5
conda list
conda update conda
conda install python=3.6.5
sudo yum install readline-devel

# Set in .bashrc to ensure readline-devel is used.

ln -s /opt/miniconda3/bin/python3.6 /usr/bin/python3
ln -s /opt/miniconda3/bin/pip /usr/bin/pip
while read requirement; do conda install --name gavel_env --yes $requirement; done < requirements.txt

Make user sudo


Setup miniconda 3.6.5 version

https://stackoverflow.com/questions/28436769/how-to-change-default-anaconda-python-environment/28460907#28460907
Installing Python on CentOS
Fix for R package installation issue

Good Pyspark Tutorial

Real-Time Kafka Data Ingestion into HBase via PySpark

Changing default python version to 3.6

I would suggest using ‘alternatives’ instead. As super-user (root) run the following: Start by registering python2 as an alternative

alternatives --install /usr/bin/python python /usr/bin/python2 50
alternatives --install /usr/bin/python python /usr/bin/python3.5 60
alternatives --config python

Creating and activating environment:

python3.6 -m venv gavel_env

Go to the /opt/gavel/environments folder and run the following command to start the environment.

source gavel_env/bin/activate

Set up python3.6 as the Spark default environment:

export SPARK_PRINT_LAUNCH_COMMAND=1
export PYSPARK_PYTHON=python3.6
export SPARK_HISTORY_OPTS= # set to hdfs file

https://blog.cloudera.com/blog/2013/09/how-to-use-the-hbase-thrift-interface-part-1/
https://www.ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.product.doc/doc/bi_spark_tables.html
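The same environment variables can also be set from Python, for example at the top of a notebook, as long as this happens before the Spark session is created. This is a sketch; the helper name configure_pyspark_env is hypothetical, and SPARK_HISTORY_OPTS is left out because its value is not given above.

```python
import os


def configure_pyspark_env(python_exec="python3.6"):
    """Set the Spark-related variables for the current process (hypothetical helper)."""
    os.environ["SPARK_PRINT_LAUNCH_COMMAND"] = "1"
    os.environ["PYSPARK_PYTHON"] = python_exec
    keys = ("SPARK_PRINT_LAUNCH_COMMAND", "PYSPARK_PYTHON")
    return {key: os.environ[key] for key in keys}


# Call this before SparkSession.builder...getOrCreate()
settings = configure_pyspark_env()
print(settings)
```

Variables set this way only affect the current Python process and the processes it launches, so cluster-wide defaults still belong in the shell profile or the Spark configuration.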

Where I left?


Jupyter Notebook

In a command prompt, run the command below to open Jupyter Notebook in a specific folder. Use SHIFT + ENTER to execute the code.

jupyter notebook

Conda create virtual environment

You can create a virtual environment on Windows with the command below:

conda create --name virtual_env_name package_name
conda create --name siva numpy

Running Pyspark on Spark

http://tech.magnetic.com/2016/03/pyspark-carpentry-how-to-launch-a-pyspark-job-with-yarn-cluster.html
https://stackoverflow.com/questions/31450828/oozie-job-wont-run-if-using-pyspark-in-sparkaction/32334531

Adding dependencies for PySpark to run in Spark

https://stackoverflow.com/questions/36461054/i-cant-seem-to-get-py-files-on-spark-to-work
http://tech.magnetic.com/2016/03/pyspark-carpentry-how-to-launch-a-pyspark-job-with-yarn-cluster.html
https://stackoverflow.com/questions/28739729/spark-submit-not-working-when-application-jar-is-in-hdfs

Spark submit for PySpark with --py-files

spark-submit --master yarn --deploy-mode client --name "Edelman heatmap" \
  --jars phoenix-spark-4.13.1-HBase-1.2.jar,phoenix-4.13.1-HBase-1.2-client.jar \
  --conf spark.pyspark.virtualenv.enabled=true \
  --conf spark.pyspark.virtualenv.type=native \
  --conf spark.pyspark.virtualenv.requirements=requirements.txt \
  --conf spark.pyspark.virtualenv.bin.path=/opt/gavel3/environments/gavel_env/bin/virtualenv \
  --conf spark.pyspark.python=/usr/bin/python3.6 \
  --py-files dependencies.zip \
  Heatmap.py 101_TICKETING_TRANSACTIONS
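The --py-files option in the command above expects an archive of Python modules. One possible way to build such a dependencies.zip with the standard library is sketched below; the helpers package and the build_py_files_zip function are hypothetical, and the demo writes into a temporary folder.

```python
import os
import tempfile
import zipfile


def build_py_files_zip(package_dir, zip_path):
    """Zip every .py file under package_dir, keeping the package folder in the paths."""
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(package_dir):
            for filename in files:
                if filename.endswith(".py"):
                    full_path = os.path.join(root, filename)
                    archive.write(full_path, os.path.relpath(full_path, parent))
    return zip_path


# Demo with a hypothetical package created in a temporary folder
with tempfile.TemporaryDirectory() as workdir:
    package_dir = os.path.join(workdir, "helpers")
    os.makedirs(package_dir)
    for module in ("__init__.py", "util.py"):
        with open(os.path.join(package_dir, module), "w") as handle:
            handle.write("# placeholder module\n")

    zip_path = build_py_files_zip(package_dir, os.path.join(workdir, "dependencies.zip"))
    with zipfile.ZipFile(zip_path) as archive:
        names = sorted(archive.namelist())

print(names)
```

Keeping the package folder name inside the archive means the job can still use the usual import path for the package once the zip is on the Python path.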

Running Pyspark on Yarn

spark-submit --master yarn --deploy-mode cluster --queue default \
  --num-executors 20 --executor-memory 1G --executor-cores 2 --driver-memory 1G \
  Heatmap.py 101_TICKETING_TRANSACTIONS

Installing Winepi estnltk packages

https://estnltk.github.io/estnltk/1.4.1/index.html
https://github.com/estnltk/estnltk

Spark submit from docker


Python installation


pip install cryptography==2.3.1
pip install  matplotlib==3.0.2
pip install  mlxtend==0.14.0
pip install  nltk==3.3
pip install numpy==1.15.3
pip install pandas==0.23.4
pip install pytz==2018.5
pip install requests==2.20.0
pip install rpy2==2.9.4
pip install scikit-learn==0.20.0
pip install scipy==1.1.0
pip install tzlocal==1.5.1
pip install urllib3==1.24
pip install pyspark==2.4.0
pip install phoenixdb==0.6

Accessing previous Row of Dataframe

Replace based on condition


Spark Dataframe replace values from map: https://stackoverflow.com/questions/32000646/extract-column-values-of-dataframe-as-list-in-apache-spark

HDP Spark Yarn https://hortonworks.com/tutorial/setting-up-a-spark-development-environment-with-python/


Set up PySpark, ORC, Hive on Python 3

wget https://repo.anaconda.com/miniconda/Miniconda2-latest-MacOSX-x86_64.sh -P /tmp/

ln -s /opt/miniconda3/bin/python3.6 /usr/bin/python3

Killing the process based on port number:

 sudo netstat -lutnp | grep -w '4041'
 sudo netstat -lutnp | grep -w '4042'
 sudo netstat -lutnp | grep -w '4043'
 sudo kill -9 36476 # Replace 36476 with the PID reported by netstat
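Before killing anything, it can help to confirm from Python whether a port is still held. The sketch below is one possible check using the standard library; port_in_use is a hypothetical helper, and the demo uses a throwaway listener on a free port instead of the ports listed above.

```python
import socket


def port_in_use(port, host="127.0.0.1"):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(1.0)
        return probe.connect_ex((host, port)) == 0


# Demo: open a listener on a free port chosen by the OS, then close it
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
demo_port = listener.getsockname()[1]

busy_while_listening = port_in_use(demo_port)
listener.close()
busy_after_close = port_in_use(demo_port)

print(busy_while_listening, busy_after_close)
```

A call such as port_in_use(4041) would then show whether an earlier Spark session is probably still running on that port.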