AWS :: AWS GLUE :: Creating ETL in the Cloud
For any consultation, query, or doubt, you may contact: (+91) 9804 436 193, debabrataguha20@gmail.com
and join the group https://www.facebook.com/groups/331231161093613/
AWS Glue is a cloud service that prepares data for analysis through automated extract, transform and load (ETL) processes. Let’s jump directly into some ETL examples by handling some small sample files.
1) What are Parquet files?
What is Parquet?
Parquet is an open-source file format for Hadoop. Parquet stores nested data structures in a flat columnar format. Compared to a traditional approach where data is stored in a row-oriented layout, Parquet is more efficient in terms of storage and performance.
Why Parquet?
Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression. It is especially good for queries which read particular columns from a "wide" (many-column) table, since only the needed columns are read and I/O is minimized.
Advantages of a columnar structure:
· Organizing by column allows for better compression, as data is more homogeneous. The space savings are very noticeable at the scale of a Hadoop cluster.
· I/O is reduced, as we can efficiently scan only a subset of the columns while reading the data. Better compression also reduces the bandwidth required to read the input.
· As we store data of the same type in each column, we can use encodings better suited to modern processors' pipelines by making instruction branching more predictable.
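To see the column-pruning benefit in practice, here is a minimal PySpark sketch (run outside Glue) that writes a small DataFrame to Parquet and then reads back only one column; the local path and the sample rows are illustrative assumptions, not part of the exercise below.
# Minimal PySpark sketch: write Parquet, then read back only the column we need.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-demo").getOrCreate()

rows = [(1, "Alice", "A"), (2, "Bob", "B"), (3, "Carol", "A")]
df = spark.createDataFrame(rows, ["id", "name", "grade"])

# Write the data in columnar Parquet format.
df.write.mode("overwrite").parquet("/tmp/parquet_demo")

# Because Parquet is columnar, selecting only "grade" lets Spark skip the
# other column chunks entirely, so I/O is limited to the data actually needed.
spark.read.parquet("/tmp/parquet_demo").select("grade").show()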
2) What is needed to generate Parquet from CSV
· CSV files (source files):
We require 2 CSV files. Let them be like below:
Type1.csv
Type2.csv
· S3 bucket:
An Amazon S3 bucket is a public cloud storage resource available in Amazon Web Services' (AWS) Simple Storage Service (S3), an object storage offering.
So now we want to store these 2 types of CSV in 2 S3 folders.
Create 1 bucket named "debu-test", and under a subfolder named "inbound", create 2 separate folders for these 2 types of CSV, named "type1csv" and "type2csv".
Upload the respective files into the S3 bucket.
Also create a folder for the destination files under the "outbound" folder. (A scripted version of this setup is sketched below.)
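If you prefer setting this up from code instead of the S3 console, here is a minimal boto3 sketch; the bucket name, prefixes, and file names follow the example above, while the region is an assumption.
# Minimal boto3 sketch: create the bucket and upload the two sample CSV files.
# The region "ap-south-1" is an assumption; adjust it to your own account.
import boto3

s3 = boto3.client("s3", region_name="ap-south-1")

# Create the bucket (outside us-east-1 a LocationConstraint is required).
s3.create_bucket(
    Bucket="debu-test",
    CreateBucketConfiguration={"LocationConstraint": "ap-south-1"},
)

# S3 has no real folders; the "inbound/type1csv/" prefix simply acts as one.
s3.upload_file("Type1.csv", "debu-test", "inbound/type1csv/Type1.csv")
s3.upload_file("Type2.csv", "debu-test", "inbound/type2csv/Type2.csv")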
· Database for source CSV files:
By using a crawler, we will hold the schema of the CSV files in 2 separate tables. But before using the crawler, we need to manually create a blank database in AWS Glue. (This can also be done from code, as sketched below.)
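The blank database can also be created programmatically; a minimal boto3 sketch, using the same database name that appears later in this post:
# Minimal boto3 sketch: create the empty database in the AWS Glue Data Catalog.
import boto3

glue = boto3.client("glue")
glue.create_database(DatabaseInput={"Name": "test_db_debu"})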
· Configuring a Crawler
To create a crawler, please follow the steps mentioned below:
1. Open AWS, go to AWS Glue, then go to Crawlers.
2. A crawler accesses your data store, extracts metadata, and creates table definitions in the AWS Glue Data Catalog.
3. Click on "Add a Crawler" and create a new crawler for the 1st type of CSV file mentioned above, giving the crawler name as "Type1_CSV_Crw".
4. Choose the crawler source type as "Data stores".
5. Choose the S3 bucket which we created in the earlier steps as the data store.
6. No need to add any other data store.
7. Choose an existing IAM role or create a new one.
8. Create a schedule which runs on demand.
9. We are using a database named "test_db_debu" to create tables for the source CSV files.
10. Click on Finish.
11. Select the crawler named "Type1_CSV_Crw" that we created and click on "Run Crawler". Before running the crawler, the "Tables added" field will show zero.
12. After completing the run, it will create a new table in the above-said database. On the screen we can see that 1 table is created.
13. 1 new table, csv_tables_type1csv, is created in the "test_db_debu" database. This table holds the schema and the data, in table form, for the CSV.
Do the same for the other CSV file, "type2csv". (A scripted equivalent of this crawler setup is sketched below.)
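For completeness, the same crawler setup can be scripted; a minimal boto3 sketch for the first crawler, where the IAM role name is an assumption and everything else follows the steps above.
# Minimal boto3 sketch: create and run the crawler for the Type1 CSV folder.
# The IAM role name is an assumption; use your own Glue service role.
import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="Type1_CSV_Crw",
    Role="AWSGlueServiceRole-Demo",  # assumed role name
    DatabaseName="test_db_debu",
    Targets={"S3Targets": [{"Path": "s3://debu-test/inbound/type1csv/"}]},
)

# Run on demand (equivalent to clicking "Run Crawler" in the console).
glue.start_crawler(Name="Type1_CSV_Crw")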
3) Steps to generate Parquet files:
Step 1: Go to AWS Glue jobs and click on "Add job".
Step 3: Choose the CSV table as the data source.
Step 4: Choose "Change schema". That means that if there are any changes in the CSV file, it will change the source table schema as well.
Step 5: Choose the destination Parquet folder.
Step 6: A default mapping will be created like below:
Step 7: Run the job. (A scripted way to start and monitor the job is sketched below.)
1 Parquet file is generated at the desired location.
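Once the job exists, it can also be started and monitored from code; a minimal boto3 sketch, where the job name "type1_csv_to_parquet" is an assumption (use whatever name you gave the job in step 1).
# Minimal boto3 sketch: start the Glue job and poll its status until it finishes.
# The job name "type1_csv_to_parquet" is an assumption.
import time
import boto3

glue = boto3.client("glue")

run_id = glue.start_job_run(JobName="type1_csv_to_parquet")["JobRunId"]

while True:
    state = glue.get_job_run(JobName="type1_csv_to_parquet", RunId=run_id)["JobRun"]["JobRunState"]
    print("Job state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)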
4) Understanding the Job Script in Glue:
# We begin by importing the necessary Python libraries that create the ETL job.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Get the name of the job from the command line.
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Initialize the SparkContext and GlueContext for the job.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "test_db_debu", table_name = "csv_tables_type1csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
# Extract the data of csv_tables_type1csv (the source) from the Data Catalog and name it datasource0.
# AWS Glue supports Dynamic Frames of the data.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "csv_tables_type1csv", transformation_ctx = "datasource0")

# ApplyMapping describes the mapping from datasource0 and maps it to applymapping1.
## @type: ApplyMapping
## @args: [mapping = [("id", "long", "id", "long"), ("name", "string", "name", "string"), ("grade", "string", "grade", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "long", "id", "long"), ("name", "string", "name", "string"), ("grade", "string", "grade", "string")], transformation_ctx = "applymapping1")

# Several transformations are available within AWS Glue, such as RenameField, SelectFields, Join, etc.
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/type1par"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/type1par"}, format = "parquet", transformation_ctx = "datasink4")

# Finally, commit your job.
job.commit()
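A quick way to confirm the Parquet output actually landed under the outbound prefix is to list it with boto3; the bucket and prefix are the ones used in the script above.
# Minimal boto3 sketch: list the Parquet objects the job wrote to S3.
import boto3

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="debu-test", Prefix="outbound/type1par/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])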
5) Join CSV files and store into Parquet in Glue:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "csv_tables_type1csv", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "csv_tables_type2csv", transformation_ctx = "datasource2")
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "grade", keys2 = "grade", transformation_ctx = "join1")
datasink1 = glueContext.write_dynamic_frame.from_options(frame = join1, connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/JoinedPar"}, format = "parquet", transformation_ctx = "datasink1")
job.commit()
Create a crawler to add the joined data into a database table named "joined_csv_joinedpar"
From Athena, check the data.
select * from "test_db_debu"."joined_csv_joinedpar"
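The same Athena check can be run programmatically as well; a minimal boto3 sketch, where the S3 output location for query results is an assumption.
# Minimal boto3 sketch: run the Athena query and print the result rows.
# The OutputLocation bucket/prefix for query results is an assumption.
import time
import boto3

athena = boto3.client("athena")

query_id = athena.start_query_execution(
    QueryString='SELECT * FROM "test_db_debu"."joined_csv_joinedpar"',
    ResultConfiguration={"OutputLocation": "s3://debu-test/athena-results/"},
)["QueryExecutionId"]

# Wait for the query to finish.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

# Print every row (the first row returned is the header).
for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])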
6) Join Parquet files and store into Parquet in Glue:
Create 2 crawlers for the 2 Parquet files generated in the outbound folder:
Type1_Par_Crw & Type2_Par_Crw
After running these 2 crawlers, 2 tables with the Parquet data will be created, and we can use them as sources.
Then create a new job with the below script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "par_tables_part_00000_2863c3a0_e4ee_4160_b443_647c5f305c75_c000_snappy_parquet", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "par_tables_part_00000_f44543fb_816a_48da_97bd_8e8ba7b49b1d_c000_snappy_parquet", transformation_ctx = "datasource2")
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "grade", keys2 = "grade", transformation_ctx = "join1")
datasink1 = glueContext.write_dynamic_frame.from_options(frame = join1, connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/JoinedPar_frmPar"}, format = "parquet", transformation_ctx = "datasink1")
job.commit()
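As an aside, the same join can also be expressed with plain Spark DataFrames by converting the DynamicFrames with toDF() and converting the result back with DynamicFrame.fromDF(); a minimal sketch of just the join step, assuming datasource1, datasource2, and glueContext from the script above.
# Minimal sketch: the same inner join on "grade" via Spark DataFrames
# instead of Glue's Join.apply. Assumes datasource1, datasource2 and
# glueContext are already defined as in the script above.
from awsglue.dynamicframe import DynamicFrame

df1 = datasource1.toDF()
df2 = datasource2.toDF()
joined_df = df1.join(df2, on="grade", how="inner")
join1 = DynamicFrame.fromDF(joined_df, glueContext, "join1")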