Send emails with Azure Communication Services - Part 3

2024-08-30

Introduction

This is the third part of a series on how to send emails via Azure Communication Services.

In the first part, we covered setting up the Email Communication Service and a domain, and sending a test email. In the second part, we covered how to send emails from NodeJS via the Azure SDK, how to create optimized HTML content, and how to handle unsubscribes.

In this part, we'll cover how to rate-limit and schedule emails, and how to avoid sending emails to unsubscribed users.

Rate limiting emails

As mentioned before, Azure limits sending to 30 emails per minute and 100 emails per hour. Traffic can arrive in bursts, though, so you may want a queue on the backend to smooth it out.
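To make the two limits concrete, here is a minimal sketch of a sliding-window check that a queue worker could run before each send. `canSendNow` and `LIMITS` are hypothetical names, and the sketch assumes the worker has the timestamps of recently sent emails at hand; the limit values are the ones quoted above.

```javascript
// Hypothetical helper: checks a sliding window of recent send times against
// both limits quoted above (30 per minute, 100 per hour).
const LIMITS = [
    { windowMs: 60 * 1000, max: 30 },
    { windowMs: 60 * 60 * 1000, max: 100 },
];

// sentAt: timestamps (ms) of emails already sent; now: current time in ms.
function canSendNow(sentAt, now, limits = LIMITS) {
    return limits.every(({ windowMs, max }) => {
        // Count only the sends that still fall inside this window.
        const inWindow = sentAt.filter((t) => t > now - windowMs).length;
        return inWindow < max;
    });
}
```

Both windows have to pass. For any burst larger than 100 emails, it is the hourly limit, not the per-minute one, that stretches the queue out over time.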

I searched for an open-source solution to handle this. There are plenty, such as Bull and Kue, but they are all based on Redis.

I currently run NodeJS as a managed Node service on Azure, together with a managed MySQL database, and I didn't want to add Redis to the mix.

Using MySQL as a queue

I created a simple table in MySQL to store the "tasks" that need to be processed.

For now, the only kind of task is an email, but a task could be anything.

CREATE TABLE IF NOT EXISTS tasks (
  id INT AUTO_INCREMENT PRIMARY KEY,
  status ENUM('pending', 'in-progress', 'completed') NOT NULL DEFAULT 'pending',
  pulled_at DATETIME DEFAULT NULL,
  created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
  scheduled_at DATETIME DEFAULT CURRENT_TIMESTAMP,
  details JSON NOT NULL
)

The table is created when the app starts:

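(async () => {
    // pool: the app's MySQL connection pool (assumed to be a mysql2/promise pool)
    const connection = await pool.getConnection();
    try {
        // Runs the CREATE TABLE IF NOT EXISTS statement shown above
        await connection.query(`
            ....
        `);
        console.log(`Table tasks created successfully or already exists.`);
    } catch (err) {
        console.error('Error creating table:', err);
    } finally {
        connection.release();
    }
})();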

There is a function to pull tasks (covered later) and a function to add a task:

async function pullTask() {
  ...
}

export async function addTask(details, scheduledAt = null) {
    if (scheduledAt) {
        // The task becomes due at the given time
        await pool.query(`INSERT INTO ${tableName} (details, scheduled_at) VALUES (?, ?)`, [JSON.stringify(details), scheduledAt]);
        return;
    }
    // No scheduledAt: scheduled_at defaults to CURRENT_TIMESTAMP, so the task is due right away
    await pool.query(`INSERT INTO ${tableName} (details) VALUES (?)`, [JSON.stringify(details)]);
}

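pullTask is scheduled to run every 2 seconds. Since each call processes at most one task, a single node handles at most 30 tasks per minute, which matches the per-minute limit.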

setInterval(pullTask, 2000);
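One caveat: setInterval does not wait for the previous async call to finish, so if a pull takes longer than 2 seconds (sending an email is a network call), the next pull starts while the previous one is still running. A minimal sketch of a self-scheduling loop that only schedules the next run after the current one settles; `startPolling` is a hypothetical helper, not part of any library.

```javascript
// Hypothetical helper: runs `task`, waits `intervalMs` after it settles, then runs it again.
// Unlike setInterval, two runs of `task` never overlap.
function startPolling(task, intervalMs) {
    let stopped = false;
    let timer = null;

    async function tick() {
        try {
            await task();
        } catch (err) {
            console.error('Polling task failed:', err);
        } finally {
            // Schedule the next run only after this one has finished.
            if (!stopped) timer = setTimeout(tick, intervalMs);
        }
    }

    timer = setTimeout(tick, intervalMs);

    // Returns a function that stops further runs (an in-flight run still completes).
    return function stop() {
        stopped = true;
        if (timer) clearTimeout(timer);
    };
}
```

With this helper, the line above would become `startPolling(pullTask, 2000);`. The trade-off is that the effective interval becomes "2 seconds plus processing time", which only makes the queue drain more conservatively.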

Pulling tasks

The idea is to use MySQL's SELECT ... FOR UPDATE to lock the row that the current worker is processing.

This is needed when two Node instances run at the same time, which can happen with a managed Node service (for example, when it scales out to several instances).

// Module-level settings used by addTask and pullTask
// (values taken from the table definition and the per-minute limit above)
const tableName = 'tasks';
const limitPerMinute = 30;
const status = { PENDING: 'pending', IN_PROGRESS: 'in-progress', COMPLETED: 'completed' };

async function pullTask() {
    const connection = await pool.getConnection();
    try {
        // Start transaction
        await connection.beginTransaction();

        // Step 1: Count tasks pulled in the last minute
        const [recentTasks] = await connection.query(
            `SELECT COUNT(*) AS count FROM ${tableName} WHERE pulled_at > NOW() - INTERVAL 1 MINUTE`
        );
        const pulledCount = recentTasks[0].count;

        if (pulledCount >= limitPerMinute) {
            console.log('Task limit reached. Try again later.');
            await connection.rollback();
            return;
        }

        // Step 2: Lock the oldest due task that has not been pulled yet.
        // FOR UPDATE keeps other workers from taking the same row until this transaction ends.
        const [tasks] = await connection.query(
            `SELECT * FROM ${tableName}
             WHERE scheduled_at <= NOW() AND status = ?
               AND (pulled_at IS NULL OR pulled_at <= NOW() - INTERVAL 1 MINUTE)
             ORDER BY scheduled_at, id
             LIMIT 1 FOR UPDATE`,
            [status.PENDING]
        );

        if (tasks.length === 0) {
            console.log('No tasks available for processing.');
            await connection.rollback();
            return;
        }

        const task = tasks[0];

        // Step 3: Update the task to mark it as pulled
        await connection.query(
            `UPDATE ${tableName} SET status = ?, pulled_at = NOW() WHERE id = ?`,
            [status.IN_PROGRESS, task.id]
        );

        // Your business logic goes here (runTask sends the email, see below)
        console.log(`Processing task ID: ${task.id}`);
        await runTask(task);

        // Step 4: After processing, mark the task as completed
        await connection.query(
            `UPDATE ${tableName} SET status = ? WHERE id = ?`,
            [status.COMPLETED, task.id]
        );

        // Commit the transaction
        await connection.commit();
        console.log(`Task ID: ${task.id} processed successfully.`);
    } catch (err) {
        console.error('Error pulling task:', err);
        await connection.rollback();
    } finally {
        connection.release();
    }
}
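The pullTask above keeps its transaction, and with it the row lock, open while runTask sends the email, so a second worker's SELECT ... FOR UPDATE on that row waits until the first worker commits. On MySQL 8.0 and later, a common alternative is to claim the row in a short transaction with FOR UPDATE SKIP LOCKED and send the email after the commit. The sketch below covers only the claim step and leaves out the per-minute count from Step 1; `claimNextTask` is a hypothetical helper, and it assumes a mysql2-style promise connection.

```javascript
// Hypothetical helper: claims one due task in a short transaction so the row lock
// is not held while the email is being sent. SKIP LOCKED requires MySQL 8.0+.
async function claimNextTask(connection, tableName) {
    await connection.beginTransaction();
    try {
        // SKIP LOCKED makes another worker skip rows locked here instead of waiting.
        const [rows] = await connection.query(
            `SELECT id, details FROM ${tableName}
             WHERE status = ? AND scheduled_at <= NOW()
             ORDER BY scheduled_at, id
             LIMIT 1
             FOR UPDATE SKIP LOCKED`,
            ['pending']
        );
        if (rows.length === 0) {
            await connection.rollback();
            return null;
        }
        const task = rows[0];
        // Mark the row as taken and commit right away, which releases the lock.
        await connection.query(
            `UPDATE ${tableName} SET status = ?, pulled_at = NOW() WHERE id = ?`,
            ['in-progress', task.id]
        );
        await connection.commit();
        return task;
    } catch (err) {
        await connection.rollback();
        throw err;
    }
}
```

After claiming, the caller would run `runTask(task)` and then mark the task as completed in a separate query. The trade-off is that a worker that crashes after the commit leaves its task in 'in-progress', so this pattern usually needs a periodic sweep that resets tasks whose pulled_at is too old.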

Sending an email from a task

// isSubscribed and sendEmail are assumed to be the helpers built in Part 2
// (unsubscribe handling and sending via the Azure SDK).
async function runTask(task) {
    console.log("running task %j", task);
    const details = task.details;
    console.log("details %j", details);

    if (details.type === 'email') {
        console.log("sending email to %s", details.email);
        const { email, website, template } = details;
        try {
            // Skip users who unsubscribed from this website
            const subscribed = await isSubscribed(email, website);
            if (!subscribed) {
                console.log("User is not subscribed. Skipping email.");
                return;
            }
            await sendEmail(email, template);
            console.log("Email sent successfully.");
        } catch (e) {
            // Errors are only logged, so pullTask still marks the task as completed
            console.error("Error sending email", e);
        }
    }
}
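Since a task "can be anything", runTask could eventually dispatch on `details.type` instead of growing a chain of if statements. A minimal sketch of such a registry; `registerHandler` and `dispatchTask` are hypothetical names, and the email handler would wrap the logic above.

```javascript
// Hypothetical registry: one async handler per task type.
const handlers = new Map();

function registerHandler(type, handler) {
    handlers.set(type, handler);
}

async function dispatchTask(task) {
    // Depending on the driver, a JSON column may arrive as a string or as a parsed object.
    const details = typeof task.details === 'string' ? JSON.parse(task.details) : task.details;
    const handler = handlers.get(details.type);
    if (!handler) {
        throw new Error(`No handler registered for task type "${details.type}"`);
    }
    return handler(details, task);
}
```

Throwing on an unknown type makes pullTask roll back, so the task stays pending and is retried on every pull; adding a 'failed' value to the status ENUM would be one way to park such tasks instead.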

When the task type is "email", I expect details to have the following shape:

{
    "type": "email",
    "email": "...customer email....",
    "website": "...website name, e.g. devtoolsdaily... ",
    "template": " .... template name ... "
}
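Because the details column accepts any JSON, it can help to check this shape before calling addTask, so malformed tasks never reach the queue. A minimal sketch; `validateEmailTaskDetails` is a hypothetical helper that returns a list of problems (empty when the details are valid).

```javascript
// Hypothetical validator for the email task shape above.
const REQUIRED_EMAIL_FIELDS = ['email', 'website', 'template'];

function validateEmailTaskDetails(details) {
    if (details === null || typeof details !== 'object') {
        return ['details must be an object'];
    }
    const errors = [];
    if (details.type !== 'email') {
        errors.push('type must be "email"');
    }
    for (const field of REQUIRED_EMAIL_FIELDS) {
        // Every required field must be a string with some non-whitespace content.
        if (typeof details[field] !== 'string' || details[field].trim() === '') {
            errors.push(`${field} must be a non-empty string`);
        }
    }
    return errors;
}
```

A caller would enqueue only when the returned list is empty, for example `if (validateEmailTaskDetails(details).length === 0) await addTask(details);`.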

Templates are simply hardcoded; Part 2 described how to create optimized HTML content for them.

Scheduling emails

I added the scheduled_at column so that emails can be scheduled for the future.

For example, if I need to send a newsletter, I can schedule the emails for 8am on a Monday, and the queue will spread them out over time within the rate limits.
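The scheduled time can be computed in JavaScript and passed as the second argument of addTask. A minimal sketch; `nextMondayAt8` is a hypothetical helper that works in the Node process's local time zone.

```javascript
// Hypothetical helper: returns the next Monday at 08:00 local time, strictly after `from`.
function nextMondayAt8(from = new Date()) {
    const result = new Date(from);
    result.setHours(8, 0, 0, 0);
    // getDay(): 0 = Sunday, 1 = Monday, ..., 6 = Saturday
    let daysAhead = (1 - result.getDay() + 7) % 7;
    // On a Monday at or after 08:00, move to the following week.
    if (daysAhead === 0 && result <= from) daysAhead = 7;
    // setDate handles month and year rollover.
    result.setDate(result.getDate() + daysAhead);
    return result;
}
```

Each newsletter email would then be enqueued with `await addTask(details, nextMondayAt8())`. Assuming the pool comes from mysql2, Date parameters are serialized according to the pool's `timezone` option (local time by default), while the pull query compares scheduled_at against MySQL's NOW(), so the Node process and the MySQL session should agree on a time zone.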