Parallelizing Heavy Read and Write Queries to SQL Datastores using Spark and more!

Aniket Rajput

Data Engineering

The amount of data in our world has been growing exponentially day by day, and processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified, distributed data processing engine built for this scale: it splits the work into chunks and assigns those chunks to compute resources across the nodes of a cluster. It can handle up to petabytes of data (millions of gigabytes), and because it keeps data in memory wherever possible, it processes it fast.

We talked about processing Big Data in Spark, but Spark itself does not store any data the way a file system or database does. So, to process data in Spark, we must read it from different data sources, clean or transform it, and then store it in one of the target data sources. Data sources can be files, APIs, databases, or streams.

Database management systems have been around for decades. Many applications generate huge amounts of data and store it in such systems, and a lot of the time we need to connect Spark to a database and process that data.

In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on reading/writing data from/to the database using different methods, which will help us move terabytes of data efficiently.

Reading / Writing data from/to Database using Spark:

To read data from or write data to a database, we need to perform a few basic steps regardless of the programming language or framework we use. What follows is an overview of the steps to read data from a database.

Step 1: Register Driver or Use Connector

Get the respective driver of your database and register the driver, or use the connector to connect to the database.

Step 2: Make a connection

Next, the driver or connector makes a connection to the database.

Step 3: Run query statement

Using the connection created in the previous step, execute the query, which will return the result.

Step 4: Process result

Process the result obtained in the previous step as per your requirement.

Step 5: Close the connection

Finally, close the cursor and the connection so that database resources are released.
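As a concrete illustration of these five steps, here is a minimal sketch in plain Python using the psycopg2 adapter (the connection details and table name here are placeholders, not values from this blog's setup):

import psycopg2  # Step 1: importing the psycopg2 adapter makes the Postgres driver available

# Step 2: make a connection (placeholder credentials)
conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="secret")

# Step 3: run the query statement
cur = conn.cursor()
cur.execute("SELECT * FROM covid_data LIMIT 10")

# Step 4: process the result as per your requirement
for row in cur.fetchall():
    print(row)

# Step 5: close the cursor and the connection
cur.close()
conn.close()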

Dataset we are using:

Covid data

This dataset contains district-level details of COVID-19 cases across Indian states. It has fields such as State, District, Confirmed, Recovered, Deceased, Other, Tested, and Date.

You can load this dataset in any of the databases you work with and can try out the entire discussion practically.

The following image shows ten records of the entire dataset.

Single-partition Spark program:

## Creating a Spark session and adding the Postgres driver to Spark.
from pyspark.sql import SparkSession

spark_session = SparkSession.builder \
    .master("local") \
    .appName("Databases") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
    .getOrCreate()

hostname = "localhost"
jdbc_port = 5432
dbname = "aniket"
username = "postgres"
password = "pass@123"
jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)

## reading data
table_data_df = spark_session.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "aniket") \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .load()

## writing data
table_data_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "spark_schema.zipcode_table") \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .save()

Spark provides an API to read data from a database, and it is very simple to use. First of all, we need to create a Spark session and add the JDBC driver to Spark. The driver can be added through the program itself, as above, or passed in from the shell when submitting the application.
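For example, one commonly used way to add the driver from outside the builder config (an assumption on our part, not shown in the snippet above) is to pass the package through the PYSPARK_SUBMIT_ARGS environment variable before the session is created; this is roughly equivalent to passing --packages on the spark-submit command line:

import os

# Must be set before the SparkSession is created; the trailing "pyspark-shell"
# token is required when launching PySpark from a plain Python interpreter.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.postgresql:postgresql:42.2.8 pyspark-shell"

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.master("local").appName("Databases").getOrCreate()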

The first line of code imports the SparkSession class, which is the entry point to programming Spark with the Dataset and DataFrame API.

In the builder block, we create a Spark session on the local machine; this session is used for all interaction with our Spark application. We specify the name of the application using appName(), which in our case is 'Databases'; this name is shown on the web UI for the cluster. Next, we can specify any configuration for the Spark application using config(). In our case, we have specified the Postgres JDBC driver package, which will be used to create a connection with the Postgres database. You can specify the driver of any of the available databases.

To connect to the database, we must have a hostname, port, database name, username, and password. Those details are declared right after the session builder in the snippet above.

Now look at the read block in the snippet above. At this point we have our Spark session and all the information needed to connect to the database. Using the Spark read API, we read the data from the database. This creates a single connection to the Postgres database from one of the cores allocated to the Spark application, and over this connection it reads the data into the table_data_df dataframe. Even if we have multiple cores for our application, it still creates only one connection from one of the cores; the rest of the cores are not utilized. We will discuss how to use all the cores shortly, but first let's finish this example.

Now look at the write block in the snippet above. We have the data, so let's write it back to the database. Using the Spark write API, we write data to the database. This, too, creates only one connection to the database from one of the cores allocated to the Spark application. Even if we have more cores available, the code above still uses only one of them.

Output of Program:

/usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
confs: [default]
found org.postgresql#postgresql;42.2.8 in central
:: resolution report :: resolve 113ms :: artifacts dl 3ms
:: modules in use:
org.postgresql#postgresql;42.2.8 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/5ms)
22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------------+-----------------+---------+---------+--------+-----+------+----------+
| state| district|confirmed|recovered|deceased|other|tested| date|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|Uttar Pradesh| Varanasi| 23512| 23010| 456| 0|595510|2021-02-24|
| Uttarakhand| Almora| 3259| 3081| 25| 127| 84443|2021-02-24|
| Uttarakhand| Bageshwar| 1534| 1488| 17| 26| 55626|2021-02-24|
| Uttarakhand| Chamoli| 3486| 3373| 15| 88| 90390|2021-02-24|
| Uttarakhand| Champawat| 1819| 1790| 9| 7| 95068|2021-02-24|
| Uttarakhand| Dehradun| 29619| 28152| 962| 439|401496|2021-02-24|
| Uttarakhand| Haridwar| 14137| 13697| 158| 175|369542|2021-02-24|
| Uttarakhand| Nainital| 12636| 12254| 237| 79|204422|2021-02-24|
| Uttarakhand| Pauri Garhwal| 5145| 5033| 60| 24|138878|2021-02-24|
| Uttarakhand| Pithoragarh| 3361| 3291| 47| 11| 72686|2021-02-24|
| Uttarakhand| Rudraprayag| 2270| 2251| 10| 7| 52378|2021-02-24|
| Uttarakhand| Tehri Garhwal| 4227| 4026| 16| 170|105111|2021-02-24|
| Uttarakhand|Udham Singh Nagar| 11538| 11267| 117| 123|337292|2021-02-24|
| Uttarakhand| Uttarkashi| 3789| 3645| 17| 118|120026|2021-02-24|
| West Bengal| Alipurduar| 7705| 7616| 86| 0| null|2021-02-24|
| West Bengal| Bankura| 11940| 11788| 92| 0| null|2021-02-24|
| West Bengal| Birbhum| 10035| 9876| 89| 0| null|2021-02-24|
| West Bengal| Cooch Behar| 11835| 11756| 72| 0| null|2021-02-24|
| West Bengal| Dakshin Dinajpur| 8179| 8099| 74| 0| null|2021-02-24|
| West Bengal| Darjeeling| 18423| 18155| 203| 0| null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows
Process finished with exit code 0

Multi-partition Spark program:

from pyspark.sql import SparkSession

## Creating a Spark session and adding the Postgres driver to Spark.
spark_session = SparkSession.builder \
    .master("local[4]") \
    .appName("Databases") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
    .getOrCreate()

hostname = "localhost"
jdbc_port = 5432
dbname = "postgres"
username = "postgres"
password = "pass@123"
jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)

partition_column = 'date'
lower_bound = '2021-02-20'
upper_bound = '2021-02-28'
num_partitions = 4

## reading data
table_data_df = spark_session.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "covid_data") \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("partitionColumn", partition_column) \
    .option("lowerBound", lower_bound) \
    .option("upperBound", upper_bound) \
    .option("numPartitions", num_partitions) \
    .load()
table_data_df.show()

## writing data
table_data_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "covid_data_output") \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("numPartitions", num_partitions) \
    .save()

As promised in the last section, let's discuss how to improve resource utilization. There, we had only one connection, which used very limited resources and left the rest idle. To get around this, the Spark read and write APIs accept a few extra options: partitionColumn, lowerBound, and upperBound. These options must all be specified if any of them is specified, and numPartitions must be specified as well. Together they describe how to partition the table when reading in parallel from multiple workers. Each partition is handled by an individual core with its own connection performing the reads or writes, which makes the database operation parallel.

This is a far more efficient way of reading and writing database data in Spark than doing it with a single partition.

Partitions are decided by the Spark API in the following way.

Let’s consider an example where:

lowerBound: 0

upperBound: 1000

numPartitions: 10

The stride is (1000 - 0) / 10 = 100, and the partitions correspond roughly to the following queries:

SELECT * FROM table WHERE partitionColumn < 100 OR partitionColumn IS NULL

SELECT * FROM table WHERE partitionColumn >= 100 AND partitionColumn < 200

...

SELECT * FROM table WHERE partitionColumn >= 900

Each range is half-open: it includes its lower bound and excludes its upper bound, and the first and last partitions are left open-ended so that rows below the lowerBound or above the upperBound are not dropped.

Now we have data in multiple partitions. Each executor can hold one or more partitions, depending on the cluster configuration. Suppose we have 10 cores and 10 partitions: each partition can be fetched by one core, so all 10 partitions can be fetched in parallel. Each of these tasks creates its own connection to the database and reads its slice of the data.

Note: lowerBound and upperBound do not filter the data; they only help Spark decide the stride of each partition.

Also, partitionColumn must be a numeric, date, or timestamp column from the table.
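To make the stride logic concrete, here is a rough, illustrative sketch of how such partition predicates could be derived from lowerBound, upperBound, and numPartitions; it mirrors the idea described above rather than reproducing Spark's exact internal implementation:

def partition_predicates(column, lower, upper, num_partitions):
    # Illustrative only: builds half-open range predicates as described above.
    stride = (upper - lower) // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lower + (i + 1) * stride
        if i == 0:
            # First partition also picks up NULLs and anything below the lower bound.
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is open-ended so rows above the upper bound are not dropped.
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
    return predicates

# With lowerBound=0, upperBound=1000, numPartitions=10 this yields
# "col < 100 OR col IS NULL", "col >= 100 AND col < 200", ..., "col >= 900".
print(partition_predicates("col", 0, 1000, 10))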

There are also options that can be used to tune the write operation. One of them is batchsize: the JDBC batch size, which determines how many rows are inserted per round trip and can improve the performance of JDBC drivers; it applies only to writing. Another useful option is truncate, a JDBC-writer option: when SaveMode.Overwrite is enabled, it causes Spark to truncate the existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
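For instance, a write that uses both of these options might look like the following sketch, built on the same table_data_df and connection variables as the snippet above (the batch size of 10000 is just an illustrative value):

table_data_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "covid_data_output") \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("numPartitions", num_partitions) \
    .option("batchsize", 10000) \
    .option("truncate", "true") \
    .mode("overwrite") \
    .save()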

Output of Program:

/usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
confs: [default]
found org.postgresql#postgresql;42.2.8 in central
:: resolution report :: resolve 104ms :: artifacts dl 3ms
:: modules in use:
org.postgresql#postgresql;42.2.8 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/4ms)
22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------------+-----------------+---------+---------+--------+-----+------+----------+
| state| district|confirmed|recovered|deceased|other|tested| date|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|Uttar Pradesh| Varanasi| 23512| 23010| 456| 0|595510|2021-02-24|
| Uttarakhand| Almora| 3259| 3081| 25| 127| 84443|2021-02-24|
| Uttarakhand| Bageshwar| 1534| 1488| 17| 26| 55626|2021-02-24|
| Uttarakhand| Chamoli| 3486| 3373| 15| 88| 90390|2021-02-24|
| Uttarakhand| Champawat| 1819| 1790| 9| 7| 95068|2021-02-24|
| Uttarakhand| Dehradun| 29619| 28152| 962| 439|401496|2021-02-24|
| Uttarakhand| Haridwar| 14137| 13697| 158| 175|369542|2021-02-24|
| Uttarakhand| Nainital| 12636| 12254| 237| 79|204422|2021-02-24|
| Uttarakhand| Pauri Garhwal| 5145| 5033| 60| 24|138878|2021-02-24|
| Uttarakhand| Pithoragarh| 3361| 3291| 47| 11| 72686|2021-02-24|
| Uttarakhand| Rudraprayag| 2270| 2251| 10| 7| 52378|2021-02-24|
| Uttarakhand| Tehri Garhwal| 4227| 4026| 16| 170|105111|2021-02-24|
| Uttarakhand|Udham Singh Nagar| 11538| 11267| 117| 123|337292|2021-02-24|
| Uttarakhand| Uttarkashi| 3789| 3645| 17| 118|120026|2021-02-24|
| West Bengal| Alipurduar| 7705| 7616| 86| 0| null|2021-02-24|
| West Bengal| Bankura| 11940| 11788| 92| 0| null|2021-02-24|
| West Bengal| Birbhum| 10035| 9876| 89| 0| null|2021-02-24|
| West Bengal| Cooch Behar| 11835| 11756| 72| 0| null|2021-02-24|
| West Bengal| Dakshin Dinajpur| 8179| 8099| 74| 0| null|2021-02-24|
| West Bengal| Darjeeling| 18423| 18155| 203| 0| null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows
Process finished with exit code 0

We have seen how to read and write data in Spark. But Spark is not the only way to connect with databases, right? There are multiple ways we can access databases and try to achieve parallel reads and writes. We will discuss these in the following sections, focusing mainly on reading and writing from Python.

Single-thread Python program:

import traceback
import psycopg2
import pandas as pd


class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(
                self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')


if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    query = f"select * from {table_name}"
    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port,
                                 postgres_dbname=dbname, username=username, password=password)
    data = pd.DataFrame(db_client.read(query))
    print(data)

To integrate Postgres with Python, there are several libraries or adapters we can use, but Psycopg is the most widely used adapter. First of all, you will need to install the psycopg2 library (the current major version of the Psycopg adapter). You can install it using pip or any other way you are comfortable with.

To connect to the Postgres database, we need a hostname, port, database name, username, and password. We store all these details as attributes of the class. The create_conn method opens a connection to the Postgres database using the connect() method of the psycopg2 module and returns a connection object.
In the read method, we call create_conn to get a connection object and use it to create a cursor. The cursor is bound to the connection for its lifetime and executes all commands or queries against the database. We use it to execute a read query, fetch the rows it returned with fetchall(), and then close the cursor and the connection.
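As a side note, psycopg2 connections and cursors can also be used as context managers, which closes the cursor automatically and commits or rolls back the transaction when the block exits (the connection itself still has to be closed explicitly). A minimal sketch of the same read, assuming the db_client defined above:

conn = db_client.create_conn()
with conn:
    with conn.cursor() as cursor:
        cursor.execute("select * from covid_data")
        data = cursor.fetchall()
conn.close()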

To run the program, we specify the database details and the query, create a PostgresDbClient object, and call its read method. The read method returns the rows, and we convert them into a tabular format using pandas.

This implementation is very straightforward: the program runs as one process and fetches all the data using the system's resources (CPU, memory, etc.). The drawback of this approach is that if the program uses, say, 30 percent of the CPU and memory, the remaining 70 percent sits idle. We can improve this utilization through multithreading or multiprocessing.

Output of Program:

Connecting to the Postgres database...
Successfully connected to the Postgres database...
Reading data !!!
Read Data !!!
0 1 2 3 4 5 6 7
0 Andaman and Nicobar Islands Unknown 33 11 0 0 None 2020-04-26
1 Andhra Pradesh Anantapur 53 14 4 0 None 2020-04-26
2 Andhra Pradesh Chittoor 73 13 0 0 None 2020-04-26
3 Andhra Pradesh East Godavari 39 12 0 0 None 2020-04-26
4 Andhra Pradesh Guntur 214 29 8 0 None 2020-04-26
.. ... ... ... .. .. .. ... ...
95 Bihar Araria 1 0 0 0 None 2020-04-30
96 Bihar Arwal 4 0 0 0 None 2020-04-30
97 Bihar Aurangabad 8 0 0 0 None 2020-04-30
98 Bihar Banka 3 0 0 0 None 2020-04-30
99 Bihar Begusarai 11 5 0 0 None 2020-04-30
[100 rows x 8 columns]
Process finished with exit code 0

Multi-thread Python program:

In the previous section, we discussed the drawback of a single-process, single-thread implementation. Let's get started with how to maximize resource usage. Before getting into multithreading, let's understand a few basic but important concepts.

What is a process?

When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.

What is a thread?

A thread is a sequential flow of execution within a process. Every process starts with at least one thread, usually called the main thread. Unlike separate processes, multiple threads of the same process share its computing and memory resources.

What is multithreading?

When a process has multiple threads running alongside the main thread, independently but concurrently, and sharing the computing and memory resources of that process, it is called a multithreaded program or process. Multithreading uses resources efficiently, which helps maximize performance.

What is multiprocessing?

When multiple programs run as independent processes, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is typically achieved by running the separate processes on multiple processors or cores.

Let's get back to our program. Here you can see we have the connection and read methods, which are exactly the same as in the previous section. We also have one new function, get_thread(). Note that it is a module-level function, not a method of the PostgresDbClient class: it simply wraps the call to the read method so that it can be passed as the target of a thread. Don't worry if the distinction seems subtle; it is just how the code is organized here.

To run the program, we specify the Postgres database details and the queries. In the previous approach, we fetched all the data from the table with a single thread. In this approach, the plan is to fetch one day of data per thread so that we can maximize resource utilization: each query reads one day's worth of data from the table, so with 5 queries we fetch 5 days of data using 5 threads running concurrently.
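One way to build those per-day queries, shown here as a small sketch that assumes the same covid_data table and date column, is to generate them from the date range instead of writing them out by hand:

from datetime import date, timedelta

start = date(2020, 4, 26)
# One query per day; each thread gets exactly one of these.
queries = [
    f"select * from covid_data where date = '{start + timedelta(days=i)}'"
    for i in range(5)
]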

To create a thread in Python, we use the Thread() constructor from the threading library, passing the function we want to run and its arguments. This creates a new thread object, but the thread has not started yet; to start it, we call the start() method. In our program, we start 5 threads. If you execute the whole program multiple times, you will get the output in a different order each time: some data is fetched earlier, some later, and on the next run the order changes again. This is because scheduling is handled by the operating system, and the output order depends on which thread the OS gives resources to at what time. If you want to know exactly how this works, you will need to dig into operating systems concepts.

In our use case, we are just printing the data to the console. If we want to store it, there are multiple ways. One simple way is to define a global variable and store the result in it, but then we need synchronization, because multiple threads accessing the global variable can lead to race conditions. Another way is to subclass the Thread class and define an instance variable to hold the data; again, we would need to make sure access to it is synchronized.
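One alternative worth mentioning, though it is not used in this blog's program, is concurrent.futures: each worker's result comes back to the main thread through a future, so the DataFrames can be collected without a hand-rolled lock around a shared variable. A sketch, assuming the db_client and per-day queries from this section:

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def read_one_query(db_client, query):
    # Runs in a worker thread; the result travels back via the future.
    return pd.DataFrame(db_client.read(query))

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(read_one_query, db_client, q) for q in queries]
    results = [f.result() for f in futures]  # main thread collects all the DataFrames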

So, whichever of these shared-variable approaches we pick, we need synchronization, and synchronization pushes the threads back towards sequential execution, which is exactly what we are trying to avoid.
To avoid it in this blog's program, we can have each thread write its data directly to the target: the thread that reads a slice of data also writes that same slice back to the target database. This way we avoid synchronization and still keep the data in the database for future use. The function could look like the one below, where db_client.write(data) is a function that writes the data to a database.

def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    db_client.write(data)
    print(f"Stopping thread id {thread_id}")

Python Program:

import threading
import traceback
import psycopg2
import pandas as pd


class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(
                self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')


def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    print(f"Stopping thread id {thread_id}")


if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    partition_column = 'date'
    ## one day of data per query, so one day of data per thread
    query1 = f"select * from {table_name} where {partition_column} = '2020-04-26'"
    query2 = f"select * from {table_name} where {partition_column} = '2020-04-27'"
    query3 = f"select * from {table_name} where {partition_column} = '2020-04-28'"
    query4 = f"select * from {table_name} where {partition_column} = '2020-04-29'"
    query5 = f"select * from {table_name} where {partition_column} = '2020-04-30'"
    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port,
                                 postgres_dbname=dbname, username=username, password=password)
    x1 = threading.Thread(target=get_thread, args=(1, db_client, query1))
    x1.start()
    x2 = threading.Thread(target=get_thread, args=(2, db_client, query2))
    x2.start()
    x3 = threading.Thread(target=get_thread, args=(3, db_client, query3))
    x3.start()
    x4 = threading.Thread(target=get_thread, args=(4, db_client, query4))
    x4.start()
    x5 = threading.Thread(target=get_thread, args=(5, db_client, query5))
    x5.start()

Output of Program:

Starting thread id 1
Connecting to the Postgres database...
Starting thread id 2
Connecting to the Postgres database...
Starting thread id 3
Connecting to the Postgres database...
Starting thread id 4
Connecting to the Postgres database...
Starting thread id 5
Connecting to the Postgres database...
Successfully connected to the Postgres database...Successfully connected to the Postgres database...Successfully connected to the Postgres database...
Reading data !!!
Reading data !!!
Reading data !!!
Successfully connected to the Postgres database...
Reading data !!!
Successfully connected to the Postgres database...
Reading data !!!
Read Data !!!
Read Data !!!
Read Data !!!
Read Data !!!
Read Data !!!
Thread 2 data
Thread 3 data
Thread 1 data
Thread 5 data
Thread 4 data
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-27
1 Andhra Pradesh Anantapur 53 ... 0 None 2020-04-27
2 Andhra Pradesh Chittoor 73 ... 0 None 2020-04-27
3 Andhra Pradesh East Godavari 39 ... 0 None 2020-04-27
4 Andhra Pradesh Guntur 237 ... 0 None 2020-04-27
5 Andhra Pradesh Krishna 210 ... 0 None 2020-04-27
6 Andhra Pradesh Kurnool 292 ... 0 None 2020-04-27
7 Andhra Pradesh Prakasam 56 ... 0 None 2020-04-27
8 Andhra Pradesh S.P.S. Nellore 79 ... 0 None 2020-04-27
9 Andhra Pradesh Srikakulam 4 ... 0 None 2020-04-27
10 Andhra Pradesh Visakhapatnam 22 ... 0 None 2020-04-27
11 Andhra Pradesh West Godavari 54 ... 0 None 2020-04-27
12 Andhra Pradesh Y.S.R. Kadapa 58 ... 0 None 2020-04-27
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-27
14 Assam Unknown 36 ... 0 None 2020-04-27
15 Bihar Arwal 4 ... 0 None 2020-04-27
16 Bihar Aurangabad 7 ... 0 None 2020-04-27
17 Bihar Banka 2 ... 0 None 2020-04-27
18 Bihar Begusarai 9 ... 0 None 2020-04-27
19 Bihar Bhagalpur 5 ... 0 None 2020-04-27
[20 rows x 8 columns]
Stopping thread id 2
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-26
1 Andhra Pradesh Anantapur 53 ... 0 None 2020-04-26
2 Andhra Pradesh Chittoor 73 ... 0 None 2020-04-26
3 Andhra Pradesh East Godavari 39 ... 0 None 2020-04-26
4 Andhra Pradesh Guntur 214 ... 0 None 2020-04-26
5 Andhra Pradesh Krishna 177 ... 0 None 2020-04-26
6 Andhra Pradesh Kurnool 279 ... 0 None 2020-04-26
7 Andhra Pradesh Prakasam 56 ... 0 None 2020-04-26
8 Andhra Pradesh S.P.S. Nellore 72 ... 0 None 2020-04-26
9 Andhra Pradesh Srikakulam 3 ... 0 None 2020-04-26
10 Andhra Pradesh Visakhapatnam 22 ... 0 None 2020-04-26
11 Andhra Pradesh West Godavari 51 ... 0 None 2020-04-26
12 Andhra Pradesh Y.S.R. Kadapa 58 ... 0 None 2020-04-26
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-26
14 Assam Unknown 36 ... 0 None 2020-04-26
15 Bihar Arwal 4 ... 0 None 2020-04-26
16 Bihar Aurangabad 2 ... 0 None 2020-04-26
17 Bihar Banka 2 ... 0 None 2020-04-26
18 Bihar Begusarai 9 ... 0 None 2020-04-26
19 Bihar Bhagalpur 5 ... 0 None 2020-04-26
[20 rows x 8 columns]
Stopping thread id 1
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-28
1 Andhra Pradesh Anantapur 54 ... 0 None 2020-04-28
2 Andhra Pradesh Chittoor 74 ... 0 None 2020-04-28
3 Andhra Pradesh East Godavari 39 ... 0 None 2020-04-28
4 Andhra Pradesh Guntur 254 ... 0 None 2020-04-28
5 Andhra Pradesh Krishna 223 ... 0 None 2020-04-28
6 Andhra Pradesh Kurnool 332 ... 0 None 2020-04-28
7 Andhra Pradesh Prakasam 56 ... 0 None 2020-04-28
8 Andhra Pradesh S.P.S. Nellore 82 ... 0 None 2020-04-28
9 Andhra Pradesh Srikakulam 4 ... 0 None 2020-04-28
10 Andhra Pradesh Visakhapatnam 22 ... 0 None 2020-04-28
11 Andhra Pradesh West Godavari 54 ... 0 None 2020-04-28
12 Andhra Pradesh Y.S.R. Kadapa 65 ... 0 None 2020-04-28
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-28
14 Assam Unknown 38 ... 0 None 2020-04-28
15 Bihar Araria 1 ... 0 None 2020-04-28
16 Bihar Arwal 4 ... 0 None 2020-04-28
17 Bihar Aurangabad 7 ... 0 None 2020-04-28
18 Bihar Banka 3 ... 0 None 2020-04-28
19 Bihar Begusarai 9 ... 0 None 2020-04-28
[20 rows x 8 columns]
Stopping thread id 3
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-30
1 Andhra Pradesh Anantapur 61 ... 0 None 2020-04-30
2 Andhra Pradesh Chittoor 80 ... 0 None 2020-04-30
3 Andhra Pradesh East Godavari 42 ... 0 None 2020-04-30
4 Andhra Pradesh Guntur 287 ... 0 None 2020-04-30
5 Andhra Pradesh Krishna 246 ... 0 None 2020-04-30
6 Andhra Pradesh Kurnool 386 ... 0 None 2020-04-30
7 Andhra Pradesh Prakasam 60 ... 0 None 2020-04-30
8 Andhra Pradesh S.P.S. Nellore 84 ... 0 None 2020-04-30
9 Andhra Pradesh Srikakulam 5 ... 0 None 2020-04-30
10 Andhra Pradesh Visakhapatnam 23 ... 0 None 2020-04-30
11 Andhra Pradesh West Godavari 56 ... 0 None 2020-04-30
12 Andhra Pradesh Y.S.R. Kadapa 73 ... 0 None 2020-04-30
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-30
14 Assam Unknown 43 ... 0 None 2020-04-30
15 Bihar Araria 1 ... 0 None 2020-04-30
16 Bihar Arwal 4 ... 0 None 2020-04-30
17 Bihar Aurangabad 8 ... 0 None 2020-04-30
18 Bihar Banka 3 ... 0 None 2020-04-30
19 Bihar Begusarai 11 ... 0 None 2020-04-30
[20 rows x 8 columns]
Stopping thread id 5
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-29
1 Andhra Pradesh Anantapur 58 ... 0 None 2020-04-29
2 Andhra Pradesh Chittoor 77 ... 0 None 2020-04-29
3 Andhra Pradesh East Godavari 40 ... 0 None 2020-04-29
4 Andhra Pradesh Guntur 283 ... 0 None 2020-04-29
5 Andhra Pradesh Krishna 236 ... 0 None 2020-04-29
6 Andhra Pradesh Kurnool 343 ... 0 None 2020-04-29
7 Andhra Pradesh Prakasam 60 ... 0 None 2020-04-29
8 Andhra Pradesh S.P.S. Nellore 82 ... 0 None 2020-04-29
9 Andhra Pradesh Srikakulam 5 ... 0 None 2020-04-29
10 Andhra Pradesh Visakhapatnam 23 ... 0 None 2020-04-29
11 Andhra Pradesh West Godavari 56 ... 0 None 2020-04-29
12 Andhra Pradesh Y.S.R. Kadapa 69 ... 0 None 2020-04-29
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-29
14 Assam Unknown 38 ... 0 None 2020-04-29
15 Bihar Araria 1 ... 0 None 2020-04-29
16 Bihar Arwal 4 ... 0 None 2020-04-29
17 Bihar Aurangabad 8 ... 0 None 2020-04-29
18 Bihar Banka 3 ... 0 None 2020-04-29
19 Bihar Begusarai 11 ... 0 None 2020-04-29
[20 rows x 8 columns]
Stopping thread id 4
Process finished with exit code 0

Note that in this blog we have hardcoded the password as a string, which is definitely not how passwords should be handled. In a production environment we never hardcode passwords; we should use a secrets manager, environment variables, .env files, and the like.
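For example, one minimal option is to read the password from an environment variable instead of the source code (the variable name POSTGRES_PASSWORD here is just an illustration; set it in the shell or load it from a .env file before running the program):

import os

password = os.environ["POSTGRES_PASSWORD"]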

Conclusion 

After going through this blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading. You also now know what multiprocessing and multithreading are, and you can judge the best way to carry out read and write operations on a database based on your requirements.

In general, if you have a small amount of data, a simple Python approach to reading and writing is enough. If you have a relatively large amount of data, you can use the multi-threaded approach or the single-partition Spark approach. If you have a huge amount of data, where reading millions of records per second is a requirement, the Spark multi-partition approach is the way to go. In the end, the choice depends on your requirements and the resources you have available.



| West Bengal| Darjeeling| 18423| 18155| 203| 0| null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows
Process finished with exit code 0

We have seen how to read and write data in Spark, but Spark is not the only way to connect to databases, right? There are multiple ways to access a database and try to achieve parallel reads and writes. We will discuss them in the following sections, focusing mainly on reading and writing from Python.

Single-Threaded Python Program: 

import traceback
import psycopg2
import pandas as pd

class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')

if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    query = f"select * from {table_name}"
    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username, password=password)
    data = pd.DataFrame(db_client.read(query))
    print(data)

To integrate Postgres with Python, there are several libraries or adapters we can use, but Psycopg is the most widely used adapter. First of all, you will need to install the Psycopg2 library; Psycopg2 is the newer version of the Psycopg adapter. You can install it using pip or any other way you are comfortable with.
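
For example, the pre-built binary package can be installed with pip:

pip install psycopg2-binary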

To connect to the Postgres database, we need the hostname, port, database name, username, and password, and we store all of these as attributes of the class. The create_conn method establishes a connection to Postgres using the connect() method of the psycopg2 module and returns a connection object.
In the read method, we call create_conn to get a connection object and use it to create a cursor. The cursor is bound to the connection for its lifetime and executes all commands and queries against the database. Using this cursor, we execute a read query; the rows it returns are then fetched with the fetchall() method, and finally we close the cursor and the connection.

To run the program, we specify the database details and the query, create a PostgresDbClient object, and call its read method. The read method returns the rows, which we convert into a tabular format using pandas.

This implementation is very straightforward: the program runs as a single process and fetches all the data using the system's resources (CPU, memory, etc.). The drawback of this approach is that if the program uses, say, 30 percent of the available CPU and memory, the remaining 70 percent sits idle. We can push utilization higher with multithreading or multiprocessing. 

Output of Program:

Connecting to the Postgres database...
Successfully connected to the Postgres database...
Reading data !!!
Read Data !!!
0 1 2 3 4 5 6 7
0 Andaman and Nicobar Islands Unknown 33 11 0 0 None 2020-04-26
1 Andhra Pradesh Anantapur 53 14 4 0 None 2020-04-26
2 Andhra Pradesh Chittoor 73 13 0 0 None 2020-04-26
3 Andhra Pradesh East Godavari 39 12 0 0 None 2020-04-26
4 Andhra Pradesh Guntur 214 29 8 0 None 2020-04-26
.. ... ... ... .. .. .. ... ...
95 Bihar Araria 1 0 0 0 None 2020-04-30
96 Bihar Arwal 4 0 0 0 None 2020-04-30
97 Bihar Aurangabad 8 0 0 0 None 2020-04-30
98 Bihar Banka 3 0 0 0 None 2020-04-30
99 Bihar Begusarai 11 5 0 0 None 2020-04-30
[100 rows x 8 columns]
Process finished with exit code 0

Multi-Threaded Python Program:

In the previous section, we discussed the drawbacks of a single-process, single-threaded implementation. Let's get started with how to maximize resource usage. Before diving into multithreading, let's understand a few basic but important concepts.

What is a process?

When you execute any program, the operating system loads it into memory and then starts executing it. This running instance of the program is called a process. Computing and memory resources are allocated to each process separately.

What is a thread?

A thread is a sequential flow of execution within a process. Every process has at least one thread, usually called the main thread. Unlike separate processes, multiple threads share the computing and memory resources of their process.

What is multithreading?

This is when a process has multiple threads in addition to the main thread, all running independently but concurrently while sharing the computing and memory resources of the process. Such a program is called a multithreaded program. Multithreading uses resources efficiently, which helps maximize performance.

What is multiprocessing?

When multiple processes run independently, each with its own resources, it is called multiprocessing. Multiprocessing is typically achieved by running the separate processes on multiple processors or cores. 
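
For contrast with the threaded program that follows, here is a minimal multiprocessing sketch (purely illustrative, not part of the database example): two functions run in separate processes, each with its own memory.

# Minimal multiprocessing illustration (not related to the database example).
from multiprocessing import Process

def work(name):
    print(f"Process {name} is running")

if __name__ == "__main__":
    p1 = Process(target=work, args=("one",))
    p2 = Process(target=work, args=("two",))
    p1.start()
    p2.start()
    p1.join()  # wait for both processes to finish
    p2.join()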

Let's get back to our program. The connection and read methods are exactly the same as in the previous section. What is new is the get_thread() function. Note that it is a plain function, not a method of the class: get_thread() is global and acts as a wrapper around the read method of PostgresDbClient, bundling the per-thread logging and the read call into a single target function for each thread. Don't worry if this seems confusing; it is just how the code is organized.

To run the program, we specify the Postgres database details and the queries. In the previous approach we fetched all the data from the table with a single thread. Here the plan is to fetch one day of data per thread so that we maximize resource utilization: each query reads one day's worth of data from the table, and with 5 queries we fetch 5 days of data using 5 threads running concurrently.

To create a thread in Python, we use the Thread class from the threading module, passing the function we want to run and its arguments. Thread() creates a new thread and returns a thread object; at this point the thread exists but has not yet started. To start it, we call the start() method. In our program we start 5 threads. If you execute the whole program several times, you will get the output in a different order each time: some data is fetched earlier, some later, and on the next run the order changes again. This is because thread scheduling is handled by the operating system, and the output order depends on which thread the OS gives resources to at any moment. If you want to know how that works, you will need to dig into operating systems concepts.
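
As a minimal sketch of that pattern (the full program follows below; join() is optional and only needed if you want the main thread to wait for the worker):

import threading

def work(thread_id):
    print(f"Thread {thread_id} is running")

t = threading.Thread(target=work, args=(1,))
t.start()  # the thread begins executing work(1)
t.join()   # optionally block until the thread finishes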

In our use case, we are just printing the data to the console. There are multiple ways to actually store it. One simple way is to define a global variable and accumulate the results in it, but then we need synchronization, because multiple threads accessing the same global variable can lead to race conditions. Another way is to subclass the Thread class and keep the result in an instance variable of your custom class; again, if those results are later merged into shared state, you must ensure proper synchronization.
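
As a hedged sketch of the first option, a global list guarded by a threading.Lock could look like this (results and results_lock are names introduced here for illustration):

import threading

results = []                      # shared, global result store (illustrative)
results_lock = threading.Lock()

def get_thread(thread_id, db_client, query):
    data = db_client.read(query)
    with results_lock:            # synchronize access to the shared list
        results.append((thread_id, data))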

So, whichever of these options you use to store the data in a variable, you need synchronization, and synchronization pushes the threads toward sequential execution, which is exactly what we are trying to avoid.
To avoid synchronization, we can write the data directly to the target: the thread that reads the data also writes it straight back to the target database. This way we avoid shared state entirely and still persist the data for future use. The function can look like the snippet below, where db_client.write(data) stands for a method that writes the data to a database.

def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    db_client.write(data)
    print(f"Stopping thread id {thread_id}")

Python Program:

import threading
import traceback
import psycopg2
import pandas as pd

class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')

def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    print(f"Stopping thread id {thread_id}")

if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    partition_column = 'date'
    lower_bound = '2020-04-26'
    upper_bound = '2020-04-30'
    # one query per day between lower_bound and upper_bound (inclusive), one thread per query
    query1 = f"select * from {table_name} where {partition_column} = '2020-04-26'"
    query2 = f"select * from {table_name} where {partition_column} = '2020-04-27'"
    query3 = f"select * from {table_name} where {partition_column} = '2020-04-28'"
    query4 = f"select * from {table_name} where {partition_column} = '2020-04-29'"
    query5 = f"select * from {table_name} where {partition_column} = '2020-04-30'"
    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username, password=password)
    x1 = threading.Thread(target=get_thread, args=(1, db_client, query1))
    x1.start()
    x2 = threading.Thread(target=get_thread, args=(2, db_client, query2))
    x2.start()
    x3 = threading.Thread(target=get_thread, args=(3, db_client, query3))
    x3.start()
    x4 = threading.Thread(target=get_thread, args=(4, db_client, query4))
    x4.start()
    x5 = threading.Thread(target=get_thread, args=(5, db_client, query5))
    x5.start()

Output of Program:

Starting thread id 1
Connecting to the Postgres database...
Starting thread id 2
Connecting to the Postgres database...
Starting thread id 3
Connecting to the Postgres database...
Starting thread id 4
Connecting to the Postgres database...
Starting thread id 5
Connecting to the Postgres database...
Successfully connected to the Postgres database...Successfully connected to the Postgres database...Successfully connected to the Postgres database...
Reading data !!!
Reading data !!!
Reading data !!!
Successfully connected to the Postgres database...
Reading data !!!
Successfully connected to the Postgres database...
Reading data !!!
Read Data !!!
Read Data !!!
Read Data !!!
Read Data !!!
Read Data !!!
Thread 2 data
Thread 3 data
Thread 1 data
Thread 5 data
Thread 4 data
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-27
1 Andhra Pradesh Anantapur 53 ... 0 None 2020-04-27
2 Andhra Pradesh Chittoor 73 ... 0 None 2020-04-27
3 Andhra Pradesh East Godavari 39 ... 0 None 2020-04-27
4 Andhra Pradesh Guntur 237 ... 0 None 2020-04-27
5 Andhra Pradesh Krishna 210 ... 0 None 2020-04-27
6 Andhra Pradesh Kurnool 292 ... 0 None 2020-04-27
7 Andhra Pradesh Prakasam 56 ... 0 None 2020-04-27
8 Andhra Pradesh S.P.S. Nellore 79 ... 0 None 2020-04-27
9 Andhra Pradesh Srikakulam 4 ... 0 None 2020-04-27
10 Andhra Pradesh Visakhapatnam 22 ... 0 None 2020-04-27
11 Andhra Pradesh West Godavari 54 ... 0 None 2020-04-27
12 Andhra Pradesh Y.S.R. Kadapa 58 ... 0 None 2020-04-27
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-27
14 Assam Unknown 36 ... 0 None 2020-04-27
15 Bihar Arwal 4 ... 0 None 2020-04-27
16 Bihar Aurangabad 7 ... 0 None 2020-04-27
17 Bihar Banka 2 ... 0 None 2020-04-27
18 Bihar Begusarai 9 ... 0 None 2020-04-27
19 Bihar Bhagalpur 5 ... 0 None 2020-04-27
[20 rows x 8 columns]
Stopping thread id 2
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-26
1 Andhra Pradesh Anantapur 53 ... 0 None 2020-04-26
2 Andhra Pradesh Chittoor 73 ... 0 None 2020-04-26
3 Andhra Pradesh East Godavari 39 ... 0 None 2020-04-26
4 Andhra Pradesh Guntur 214 ... 0 None 2020-04-26
5 Andhra Pradesh Krishna 177 ... 0 None 2020-04-26
6 Andhra Pradesh Kurnool 279 ... 0 None 2020-04-26
7 Andhra Pradesh Prakasam 56 ... 0 None 2020-04-26
8 Andhra Pradesh S.P.S. Nellore 72 ... 0 None 2020-04-26
9 Andhra Pradesh Srikakulam 3 ... 0 None 2020-04-26
10 Andhra Pradesh Visakhapatnam 22 ... 0 None 2020-04-26
11 Andhra Pradesh West Godavari 51 ... 0 None 2020-04-26
12 Andhra Pradesh Y.S.R. Kadapa 58 ... 0 None 2020-04-26
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-26
14 Assam Unknown 36 ... 0 None 2020-04-26
15 Bihar Arwal 4 ... 0 None 2020-04-26
16 Bihar Aurangabad 2 ... 0 None 2020-04-26
17 Bihar Banka 2 ... 0 None 2020-04-26
18 Bihar Begusarai 9 ... 0 None 2020-04-26
19 Bihar Bhagalpur 5 ... 0 None 2020-04-26
[20 rows x 8 columns]
Stopping thread id 1
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-28
1 Andhra Pradesh Anantapur 54 ... 0 None 2020-04-28
2 Andhra Pradesh Chittoor 74 ... 0 None 2020-04-28
3 Andhra Pradesh East Godavari 39 ... 0 None 2020-04-28
4 Andhra Pradesh Guntur 254 ... 0 None 2020-04-28
5 Andhra Pradesh Krishna 223 ... 0 None 2020-04-28
6 Andhra Pradesh Kurnool 332 ... 0 None 2020-04-28
7 Andhra Pradesh Prakasam 56 ... 0 None 2020-04-28
8 Andhra Pradesh S.P.S. Nellore 82 ... 0 None 2020-04-28
9 Andhra Pradesh Srikakulam 4 ... 0 None 2020-04-28
10 Andhra Pradesh Visakhapatnam 22 ... 0 None 2020-04-28
11 Andhra Pradesh West Godavari 54 ... 0 None 2020-04-28
12 Andhra Pradesh Y.S.R. Kadapa 65 ... 0 None 2020-04-28
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-28
14 Assam Unknown 38 ... 0 None 2020-04-28
15 Bihar Araria 1 ... 0 None 2020-04-28
16 Bihar Arwal 4 ... 0 None 2020-04-28
17 Bihar Aurangabad 7 ... 0 None 2020-04-28
18 Bihar Banka 3 ... 0 None 2020-04-28
19 Bihar Begusarai 9 ... 0 None 2020-04-28
[20 rows x 8 columns]
Stopping thread id 3
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-30
1 Andhra Pradesh Anantapur 61 ... 0 None 2020-04-30
2 Andhra Pradesh Chittoor 80 ... 0 None 2020-04-30
3 Andhra Pradesh East Godavari 42 ... 0 None 2020-04-30
4 Andhra Pradesh Guntur 287 ... 0 None 2020-04-30
5 Andhra Pradesh Krishna 246 ... 0 None 2020-04-30
6 Andhra Pradesh Kurnool 386 ... 0 None 2020-04-30
7 Andhra Pradesh Prakasam 60 ... 0 None 2020-04-30
8 Andhra Pradesh S.P.S. Nellore 84 ... 0 None 2020-04-30
9 Andhra Pradesh Srikakulam 5 ... 0 None 2020-04-30
10 Andhra Pradesh Visakhapatnam 23 ... 0 None 2020-04-30
11 Andhra Pradesh West Godavari 56 ... 0 None 2020-04-30
12 Andhra Pradesh Y.S.R. Kadapa 73 ... 0 None 2020-04-30
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-30
14 Assam Unknown 43 ... 0 None 2020-04-30
15 Bihar Araria 1 ... 0 None 2020-04-30
16 Bihar Arwal 4 ... 0 None 2020-04-30
17 Bihar Aurangabad 8 ... 0 None 2020-04-30
18 Bihar Banka 3 ... 0 None 2020-04-30
19 Bihar Begusarai 11 ... 0 None 2020-04-30
[20 rows x 8 columns]
Stopping thread id 5
0 1 2 ... 5 6 7
0 Andaman and Nicobar Islands Unknown 33 ... 0 None 2020-04-29
1 Andhra Pradesh Anantapur 58 ... 0 None 2020-04-29
2 Andhra Pradesh Chittoor 77 ... 0 None 2020-04-29
3 Andhra Pradesh East Godavari 40 ... 0 None 2020-04-29
4 Andhra Pradesh Guntur 283 ... 0 None 2020-04-29
5 Andhra Pradesh Krishna 236 ... 0 None 2020-04-29
6 Andhra Pradesh Kurnool 343 ... 0 None 2020-04-29
7 Andhra Pradesh Prakasam 60 ... 0 None 2020-04-29
8 Andhra Pradesh S.P.S. Nellore 82 ... 0 None 2020-04-29
9 Andhra Pradesh Srikakulam 5 ... 0 None 2020-04-29
10 Andhra Pradesh Visakhapatnam 23 ... 0 None 2020-04-29
11 Andhra Pradesh West Godavari 56 ... 0 None 2020-04-29
12 Andhra Pradesh Y.S.R. Kadapa 69 ... 0 None 2020-04-29
13 Arunachal Pradesh Lohit 1 ... 0 None 2020-04-29
14 Assam Unknown 38 ... 0 None 2020-04-29
15 Bihar Araria 1 ... 0 None 2020-04-29
16 Bihar Arwal 4 ... 0 None 2020-04-29
17 Bihar Aurangabad 8 ... 0 None 2020-04-29
18 Bihar Banka 3 ... 0 None 2020-04-29
19 Bihar Begusarai 11 ... 0 None 2020-04-29
[20 rows x 8 columns]
Stopping thread id 4
Process finished with exit code 0

Note that in this blog we have hardcoded the password as a plain string, which is definitely not how passwords should be handled. Use a secrets manager, environment variables, .env files, or similar mechanisms to supply passwords; never hardcode them in a production environment.
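
As one example, the password could be read from an environment variable (or from a .env file loaded with a library such as python-dotenv) instead of being hardcoded; the variable name below is just illustrative:

import os

# Read the password from the environment instead of hardcoding it.
password = os.environ["POSTGRES_PASSWORD"]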

Conclusion 

After going through this blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading. You also now know what multiprocessing and multithreading are, and you can analyze which way of carrying out read and write operations on a database best fits your requirements. 

In general, if you have a small amount of data, a simple single-threaded Python approach is enough for reading and writing. If you have a relatively large amount of data, a multi-threaded approach or a single-partition Spark job works well. If you have a huge amount of data, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, which approach to use depends mostly on your requirements and the resources you have available.