AWS :: AWS GLUE :: Creating ETL in the cloud


For any type of consultation, query or doubt, you may contact: (+91) 9804 436 193, debabrataguha20@gmail.com 
and join the group  https://www.facebook.com/groups/331231161093613/


AWS Glue is a cloud service that prepares data for analysis through automated extract, transform and load (ETL) processes. Let’s jump directly into some ETL examples by handling some small sample files.



1)   What are Parquet files:



What is Parquet?

Parquet is an open source file format for Hadoop. Parquet stores nested data structures in a flat columnar format. Compared to the traditional approach, where data is stored row by row, Parquet is more efficient in terms of both storage and performance.

Why Parquet?

Parquet stores binary data in a column-oriented way: the values of each column are stored adjacent to each other, enabling better compression. It is especially good for queries that read only particular columns from a "wide" table (one with many columns), since only the needed columns are read and I/O is minimized. See the Parquet documentation for more details, and the small local example after the list below.

Advantages of a columnar structure?

·        Organizing by column allows for better compression, as data is more homogeneous. The space savings are very noticeable at the scale of a Hadoop cluster.

·        I/O will be reduced as we can efficiently scan only a subset of the columns while reading the data. Better compression also reduces the bandwidth required to read the input.

·        Since each column stores data of a single type, we can use encodings better suited to modern processor pipelines, making instruction branching more predictable.
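To make this concrete, here is a minimal local sketch (outside AWS, not part of the Glue setup) that converts a CSV to Parquet with pandas and pyarrow. The file names are assumptions; the columns match the id/name/grade schema used later in this post.

import pandas as pd

# Read a sample CSV (hypothetical file name) and write it back out as Parquet.
df = pd.read_csv("Type1.csv")
df.to_parquet("Type1.parquet", engine="pyarrow", compression="snappy")

# Reading only one column from the Parquet file touches far less data than re-reading the whole CSV.
names = pd.read_parquet("Type1.parquet", columns=["name"])
print(names.head())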




2)   What is needed to generate Parquet from CSV

·        CSV Files (Source files):



                We require 2 CSV files, for example like the ones below:



                  Type1.csv

             

                  Type2.csv
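(The original post shows the two files as screenshots, which are not reproduced here. As a purely hypothetical illustration, based on the columns used later in the job scripts, they could look like this; both files must share the "grade" column used in the join examples:)

Type1.csv (hypothetical contents)
id,name,grade
1,Debu,A
2,Rahul,B
3,Priya,A

Type2.csv (hypothetical contents)
grade,remarks
A,Excellent
B,Good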

             




·        S3 Bucket



                An Amazon S3 bucket is a public cloud storage resource available in Amazon Web Services' (AWS) Simple Storage Service (S3), an object storage offering.



                So now we want to store these 2 types of CSV in 2 separate S3 folders.



           Create 1 bucket named "debu-test", and under a subfolder named "inbound" create 2 separate folders for these 2 types of CSV, named "type1csv" and "type2csv".

       

                    Upload the respective files into these S3 folders.

               



                       Create a folder for the destination files under an "outbound" folder.
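If you prefer to script this S3 setup rather than clicking through the console, a minimal boto3 sketch could look like the one below (the local file names and the default us-east-1 region are assumptions):

import boto3

s3 = boto3.client("s3")

# Create the bucket (outside us-east-1 you would also pass a CreateBucketConfiguration).
s3.create_bucket(Bucket="debu-test")

# S3 has no real folders: uploading objects under a prefix is what creates the "inbound" structure.
s3.upload_file("Type1.csv", "debu-test", "inbound/type1csv/Type1.csv")
s3.upload_file("Type2.csv", "debu-test", "inbound/type2csv/Type2.csv")

The "outbound" folder does not need to be created in advance; the Glue job will create it when it writes its first output object.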
                        


·       Database for source CSV files


By using a crawler, we will capture the schema of the CSV files in 2 separate tables. But before using the crawler, we need to manually create a blank database in AWS Glue.
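If you prefer not to use the console for this, the same blank database can be created with boto3; a minimal sketch:

import boto3

glue = boto3.client("glue")

# Create the empty Glue database that the crawlers will write their tables into.
glue.create_database(DatabaseInput={"Name": "test_db_debu"})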




·        Configuring Crawler

                       To create a crawler, please follow the steps mentioned below:

                       1.     Open the AWS console, go to AWS Glue, then go to Crawlers.
                       
                        


     2.     A crawler accesses your data store, extracts metadata, and creates table definitions in the AWS Glue Data Catalog.



     3.     Click on "Add crawler" and create a new crawler for the 1st type of CSV file mentioned above, giving it the name "Type1_CSV_Crw".


     4.     Choose Crawler source type as “Data Stores”


    5. Choose the S3 folder which we created in the earlier steps as the data store.



    6. No need to add any other data store.


     7. Choose an existing IAM role or create a new one.


    8. For the schedule, choose a frequency of "Run on demand".


    9. Use the database named "test_db_debu" (created earlier) to hold the tables for the source CSV files.


    10.     Click on finish.


    11. Select the Crawler named “Type1_CSV_Crw” that we created and click on “Run Crawler”.

 

     Before running the crawler, the "Tables added" field shows zero.

 

      12. After the run completes, the crawler creates a new table in the database mentioned above. On the screen we can see that 1 table was created.
   
   

      13. A new table named "csv_tables_type1csv" is created in the "test_db_debu" database. It holds the schema of the CSV, so the file can be queried as a table (the data itself stays in S3).
   
  

            
           Do the same for the other CSV folder, "type2csv".
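The console steps above can also be scripted. A minimal boto3 sketch, assuming an existing IAM role ARN (hypothetical below) and a "csv_tables_" table prefix, which the table names later in this post suggest:

import boto3

glue = boto3.client("glue")

# role_arn is an assumption: supply the ARN of the IAM role chosen in step 7.
role_arn = "arn:aws:iam::123456789012:role/AWSGlueServiceRole-demo"

glue.create_crawler(
    Name="Type1_CSV_Crw",
    Role=role_arn,
    DatabaseName="test_db_debu",
    TablePrefix="csv_tables_",
    Targets={"S3Targets": [{"Path": "s3://debu-test/inbound/type1csv/"}]},
)

# Run the crawler on demand; the table appears in test_db_debu once it finishes.
glue.start_crawler(Name="Type1_CSV_Crw")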

3)   Steps to generate Parquet files:


      
              Step 1. Go to AWS Glue Jobs and click on "Add job".
                       
                        


         Step 2. Configure the job to run a Spark script.

                 

         Step 3: Choose the CSV table as the data source.
               
                 

         Step 4: Choose "Change schema" as the transform type. This lets you remap the source columns to the target columns and change data types if needed.

                 


         Step 5: Choose the destination parquet folder.

                 

         Step 6: A default column mapping will be created, as shown below:
                 
                 

         Step 7: Run the job:

                 

        1 parquet file is generated at the desired location.
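To double-check the output from a script rather than the S3 console, you could list the destination prefix with boto3 (this assumes the "outbound/type1par" path used in the generated script in the next section):

import boto3

s3 = boto3.client("s3")

# List whatever the job wrote under the destination prefix.
resp = s3.list_objects_v2(Bucket="debu-test", Prefix="outbound/type1par/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])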



4)   Understanding the Job Script in GLUE:


#We begin by importing the Python libraries needed to create the ETL job.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

#Get the name of the job from the command line.
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

#Initialize the SparkContext and GlueContext for the job.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "test_db_debu", table_name = "csv_tables_type1csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
#Extract the data of csv_tables_type1csv (source) from the Data Catalog and name it datasource0.
#AWS Glue represents the data as a DynamicFrame.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "csv_tables_type1csv", transformation_ctx = "datasource0")

#Apply the column mapping (source name/type to target name/type) to datasource0, producing applymapping1.
## @type: ApplyMapping
## @args: [mapping = [("id", "long", "id", "long"), ("name", "string", "name", "string"), ("grade", "string", "grade", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "long", "id", "long"), ("name", "string", "name", "string"), ("grade", "string", "grade", "string")], transformation_ctx = "applymapping1")

#Several other transformations are available within AWS Glue, such as RenameField, SelectFields, Join, etc.
#ResolveChoice with "make_struct" resolves columns whose type is ambiguous by keeping the candidate types in a struct.
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

#DropNullFields removes fields that contain only null values.
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

#Write the result to S3 as Parquet.
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/type1par"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/type1par"}, format = "parquet", transformation_ctx = "datasink4")

#Finally, commit the job.
job.commit()
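As an optional sanity check (not part of the generated script), the job's own Spark session can read the Parquet back just before job.commit():

# Read the freshly written Parquet back with the existing Spark session and show a few rows.
check_df = spark.read.parquet("s3://debu-test/outbound/type1par/")
check_df.printSchema()
check_df.show(5)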




5) Join CSV files and store the result as Parquet in GLUE:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#Load both catalog tables as DynamicFrames.
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "csv_tables_type1csv", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "csv_tables_type2csv", transformation_ctx = "datasource2")

#Join the two frames on the shared "grade" column.
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "grade", keys2 = "grade", transformation_ctx = "join1")

#Write the joined data to S3 as Parquet.
datasink1 = glueContext.write_dynamic_frame.from_options(frame = join1, connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/JoinedPar"}, format = "parquet", transformation_ctx = "datasink1")

job.commit()



Create a crawler to add the joined data into a database table named "joined_csv_joinedpar".
Then, from Athena, check the data:

select * from "test_db_debu"."joined_csv_joinedpar"
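The same check can be run from a script with boto3's Athena client; a minimal sketch (the query results location is an assumption):

import boto3

athena = boto3.client("athena")

# Kick off the query; Athena writes results to the given S3 location (hypothetical path).
resp = athena.start_query_execution(
    QueryString='select * from "test_db_debu"."joined_csv_joinedpar"',
    QueryExecutionContext={"Database": "test_db_debu"},
    ResultConfiguration={"OutputLocation": "s3://debu-test/athena-results/"},
)
print(resp["QueryExecutionId"])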


6) Join PARQUET files and store the result as Parquet in GLUE:


Create 2 crawlers for the 2 Parquet files generated in the outbound folder.


Type1_Par_Crw & Type2_Par_Crw


     After running these 2 crawlers, they will generate 2 tables over the Parquet data, which we can then use as sources.


     Then create a new job with the below script:



import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#The catalog table names include the generated Parquet part-file names (with the "par_tables_" prefix).
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "par_tables_part_00000_2863c3a0_e4ee_4160_b443_647c5f305c75_c000_snappy_parquet", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "test_db_debu", table_name = "par_tables_part_00000_f44543fb_816a_48da_97bd_8e8ba7b49b1d_c000_snappy_parquet", transformation_ctx = "datasource2")

#Join the two Parquet-backed frames on the "grade" column.
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "grade", keys2 = "grade", transformation_ctx = "join1")

#Write the joined data to a new Parquet output folder.
datasink1 = glueContext.write_dynamic_frame.from_options(frame = join1, connection_type = "s3", connection_options = {"path": "s3://debu-test/outbound/JoinedPar_frmPar"}, format = "parquet", transformation_ctx = "datasink1")

job.commit()





