• United States+1
  • United Kingdom+44
  • Afghanistan (‫افغانستان‬‎)+93
  • Albania (Shqipëri)+355
  • Algeria (‫الجزائر‬‎)+213
  • American Samoa+1684
  • Andorra+376
  • Angola+244
  • Anguilla+1264
  • Antigua and Barbuda+1268
  • Argentina+54
  • Armenia (Հայաստան)+374
  • Aruba+297
  • Australia+61
  • Austria (Österreich)+43
  • Azerbaijan (Azərbaycan)+994
  • Bahamas+1242
  • Bahrain (‫البحرين‬‎)+973
  • Bangladesh (বাংলাদেশ)+880
  • Barbados+1246
  • Belarus (Беларусь)+375
  • Belgium (België)+32
  • Belize+501
  • Benin (Bénin)+229
  • Bermuda+1441
  • Bhutan (འབྲུག)+975
  • Bolivia+591
  • Bosnia and Herzegovina (Босна и Херцеговина)+387
  • Botswana+267
  • Brazil (Brasil)+55
  • British Indian Ocean Territory+246
  • British Virgin Islands+1284
  • Brunei+673
  • Bulgaria (България)+359
  • Burkina Faso+226
  • Burundi (Uburundi)+257
  • Cambodia (កម្ពុជា)+855
  • Cameroon (Cameroun)+237
  • Canada+1
  • Cape Verde (Kabu Verdi)+238
  • Caribbean Netherlands+599
  • Cayman Islands+1345
  • Central African Republic (République centrafricaine)+236
  • Chad (Tchad)+235
  • Chile+56
  • China (中国)+86
  • Christmas Island+61
  • Cocos (Keeling) Islands+61
  • Colombia+57
  • Comoros (‫جزر القمر‬‎)+269
  • Congo (DRC) (Jamhuri ya Kidemokrasia ya Kongo)+243
  • Congo (Republic) (Congo-Brazzaville)+242
  • Cook Islands+682
  • Costa Rica+506
  • Côte d’Ivoire+225
  • Croatia (Hrvatska)+385
  • Cuba+53
  • Curaçao+599
  • Cyprus (Κύπρος)+357
  • Czech Republic (Česká republika)+420
  • Denmark (Danmark)+45
  • Djibouti+253
  • Dominica+1767
  • Dominican Republic (República Dominicana)+1
  • Ecuador+593
  • Egypt (‫مصر‬‎)+20
  • El Salvador+503
  • Equatorial Guinea (Guinea Ecuatorial)+240
  • Eritrea+291
  • Estonia (Eesti)+372
  • Ethiopia+251
  • Falkland Islands (Islas Malvinas)+500
  • Faroe Islands (Føroyar)+298
  • Fiji+679
  • Finland (Suomi)+358
  • France+33
  • French Guiana (Guyane française)+594
  • French Polynesia (Polynésie française)+689
  • Gabon+241
  • Gambia+220
  • Georgia (საქართველო)+995
  • Germany (Deutschland)+49
  • Ghana (Gaana)+233
  • Gibraltar+350
  • Greece (Ελλάδα)+30
  • Greenland (Kalaallit Nunaat)+299
  • Grenada+1473
  • Guadeloupe+590
  • Guam+1671
  • Guatemala+502
  • Guernsey+44
  • Guinea (Guinée)+224
  • Guinea-Bissau (Guiné Bissau)+245
  • Guyana+592
  • Haiti+509
  • Honduras+504
  • Hong Kong (香港)+852
  • Hungary (Magyarország)+36
  • Iceland (Ísland)+354
  • India (भारत)+91
  • Indonesia+62
  • Iran (‫ایران‬‎)+98
  • Iraq (‫العراق‬‎)+964
  • Ireland+353
  • Isle of Man+44
  • Israel (‫ישראל‬‎)+972
  • Italy (Italia)+39
  • Jamaica+1876
  • Japan (日本)+81
  • Jersey+44
  • Jordan (‫الأردن‬‎)+962
  • Kazakhstan (Казахстан)+7
  • Kenya+254
  • Kiribati+686
  • Kosovo+383
  • Kuwait (‫الكويت‬‎)+965
  • Kyrgyzstan (Кыргызстан)+996
  • Laos (ລາວ)+856
  • Latvia (Latvija)+371
  • Lebanon (‫لبنان‬‎)+961
  • Lesotho+266
  • Liberia+231
  • Libya (‫ليبيا‬‎)+218
  • Liechtenstein+423
  • Lithuania (Lietuva)+370
  • Luxembourg+352
  • Macau (澳門)+853
  • Macedonia (FYROM) (Македонија)+389
  • Madagascar (Madagasikara)+261
  • Malawi+265
  • Malaysia+60
  • Maldives+960
  • Mali+223
  • Malta+356
  • Marshall Islands+692
  • Martinique+596
  • Mauritania (‫موريتانيا‬‎)+222
  • Mauritius (Moris)+230
  • Mayotte+262
  • Mexico (México)+52
  • Micronesia+691
  • Moldova (Republica Moldova)+373
  • Monaco+377
  • Mongolia (Монгол)+976
  • Montenegro (Crna Gora)+382
  • Montserrat+1664
  • Morocco (‫المغرب‬‎)+212
  • Mozambique (Moçambique)+258
  • Myanmar (Burma) (မြန်မာ)+95
  • Namibia (Namibië)+264
  • Nauru+674
  • Nepal (नेपाल)+977
  • Netherlands (Nederland)+31
  • New Caledonia (Nouvelle-Calédonie)+687
  • New Zealand+64
  • Nicaragua+505
  • Niger (Nijar)+227
  • Nigeria+234
  • Niue+683
  • Norfolk Island+672
  • North Korea (조선 민주주의 인민 공화국)+850
  • Northern Mariana Islands+1670
  • Norway (Norge)+47
  • Oman (‫عُمان‬‎)+968
  • Pakistan (‫پاکستان‬‎)+92
  • Palau+680
  • Palestine (‫فلسطين‬‎)+970
  • Panama (Panamá)+507
  • Papua New Guinea+675
  • Paraguay+595
  • Peru (Perú)+51
  • Philippines+63
  • Poland (Polska)+48
  • Portugal+351
  • Puerto Rico+1
  • Qatar (‫قطر‬‎)+974
  • Réunion (La Réunion)+262
  • Romania (România)+40
  • Russia (Россия)+7
  • Rwanda+250
  • Saint Barthélemy (Saint-Barthélemy)+590
  • Saint Helena+290
  • Saint Kitts and Nevis+1869
  • Saint Lucia+1758
  • Saint Martin (Saint-Martin (partie française))+590
  • Saint Pierre and Miquelon (Saint-Pierre-et-Miquelon)+508
  • Saint Vincent and the Grenadines+1784
  • Samoa+685
  • San Marino+378
  • São Tomé and Príncipe (São Tomé e Príncipe)+239
  • Saudi Arabia (‫المملكة العربية السعودية‬‎)+966
  • Senegal (Sénégal)+221
  • Serbia (Србија)+381
  • Seychelles+248
  • Sierra Leone+232
  • Singapore+65
  • Sint Maarten+1721
  • Slovakia (Slovensko)+421
  • Slovenia (Slovenija)+386
  • Solomon Islands+677
  • Somalia (Soomaaliya)+252
  • South Africa+27
  • South Korea (대한민국)+82
  • South Sudan (‫جنوب السودان‬‎)+211
  • Spain (España)+34
  • Sri Lanka (ශ්‍රී ලංකාව)+94
  • Sudan (‫السودان‬‎)+249
  • Suriname+597
  • Svalbard and Jan Mayen+47
  • Swaziland+268
  • Sweden (Sverige)+46
  • Switzerland (Schweiz)+41
  • Syria (‫سوريا‬‎)+963
  • Taiwan (台灣)+886
  • Tajikistan+992
  • Tanzania+255
  • Thailand (ไทย)+66
  • Timor-Leste+670
  • Togo+228
  • Tokelau+690
  • Tonga+676
  • Trinidad and Tobago+1868
  • Tunisia (‫تونس‬‎)+216
  • Turkey (Türkiye)+90
  • Turkmenistan+993
  • Turks and Caicos Islands+1649
  • Tuvalu+688
  • U.S. Virgin Islands+1340
  • Uganda+256
  • Ukraine (Україна)+380
  • United Arab Emirates (‫الإمارات العربية المتحدة‬‎)+971
  • United Kingdom+44
  • United States+1
  • Uruguay+598
  • Uzbekistan (Oʻzbekiston)+998
  • Vanuatu+678
  • Vatican City (Città del Vaticano)+39
  • Venezuela+58
  • Vietnam (Việt Nam)+84
  • Wallis and Futuna+681
  • Western Sahara (‫الصحراء الغربية‬‎)+212
  • Yemen (‫اليمن‬‎)+967
  • Zambia+260
  • Zimbabwe+263
  • Åland Islands+358
Thanks! We'll be in touch in the next 12 hours
Oops! Something went wrong while submitting the form.

Building a WebSocket Service with AWS Lambda & DynamoDB

Akash Rajput

Full-stack Development

WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

Prerequisites

A basic understanding of real-time web applications will help with this implementation. Throughout this article, we will be using Serverless Framework for developing and deploying the WebSocket service. Also, Node.js is used to write the business logic. 

Behind the scenes, Serverless uses Cloudformation to create various required resources, like API Gateway APIs, AWS Lambda functions, IAM roles and policies, etc.

Why Serverless?

Serverless Framework abstracts the complex syntax needed for creating the Cloudformation stacks and helps us focus on the business logic of the services. Along with that, there are a variety of plugins available that help developing serverless applications easier.

Why DynamoDB?

We need persistent storage for WebSocket connection data, along with AWS Lambda. DynamoDB, a serverless key-value database from AWS, offers low latency, making it a great fit for storing and retrieving WebSocket connection details.

Overview

In this application, we’ll be creating an AWS Lambda service that accepts the WebSocket connections coming via API Gateway. The connections and subscriptions to topics are persisted using DynamoDB. We will be using ws for implementing basic WebSocket clients for the demonstration. The implementation has a Lambda consuming WebSocket that receives the connections and handles the communication. 

Base Setup

We will be using the default Node.js boilerplate offered by Serverless as a starting point.

serverless create --template aws-nodejs
view raw base_setup hosted with ❤ by GitHub

A few of the Serverless plugins are installed and used to speed up the development and deployment of the Serverless stack. We also add the webpack config given here to support the latest JS syntax.

Adding Lambda role and policies:

The lambda function requires a role attached to it that has enough permissions to access DynamoDB and Execute API. These are the links for the configuration files:

Link to dynamoDB.yaml

Link to lambdaRole.yaml

Adding custom config for plugins:

The plugins used for local development must have the custom config added in the yaml file.

This is how our serverless.yaml file should look like after the base serverless configuration:

service: websocket-app
frameworkVersion: '2'
custom:
dynamodb:
stages:
- dev
start:
port: 8000
inMemory: true
heapInitial: 200m
heapMax: 1g
migrate: true
convertEmptyValues: true
webpack:
keepOutputDirectory: true
packager: 'npm'
includeModules:
forceExclude:
- aws-sdk
provider:
name: aws
runtime: nodejs12.x
lambdaHashingVersion: 20201221
plugins:
- serverless-dynamodb-local
- serverless-plugin-existing-s3
- serverless-dotenv-plugin
- serverless-webpack
- serverless-offline
resources:
- Resources: ${file(./config/dynamoDB.yaml)}
- Resources: ${file(./config/lambdaRoles.yaml)}
functions:
hello:
handler: handler.hello

Add WebSocket Lambda:

We need to create a lambda function that accepts WebSocket events from API Gateway. As you can see, we’ve defined 3 WebSocket events for the lambda function.

  • $connect
  • $disconnect
  • $default

These 3 events stand for the default routes that come with WebSocket API Gateway offering. $connect and $disconnect are used for initialization and termination of the socket connection, where $default route is for data transfer.

functions:
websocket:
handler: lambda/websocket.handler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default

We can go ahead and update how data is sent and add custom WebSocket routes to the application.

The lambda needs to establish a connection with the client and handle the subscriptions. The logic for updating the DynamoDB is written in a utility class client. Whenever a connection is received, we create a record in the topics table.

console.log(`Received socket connectionId: ${event.requestContext && event.requestContext.connectionId}`);
if (!(event.requestContext && event.requestContext.connectionId)) {
throw new Error('Invalid event. Missing `connectionId` parameter.');
}
const connectionId = event.requestContext.connectionId;
const route = event.requestContext.routeKey;
console.log(`data from ${connectionId} ${event.body}`);
const connection = new Client(connectionId);
const response = { statusCode: 200, body: '' };
if (route === '$connect') {
console.log(`Route ${route} - Socket connectionId connectedconected: ${event.requestContext && event.requestContext.connectionId}`);
await new Client(connectionId).connect();
return response;
}

The Client utility class internally creates a record for the given connectionId in the DynamoDB topics table.

async subscribe({ topic, ttl }) {
return dynamoDBClient
.put({
Item: {
topic,
connectionId: this.connectionId,
ttl: typeof ttl === 'number' ? ttl : Math.floor(Date.now() / 1000) + 60 * 60 * 2,
},
TableName: process.env.TOPICS_TABLE,
}).promise();
}

Similarly, for the $disconnect route, we remove the INITIAL_CONNECTION topic record when a client disconnects.

else if (route === '$disconnect') {
console.log(`Route ${route} - Socket disconnected: ${ event.requestContext.connectionId}`);
await new Client(connectionId).unsubscribe();
return response;
}

The client.unsubscribe method internally removes the connection entry from the DynamoDB table. Here, the getTopics method fetches all the topics the particular client has subscribed to.

async unsubscribe() {
const topics = await this.getTopics();
if (!topics) {
throw Error(`Topics got undefined`);
}
return this.removeTopics({
[process.env.TOPICS_TABLE]: topics.map(({ topic, connectionId }) => ({
DeleteRequest: { Key: { topic, connectionId } },
})),
});
}

Now comes the default route part of the lambda where we customize message handling. In this implementation, we’re relaying our message handling based on the event.body.type, which indicates what kind of message is received from the client to server. The subscribe type here is used to subscribe to new topics. Similarly, the message type is used to receive the message from one client and then publish it to other clients who have subscribed to the same topic as the sender.

console.log(`Route ${route} - data from ${connectionId}`);
if (!event.body) {
return response;
}
let body = JSON.parse(event.body);
const topic = body.topic;
if (body.type === 'subscribe') {
connection.subscribe({ topic });
console.log(`Client subscribing for topic: ${topic}`);
}
if (body.type === 'message') {
await new Topic(topic).publishMessage({ data: body.message });
console.error(`Published messages to subscribers`);
return response;
}
return response;

Similar to $connect, the subscribe type of payload, when received, creates a new subscription for the mentioned topic.

Publishing the messages

Here is the interesting part of this lambda. When a client sends a payload with type message, the lambda calls the publishMessage method with the data received. The method gets the subscribers active for the topic and publishes messages using another utility TopicSubscriber.sendMessage

async publishMessage(data) {
const subscribers = await this.getSubscribers();
const promises = subscribers.map(async ({ connectionId, subscriptionId }) => {
const TopicSubscriber = new Client(connectionId);
const res = await TopicSubscriber.sendMessage({
id: subscriptionId,
payload: { data },
type: 'data',
});
return res;
});
return Promise.all(promises);
}

The sendMessage executes the API endpoint, which is the API Gateway URL after deployment. As we’re using serverless-offline for the local development, the IS_OFFLINE env variable is automatically set.

const endpoint = process.env.IS_OFFLINE ? 'http://localhost:3001' : process.env.PUBLISH_ENDPOINT;
console.log('publish endpoint', endpoint);
const gatewayClient = new ApiGatewayManagementApi({
apiVersion: '2018-11-29',
credentials: config,
endpoint,
});
return gatewayClient
.postToConnection({
ConnectionId: this.connectionId,
Data: JSON.stringify(message),
})
.promise();

Instead of manually invoking the API endpoint, we can also use DynamoDB streams to trigger a lambda asynchronously and publish messages to topics.

Implementing the client

For testing the socket implementation, we will be using a node.js script ws-client.js. This creates two nodejs ws clients: one that sends the data and another that receives it.

const WebSocket = require('ws');
const sockedEndpoint = 'http://0.0.0.0:3001';
const ws1 = new WebSocket(sockedEndpoint, {
perMessageDeflate: false
});
const ws2 = new WebSocket(sockedEndpoint, {
perMessageDeflate: false
});

The first client on connect sends the data at an interval of one second to a topic named general. The count increments each send.

ws1.on('open', () => {
console.log('WS1 connected');
let count = 0;
setInterval(() => {
const data = {
type: 'message',
message: `count is ${count}`,
topic: 'general'
}
const message = JSON.stringify(data);
ws1.send(message, (err) => {
if(err) {
console.log(`Error occurred while send data ${err.message}`)
}
console.log(`WS1 OUT ${message}`);
})
count++;
}, 15000)
})

The second client on connect will first subscribe to the general topic and then attach a handler for receiving data.

ws2.on('open', () => {
console.log('WS2 connected');
const data = {
type: 'subscribe',
topic: 'general'
}
ws2.send(JSON.stringify(data), (err) => {
if(err) {
console.log(`Error occurred while send data ${err.message}`)
}
})
});
ws2.on('message', ( message) => {
console.log(`ws2 IN ${message}`);
});

Once the service is running, we should be able to see the following output, where the two clients successfully sharing and receiving the messages with our socket server.

Conclusion

With API Gateway WebSocket support and DynamoDB, we’re able to implement persistent socket connections using serverless functions. The implementation can be improved and can be as complex as needed.

WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

Get the latest engineering blogs delivered straight to your inbox.
No spam. Only expert insights.
Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.
Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.

Did you like the blog? If yes, we're sure you'll also like to work with the people who write them - our best-in-class engineering team.

We're looking for talented developers who are passionate about new emerging technologies. If that's you, get in touch with us.

Explore current openings

Building a WebSocket Service with AWS Lambda & DynamoDB

WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

Prerequisites

A basic understanding of real-time web applications will help with this implementation. Throughout this article, we will be using Serverless Framework for developing and deploying the WebSocket service. Also, Node.js is used to write the business logic. 

Behind the scenes, Serverless uses Cloudformation to create various required resources, like API Gateway APIs, AWS Lambda functions, IAM roles and policies, etc.

Why Serverless?

Serverless Framework abstracts the complex syntax needed for creating the Cloudformation stacks and helps us focus on the business logic of the services. Along with that, there are a variety of plugins available that help developing serverless applications easier.

Why DynamoDB?

We need persistent storage for WebSocket connection data, along with AWS Lambda. DynamoDB, a serverless key-value database from AWS, offers low latency, making it a great fit for storing and retrieving WebSocket connection details.

Overview

In this application, we’ll be creating an AWS Lambda service that accepts the WebSocket connections coming via API Gateway. The connections and subscriptions to topics are persisted using DynamoDB. We will be using ws for implementing basic WebSocket clients for the demonstration. The implementation has a Lambda consuming WebSocket that receives the connections and handles the communication. 

Base Setup

We will be using the default Node.js boilerplate offered by Serverless as a starting point.

serverless create --template aws-nodejs
view raw base_setup hosted with ❤ by GitHub

A few of the Serverless plugins are installed and used to speed up the development and deployment of the Serverless stack. We also add the webpack config given here to support the latest JS syntax.

Adding Lambda role and policies:

The lambda function requires a role attached to it that has enough permissions to access DynamoDB and Execute API. These are the links for the configuration files:

Link to dynamoDB.yaml

Link to lambdaRole.yaml

Adding custom config for plugins:

The plugins used for local development must have the custom config added in the yaml file.

This is how our serverless.yaml file should look like after the base serverless configuration:

service: websocket-app
frameworkVersion: '2'
custom:
dynamodb:
stages:
- dev
start:
port: 8000
inMemory: true
heapInitial: 200m
heapMax: 1g
migrate: true
convertEmptyValues: true
webpack:
keepOutputDirectory: true
packager: 'npm'
includeModules:
forceExclude:
- aws-sdk
provider:
name: aws
runtime: nodejs12.x
lambdaHashingVersion: 20201221
plugins:
- serverless-dynamodb-local
- serverless-plugin-existing-s3
- serverless-dotenv-plugin
- serverless-webpack
- serverless-offline
resources:
- Resources: ${file(./config/dynamoDB.yaml)}
- Resources: ${file(./config/lambdaRoles.yaml)}
functions:
hello:
handler: handler.hello

Add WebSocket Lambda:

We need to create a lambda function that accepts WebSocket events from API Gateway. As you can see, we’ve defined 3 WebSocket events for the lambda function.

  • $connect
  • $disconnect
  • $default

These 3 events stand for the default routes that come with WebSocket API Gateway offering. $connect and $disconnect are used for initialization and termination of the socket connection, where $default route is for data transfer.

functions:
websocket:
handler: lambda/websocket.handler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default

We can go ahead and update how data is sent and add custom WebSocket routes to the application.

The lambda needs to establish a connection with the client and handle the subscriptions. The logic for updating the DynamoDB is written in a utility class client. Whenever a connection is received, we create a record in the topics table.

console.log(`Received socket connectionId: ${event.requestContext && event.requestContext.connectionId}`);
if (!(event.requestContext && event.requestContext.connectionId)) {
throw new Error('Invalid event. Missing `connectionId` parameter.');
}
const connectionId = event.requestContext.connectionId;
const route = event.requestContext.routeKey;
console.log(`data from ${connectionId} ${event.body}`);
const connection = new Client(connectionId);
const response = { statusCode: 200, body: '' };
if (route === '$connect') {
console.log(`Route ${route} - Socket connectionId connectedconected: ${event.requestContext && event.requestContext.connectionId}`);
await new Client(connectionId).connect();
return response;
}

The Client utility class internally creates a record for the given connectionId in the DynamoDB topics table.

async subscribe({ topic, ttl }) {
return dynamoDBClient
.put({
Item: {
topic,
connectionId: this.connectionId,
ttl: typeof ttl === 'number' ? ttl : Math.floor(Date.now() / 1000) + 60 * 60 * 2,
},
TableName: process.env.TOPICS_TABLE,
}).promise();
}

Similarly, for the $disconnect route, we remove the INITIAL_CONNECTION topic record when a client disconnects.

else if (route === '$disconnect') {
console.log(`Route ${route} - Socket disconnected: ${ event.requestContext.connectionId}`);
await new Client(connectionId).unsubscribe();
return response;
}

The client.unsubscribe method internally removes the connection entry from the DynamoDB table. Here, the getTopics method fetches all the topics the particular client has subscribed to.

async unsubscribe() {
const topics = await this.getTopics();
if (!topics) {
throw Error(`Topics got undefined`);
}
return this.removeTopics({
[process.env.TOPICS_TABLE]: topics.map(({ topic, connectionId }) => ({
DeleteRequest: { Key: { topic, connectionId } },
})),
});
}

Now comes the default route part of the lambda where we customize message handling. In this implementation, we’re relaying our message handling based on the event.body.type, which indicates what kind of message is received from the client to server. The subscribe type here is used to subscribe to new topics. Similarly, the message type is used to receive the message from one client and then publish it to other clients who have subscribed to the same topic as the sender.

console.log(`Route ${route} - data from ${connectionId}`);
if (!event.body) {
return response;
}
let body = JSON.parse(event.body);
const topic = body.topic;
if (body.type === 'subscribe') {
connection.subscribe({ topic });
console.log(`Client subscribing for topic: ${topic}`);
}
if (body.type === 'message') {
await new Topic(topic).publishMessage({ data: body.message });
console.error(`Published messages to subscribers`);
return response;
}
return response;

Similar to $connect, the subscribe type of payload, when received, creates a new subscription for the mentioned topic.

Publishing the messages

Here is the interesting part of this lambda. When a client sends a payload with type message, the lambda calls the publishMessage method with the data received. The method gets the subscribers active for the topic and publishes messages using another utility TopicSubscriber.sendMessage

async publishMessage(data) {
const subscribers = await this.getSubscribers();
const promises = subscribers.map(async ({ connectionId, subscriptionId }) => {
const TopicSubscriber = new Client(connectionId);
const res = await TopicSubscriber.sendMessage({
id: subscriptionId,
payload: { data },
type: 'data',
});
return res;
});
return Promise.all(promises);
}

The sendMessage executes the API endpoint, which is the API Gateway URL after deployment. As we’re using serverless-offline for the local development, the IS_OFFLINE env variable is automatically set.

const endpoint = process.env.IS_OFFLINE ? 'http://localhost:3001' : process.env.PUBLISH_ENDPOINT;
console.log('publish endpoint', endpoint);
const gatewayClient = new ApiGatewayManagementApi({
apiVersion: '2018-11-29',
credentials: config,
endpoint,
});
return gatewayClient
.postToConnection({
ConnectionId: this.connectionId,
Data: JSON.stringify(message),
})
.promise();

Instead of manually invoking the API endpoint, we can also use DynamoDB streams to trigger a lambda asynchronously and publish messages to topics.

Implementing the client

For testing the socket implementation, we will be using a node.js script ws-client.js. This creates two nodejs ws clients: one that sends the data and another that receives it.

const WebSocket = require('ws');
const sockedEndpoint = 'http://0.0.0.0:3001';
const ws1 = new WebSocket(sockedEndpoint, {
perMessageDeflate: false
});
const ws2 = new WebSocket(sockedEndpoint, {
perMessageDeflate: false
});

The first client on connect sends the data at an interval of one second to a topic named general. The count increments each send.

ws1.on('open', () => {
console.log('WS1 connected');
let count = 0;
setInterval(() => {
const data = {
type: 'message',
message: `count is ${count}`,
topic: 'general'
}
const message = JSON.stringify(data);
ws1.send(message, (err) => {
if(err) {
console.log(`Error occurred while send data ${err.message}`)
}
console.log(`WS1 OUT ${message}`);
})
count++;
}, 15000)
})

The second client on connect will first subscribe to the general topic and then attach a handler for receiving data.

ws2.on('open', () => {
console.log('WS2 connected');
const data = {
type: 'subscribe',
topic: 'general'
}
ws2.send(JSON.stringify(data), (err) => {
if(err) {
console.log(`Error occurred while send data ${err.message}`)
}
})
});
ws2.on('message', ( message) => {
console.log(`ws2 IN ${message}`);
});

Once the service is running, we should be able to see the following output, where the two clients successfully sharing and receiving the messages with our socket server.

Conclusion

With API Gateway WebSocket support and DynamoDB, we’re able to implement persistent socket connections using serverless functions. The implementation can be improved and can be as complex as needed.

WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

Did you like the blog? If yes, we're sure you'll also like to work with the people who write them - our best-in-class engineering team.

We're looking for talented developers who are passionate about new emerging technologies. If that's you, get in touch with us.

Explore current openings