Setting up Mutual TLS Authentication and Authorization on Amazon MSK

Raihan Choudhury

Messaging System

Overview

We will cover how to set up mutual TLS authentication and authorization on Amazon MSK.

Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. You can enable client authentication with TLS for connections and client authorization from your applications to your Amazon MSK brokers and ZooKeeper nodes. 

Prerequisites

  • Terraform: For creating a private CA and MSK Cluster
  • AWS CLI: For creating TLS certificates (the user must have access to create a private CA, issue certificates, and create MSK cluster)
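
Before starting, it can help to confirm that the required tools are on the PATH. The following is a minimal sketch, assuming a Bash shell; `missing_tools` is a hypothetical helper name, and `openssl` is included because the certificate steps later in this post use it.

```shell
#!/usr/bin/env bash
# Print the name of every tool passed as an argument that is not on PATH.
# missing_tools is a hypothetical helper for this walkthrough.
missing_tools() {
  local tool
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || printf '%s\n' "$tool"
  done
}

# Tools used in this walkthrough (openssl is needed for the certificate steps).
missing=$(missing_tools terraform aws openssl)
if [ -n "$missing" ]; then
  printf 'Missing tools:\n%s\n' "$missing" >&2
fi
```

The script only reports what is missing; it does not check versions or AWS permissions.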

Set up TLS authentication and authorization

To use client authentication with TLS on MSK, you need to create the following resources:

  • AWS Private CA
  • MSK cluster with TLS encryption enabled
  • Client certificates

Create AWS Private CA

The AWS Private CA can be either in the same AWS account as your cluster or in a different account. For information about AWS Private CAs, see Creating and Managing an AWS Private CA. In this setup, we will use Terraform to create a private CA.

Steps to create Private CA

  1. Run the Terraform code below to create the private CA.

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
  }
}

resource "aws_acmpca_certificate_authority" "root_ca" {
  certificate_authority_configuration {
    key_algorithm     = "RSA_4096"
    signing_algorithm = "SHA512WITHRSA"

    subject {
      # Update the attributes as needed
      common_name         = "exp-msk-ca"
      country             = "US"
      locality            = "Seattle"
      organization        = "Example Corp"
      organizational_unit = "Sales"
      state               = "WA"
    }
  }

  type = "ROOT"
}

  2. Once the private CA is created, install the CA certificate from the AWS console.

Steps to install the certificate:

  • If you are not already on the CA's details page, open the AWS Private CA console at https://console.aws.amazon.com/acm-pca/home. On the private certificate authorities page, choose the root CA that you created; its status will be Pending or Active.
  • Choose Actions, Install CA certificate to open the Install root CA certificate page.
  • Under Specify the root CA certificate parameters, specify the following certificate parameters:
      • Validity: Specifies the expiration date and time for the CA certificate. The AWS Private CA default validity period for a root CA certificate is ten years.
      • Signature algorithm: Specifies the signing algorithm to use when the root CA issues new certificates. Available options vary according to the AWS Region where you are creating the CA. For more information, see Compatible signing algorithms, Supported cryptographic algorithms, and SigningAlgorithm in CertificateAuthorityConfiguration.
          • SHA256 RSA
  • Review your settings to make sure they’re correct, then choose Confirm and install.
  • The details page for the CA displays the status of the installation (success or failure) at the top. If the installation was successful, the newly completed root CA displays a status of Active in the General pane.
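
The same status can also be read from the command line. This is a minimal sketch, assuming the AWS CLI is configured for the Region that holds the CA; `ca_status` is a hypothetical helper name, and `<CA_ARN>` is the placeholder used throughout this post.

```shell
# Print the status of a private CA (for example PENDING_CERTIFICATE or ACTIVE).
# ca_status is a hypothetical helper; it wraps describe-certificate-authority.
ca_status() {
  aws acm-pca describe-certificate-authority \
    --certificate-authority-arn "$1" \
    --query 'CertificateAuthority.Status' \
    --output text
}

# Usage (replace <CA_ARN> first):
#   ca_status "<CA_ARN>"
```

A result of ACTIVE indicates that the CA certificate is installed and the CA can issue certificates.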

Create an MSK cluster that supports TLS client authentication

Note: We highly recommend using an independent AWS Private CA for each MSK cluster when you use mutual TLS to control access. Doing so ensures that TLS certificates signed by a given private CA can authenticate with only a single MSK cluster.

Run the Terraform code below to create the MSK cluster.

Note: Update attributes as per the requirement and configurations.

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
  }
}

module "kafka" {
  source = "cloudposse/msk-apache-kafka-cluster/aws"
  # Cloud Posse recommends pinning every module to a specific version
  version = "2.3.0"

  name                       = "test-msk-cluster"       # Change the MSK cluster name as needed
  vpc_id                     = "<VPC_ID>"
  subnet_ids                 = ["SUBNET1a", "SUBNET2b"] # Minimum 2 subnets required
  kafka_version              = "3.4.0"                  # recommended version by AWS as of 19 Sep 2022
  broker_per_zone            = 1                        # Number of brokers per availability zone
  broker_instance_type       = "kafka.t3.small"         # MSK instance type
  broker_volume_size         = 10                       # Broker disk size
  certificate_authority_arns = ["<CA_ARN>"]             # ARN of the CA created in the earlier step
  client_tls_auth_enabled    = true
  encryption_in_cluster      = true
  client_broker              = "TLS" # Enables TLS encryption
  enhanced_monitoring        = "PER_TOPIC_PER_BROKER"
  cloudwatch_logs_enabled    = false # Enable if you need CloudWatch logs
  jmx_exporter_enabled       = false # Enable if you need JMX metrics
  node_exporter_enabled      = false # Enable if you need node metrics

  associated_security_group_ids = [aws_security_group.kafka_sg.id]
  allowed_security_group_ids    = [aws_security_group.kafka_sg.id]
  create_security_group         = false
}
#-----------------------End--------------------#

resource "aws_security_group" "kafka_sg" {
  name        = "test-msk-cluster-sg" # Change the name as needed
  description = "Security Group for kafka cluster"
  vpc_id      = "<VPC_ID>"

  egress {
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  # The ingress rules below accept traffic from any address.
  # Narrow cidr_blocks to your own network ranges outside of a test setup.
  ingress {
    from_port        = 2181
    to_port          = 2181
    protocol         = "tcp"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  ingress {
    from_port        = 9094
    to_port          = 9094
    protocol         = "tcp"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  # Enable if you need to add tags to MSK cluster
  # tags = var.tags
  # Enable if you need CloudWatch logs
  # depends_on = [
  #   aws_cloudwatch_log_group.cw_log_group
  # ]
}

# Required for CloudWatch logs
# resource "aws_cloudwatch_log_group" "cw_log_group" {
#   name = "blog-msk-cluster"
#   # tags = var.tags
# }

output "bootstrap_url" {
  value       = module.kafka.bootstrap_brokers_tls
  description = "Comma separated list of one or more DNS names (or IP addresses) and TLS port pairs for access to the Kafka cluster using TLS"
}

It will take 15-20 minutes to create the MSK cluster. 

Note: Since the bootstrap URL will be used to communicate with the MSK cluster using the Kafka CLI or SDKs, save it from the Terraform output.
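
One way to capture the value is `terraform output -raw bootstrap_url`, which prints the string without surrounding quotes. The sketch below uses a hypothetical helper, `list_brokers`, to split that comma-separated string into one broker per line and to flag any entry whose port is not 9094, the TLS port opened in the security group above.

```shell
# Split a comma-separated bootstrap string into one host:port per line.
# Returns non-zero if any entry does not end in :9094 (the TLS listener port).
list_brokers() {
  local entry status=0
  local -a entries
  IFS=',' read -r -a entries <<< "$1"
  for entry in "${entries[@]}"; do
    printf '%s\n' "$entry"
    case "$entry" in
      *:9094) ;;
      *) status=1 ;;
    esac
  done
  return "$status"
}

# Usage, run from the Terraform working directory:
#   BOOTSTRAP_URL=$(terraform output -raw bootstrap_url)
#   list_brokers "$BOOTSTRAP_URL"
```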

Create TLS certificates using previously created AWS Private CA

We will create two certificates: one for admin access and one for client access. Each certificate needs a common name (CN), which is used as the principal when granting permissions through Kafka ACLs.
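
With TLS client authentication, Kafka by default takes the certificate's subject distinguished name as the principal and prefixes it with `User:`. A subject of `/CN=admin` therefore maps to `User:CN=admin`. The sketch below shows that mapping for the two certificates in this post; both helper names are hypothetical, and it assumes the subject holds only a CN. If you add other subject attributes, the principal will include them too.

```shell
# Both helpers are hypothetical and assume the subject holds only a CN,
# as in the certificates created in this post.
subject_for_cn()   { printf '/CN=%s\n' "$1"; }      # value for openssl -subj
principal_for_cn() { printf 'User:CN=%s\n' "$1"; }  # value for --allow-principal

for cn in admin client; do
  printf '%s -> %s\n' "$(subject_for_cn "$cn")" "$(principal_for_cn "$cn")"
done
```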

Create admin TLS certificate

Steps to create TLS certificate

  1. Generate CSR and key.

openssl req -newkey rsa:2048 -keyout key.pem -out cert.csr -batch -nodes -subj '/CN=admin'

  2. Issue the certificate using the private CA created earlier (replace <CA_ARN> with the ARN of your AWS Private CA and <region> with its Region).

certArn=$(aws acm-pca issue-certificate --region <region> \
  --certificate-authority-arn "<CA_ARN>" \
  --csr fileb://cert.csr \
  --signing-algorithm 'SHA256WITHRSA' \
  --validity Value=180,Type='DAYS' \
  --query 'CertificateArn' --output text)

  3. Retrieve the issued certificate using the certificate ARN from the previous step.

aws acm-pca get-certificate --region <region> \
  --certificate-authority-arn "<CA_ARN>" \
  --certificate-arn "${certArn}" \
  --output text | sed 's/\t/\n/g' > cert.pem

  4. Export the certificate in PKCS12 format.

openssl pkcs12 -export -in cert.pem -inkey key.pem -name ssl-configurator \
  -password pass: -out admin.p12

Create client TLS certificate

  1. Generate CSR and key.

openssl req -newkey rsa:2048 -keyout key.pem -out cert.csr -batch -nodes \
  -subj '/CN=client'

  2. Issue the certificate using the private CA created earlier (replace <CA_ARN> with the ARN of your AWS Private CA and <region> with its Region).

certArn=$(aws acm-pca issue-certificate --region <region> \
  --certificate-authority-arn "<CA_ARN>" \
  --csr fileb://cert.csr \
  --signing-algorithm 'SHA256WITHRSA' \
  --validity Value=180,Type='DAYS' \
  --query 'CertificateArn' --output text)

  3. Retrieve the issued certificate using the certificate ARN from the previous step.

aws acm-pca get-certificate --region <region> \
  --certificate-authority-arn "<CA_ARN>" \
  --certificate-arn "${certArn}" \
  --output text | sed 's/\t/\n/g' > cert.pem

  4. Export the certificate in PKCS12 format.

openssl pkcs12 -export -in cert.pem -inkey key.pem -name ssl-configurator \
  -password pass: -out client.p12
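
Certificate issuance is asynchronous, so `get-certificate` can fail if it runs before the certificate is ready. The AWS CLI has a waiter for this, `aws acm-pca wait certificate-issued`. The sketch below combines the issue, wait, and fetch steps in a single function. The name `fetch_signed_cert` and its argument order are hypothetical, and it assumes the CSR file already exists.

```shell
# fetch_signed_cert REGION CA_ARN CSR_FILE OUT_FILE
# Hypothetical helper: issues a certificate from the CSR, waits until it is
# ready, and writes the certificate followed by its chain to OUT_FILE.
fetch_signed_cert() {
  local region=$1 ca_arn=$2 csr=$3 out=$4 cert_arn
  cert_arn=$(aws acm-pca issue-certificate \
    --region "$region" \
    --certificate-authority-arn "$ca_arn" \
    --csr "fileb://$csr" \
    --signing-algorithm SHA256WITHRSA \
    --validity Value=180,Type=DAYS \
    --query CertificateArn --output text) || return 1

  # Block until the CA has finished issuing the certificate.
  aws acm-pca wait certificate-issued \
    --region "$region" \
    --certificate-authority-arn "$ca_arn" \
    --certificate-arn "$cert_arn" || return 1

  # --output text separates certificate and chain with a tab; split them.
  aws acm-pca get-certificate \
    --region "$region" \
    --certificate-authority-arn "$ca_arn" \
    --certificate-arn "$cert_arn" \
    --output text | tr '\t' '\n' > "$out"
}

# Usage (replace the placeholders first):
#   fetch_signed_cert "<region>" "<CA_ARN>" cert.csr cert.pem
```

Because only the CN and the output file name differ between the admin and client certificates, the same function can serve both.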

Set up a client machine to interact with the MSK cluster

  1. Create an Amazon EC2 instance to use as a client machine. For simplicity, create this instance in the same VPC you used for the cluster. See Step 3: Create a client machine for an example of how to create such a client machine.
  2. Copy the previously created certificates, admin.p12 and client.p12, to the client machine.
  3. Install Java 8 or later on the client machine.
  4. Download the Kafka binaries and extract them.

https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz

  5. Create admin and client configuration files for authentication and authorization.

# Replace <BOOTSTRAP_URL> with the value from the Terraform output.
# Do not wrap it in quotes: a properties file keeps quotes as part of the value.
cat <<EOF > admin.properties
bootstrap.servers=<BOOTSTRAP_URL>
security.protocol=SSL
ssl.keystore.location=./admin.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=
EOF
cat <<EOF > client.properties
bootstrap.servers=<BOOTSTRAP_URL>
security.protocol=SSL
ssl.keystore.location=./client.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=
EOF
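
A quick sanity check on the generated files can catch common mistakes before running any Kafka command. The sketch below is one possible check; `check_props` is a hypothetical helper. It verifies that the keys used in this post are present and that `bootstrap.servers` is not wrapped in quotes, since Java properties files treat quotes as part of the value.

```shell
# check_props FILE
# Hypothetical helper: verifies that a Kafka client properties file sets the
# keys used in this post and that bootstrap.servers is not wrapped in quotes.
check_props() {
  local file=$1 key status=0
  for key in bootstrap.servers security.protocol ssl.keystore.location ssl.keystore.type; do
    if ! grep -q "^${key}=" "$file"; then
      printf 'missing key: %s\n' "$key"
      status=1
    fi
  done
  if grep -q '^bootstrap\.servers=["'"'"']' "$file"; then
    printf 'bootstrap.servers must not be quoted\n'
    status=1
  fi
  return "$status"
}

# Usage:
#   check_props admin.properties && check_props client.properties
```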

Test Authentication and Authorization using ACLs

Create Admin ACLs for granting admin access to clusters, topics, and groups

By default, an MSK cluster allows everyone to access a resource if no ACL is found for it. The admin ACLs below are therefore the first ACLs created. The admin user ("User:CN=admin") then uses them to grant permissions to the client user ("User:CN=client").

ACL for managing cluster operations (Admin ACL).

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--add \
--allow-principal "User:CN=admin" \
--operation All \
--cluster \
--bootstrap-server "<BOOTSTRAP_URL>" \
--command-config admin.properties

ACL for managing topics permissions (Admin ACL).

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--add \
--allow-principal "User:CN=admin" \
--operation All \
--topic "*" \
--bootstrap-server "<BOOTSTRAP_URL>" \
--command-config admin.properties

ACL for managing group permissions (Admin ACL).

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--add \
--allow-principal "User:CN=admin" \
--operation All \
--group "*" \
--bootstrap-server "<BOOTSTRAP_URL>" \
--command-config admin.properties

Create a topic.

./kafka_2.13-3.4.0/bin/kafka-topics.sh --bootstrap-server "<BOOTSTRAP_URL>" \
--create --topic test-topic --command-config admin.properties

List the topics and check that the topic was created.

./kafka_2.13-3.4.0/bin/kafka-topics.sh --bootstrap-server "<BOOTSTRAP_URL>" \
--list --command-config admin.properties

Grant write permission on the topic so that the client (producer) can publish messages to it. Run this command as the admin user.

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--add \
--allow-principal "User:CN=client" \
--operation Write \
--topic "test-topic" \
--bootstrap-server "<BOOTSTRAP_URL>" \
--command-config admin.properties

Publish messages to the topic as the client user.

for x in {1..10}; do echo "message $x"; done | \
./kafka_2.13-3.4.0/bin/kafka-console-producer.sh \
--bootstrap-server "<BOOTSTRAP_URL>" \
--producer.config client.properties \
--topic test-topic \
--producer-property enable.idempotence=false

Consume messages.

Note: At this point, consuming from the topic with a consumer group fails with a group authorization error, because the client user is not yet authorized to access any group.

./kafka_2.13-3.4.0/bin/kafka-console-consumer.sh \
--bootstrap-server "<BOOTSTRAP_URL>" \
--topic test-topic --max-messages 2 \
--consumer-property enable.auto.commit=false \
--consumer-property group.id=consumer-test \
--from-beginning --consumer.config client.properties

Grant group permission to the client user.

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--add \
--allow-principal "User:CN=client" \
--operation Read \
--resource-pattern-type prefixed \
--group 'consumer-' \
--bootstrap-server "<BOOTSTRAP_URL>" \
--command-config admin.properties

After granting group access, the client user should be able to consume messages from the topic using any consumer group whose name starts with consumer-, provided it also has Read permission on the topic.
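
Consuming requires Read on the topic as well as on the group, while the ACLs created so far give the client only Write on test-topic. If the consumer reports a topic authorization error, a grant along the following lines, run as the admin user, should resolve it. This is a sketch: `grant_topic_read` and the `KAFKA_ACLS` and `BOOTSTRAP_URL` variables are hypothetical names, and the default path assumes the Kafka 3.4.0 archive downloaded earlier was extracted into the current directory.

```shell
# Path to kafka-acls.sh; override KAFKA_ACLS if Kafka lives elsewhere.
KAFKA_ACLS="${KAFKA_ACLS:-./kafka_2.13-3.4.0/bin/kafka-acls.sh}"

# grant_topic_read PRINCIPAL TOPIC
# Hypothetical helper: adds an allow ACL for the Read operation on one topic.
grant_topic_read() {
  "$KAFKA_ACLS" \
    --add \
    --allow-principal "$1" \
    --operation Read \
    --topic "$2" \
    --bootstrap-server "$BOOTSTRAP_URL" \
    --command-config admin.properties
}

# Usage:
#   BOOTSTRAP_URL="<BOOTSTRAP_URL>" grant_topic_read "User:CN=client" test-topic
```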

This way, you can manage client access to the topics and groups.

Additional Commands 

List ACLs.

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--bootstrap-server "<BOOTSTRAP_URL>" \
--list \
--command-config admin.properties

Delete an ACL (the create and delete commands are the same except for the --add/--remove argument).

./kafka_2.13-3.4.0/bin/kafka-acls.sh \
--remove \
--allow-principal "User:CN=admin" \
--operation All \
--cluster \
--bootstrap-server "<BOOTSTRAP_URL>" \
--command-config admin.properties

Conclusion

Amazon MSK reduces the effort of running self-managed Kafka clusters and lets you scale brokers and storage as needed. It supports TLS encryption for connections from your applications to the MSK brokers and ZooKeeper nodes, and with AWS Private CA you can issue the client certificates used for authentication.
