Using Serverless Storage-First Pattern to ship analytics events between 3rd party providers

Featured on Hashnode

Background

I have a client that uses mParticle's customer data platform (CDP) to ingest analytics events from various sources including a mobile app, a web app and a cloud-based backend, as well as events from 3rd party systems like Branch (mobile app install attribution) or Algolia (search item click conversion).

You can use mParticle to set up connections to downstream 3rd party systems which would then receive all the ingested analytics events in near real-time. For example, you could forward events to Mixpanel for in-depth user/feature analysis, or Intercom to then be able to message users based on their in-app activity.

In the world of mParticle, analytics events are broken down into categories like screen navigation events, e-commerce/commercial events, or "custom" events that are more specific to your domain. mParticle supports forwarding most event types to most providers, but some support is still lacking.

The client recently integrated Iterable as a customer engagement solution. Unfortunately, Iterable only supports receiving custom events from mParticle, so screen/navigation events and "lifecycle" events (like a mobile app "Session Start" event) were missing once the mParticle/Iterable connection was configured.

The marketing team was keen to target users with certain messaging based on their session behaviour and screen journeys.

We realized that we would need a custom solution to ship the analytics events that the configured connection didn't forward automatically from mParticle to Iterable.

Our first thought was to have mParticle call a webhook API, with a lambda that would forward each event batch to Iterable using the Iterable API. However, given the marketing team's requirements, the data flow didn't need to be real-time: something like once per hour was likely good enough. In addition, the webhook API didn't need to return any kind of sophisticated response; it was just "delivering" an analytics event batch with a relatively well-understood schema. Finally, we knew that the API would be called very frequently as events streamed in, and that this number would increase as user activity on the platform scaled up.

The Storage-First Pattern

Using the Storage-First pattern, you can capture the payload of incoming web requests and save it straight to SQS, SNS, EventBridge, Kinesis or a host of other AWS services. This is viable when you don't need a lambda to perform more complex authentication/authorization, parsing, transformation and/or saving of the payload. Used in the right circumstances, the pattern reduces response latency (no lambda cold start or processing time) and lowers cost (no lambda invocations). Because the payload is durably stored before any processing happens, it also gives you the same ease of retry you get with lambdas that have an asynchronous event source, should any processing fail.

Our need to batch up incoming analytics events and send them on to another 3rd party fits this use case. We can use an API Gateway SQS service integration to store the event batches in an SQS queue, and have an EventBridge recurring schedule (a cron job, essentially 😎) invoke a lambda that consumes all stored messages and calls the Iterable trackBulk API. We observed that event payloads from mParticle were typically in the 5-10KB range, well within the 256KB SQS message size limit. This configuration also avoids having a lambda event source mapping continuously long-poll the queue, which greatly increases the number of requests to the queue (remember SQS is billed by the number of requests). On the consuming side, it results in only a handful of SQS requests every hour (roughly one ReceiveMessage and one DeleteMessageBatch call per 10 messages in the queue), so it's very cost-effective indeed!
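To put rough numbers on that, here's a back-of-the-envelope sketch (an illustration, not a billing calculator; the function name is made up). It assumes each incoming event batch costs one SendMessage request (SQS bills each 64KB chunk of a payload as one request, so our 5-10KB batches count once), that every ReceiveMessage returns the full 10 messages, and that each hourly run ends with one empty receive.

```typescript
// Hypothetical helper: rough SQS request count per hour for the storage-first design.
// Assumptions: one SendMessage per incoming event batch (payloads < 64KB), every
// ReceiveMessage returns the maximum of 10 messages, and each hourly run ends with
// one empty ReceiveMessage that terminates the drain loop.
interface HourlySqsRequests {
  send: number; // SendMessage calls made by API Gateway
  consume: number; // ReceiveMessage + DeleteMessageBatch calls made by the forwarder
  total: number;
}

function estimateHourlySqsRequests(eventBatchesPerHour: number): HourlySqsRequests {
  const send = eventBatchesPerHour;
  // Each non-empty receive is followed by exactly one DeleteMessageBatch
  const nonEmptyReceives = Math.ceil(eventBatchesPerHour / 10);
  const consume = nonEmptyReceives * 2 + 1; // + the final, empty receive
  return { send, consume, total: send + consume };
}

console.log(estimateHourlySqsRequests(1000));
// → { send: 1000, consume: 201, total: 1201 }
```

The SendMessage side scales with traffic whatever we do; it's the consuming side that the hourly schedule keeps small. Short polling can return fewer than 10 messages per call, so treat the consuming figure as a lower bound.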

So, once more unto the breach!

First, we provision our queue.

    // New SQS queue (Duration comes from aws-cdk-lib, sqs from aws-cdk-lib/aws-sqs)
    const sqsQueue = new sqs.Queue(this, `${id}-events-queue`, {
      queueName: `${id}-queue`,
      visibilityTimeout: Duration.seconds(30),
      retentionPeriod: Duration.days(4),
      receiveMessageWaitTime: Duration.seconds(0),
      deadLetterQueue: {
        queue: new sqs.Queue(this, `${id}-events-dlq`),
        maxReceiveCount: 3,
      },
    });
  • The visibilityTimeout should be at least as long as the timeout we set for our consuming lambda, so that messages don't become visible again while they are still being processed. Here we've gone for 30 seconds (matching the lambda), which should be plenty of time to process an hour's worth of analytics events at current scale. We'd need to keep an eye on the lambda's average duration as our analytics traffic scales.

  • The retentionPeriod is set at 4 days (the default), which gives us plenty of time to fix the lambda should it stop working, without losing any analytics events.

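  • The receiveMessageWaitTime parameter defaults to 0, so there's no actual need to set it here; we include it to make explicit that ReceiveMessage calls use short polling and return immediately rather than waiting for messages to arrive. Bear in mind that short polling only samples a subset of SQS servers, so a call can occasionally come back empty even though messages remain; any stragglers are simply picked up on the next scheduled run.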

  • We provision a simple dead-letter queue. This would allow us to manually retry the processing of events at a later time, should we need to (for example, if the Iterable API went down for a prolonged period).
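The interplay between maxReceiveCount and our hourly schedule is easy to misjudge, so here's a simplified model of SQS redrive (our own sketch, not an AWS API). Each run that receives a message increments its receive count; once the count has reached maxReceiveCount, the next receive attempt moves the message to the dead-letter queue instead of handing it to our lambda. The model ignores visibility-timeout timing and Lambda's built-in retries for asynchronous invocations, so in practice the attempts may happen sooner than once per hour.

```typescript
// Simplified, hypothetical model of SQS redrive for a single message.
// attempts[i] is whether the i-th run that receives the message processes it successfully.
type RedriveOutcome = "delivered" | "dead-lettered" | "still-queued";

function simulateRedrive(
  maxReceiveCount: number,
  attempts: boolean[],
): { outcome: RedriveOutcome; receives: number } {
  let receives = 0;
  for (const succeeded of attempts) {
    // Once the receive count has reached maxReceiveCount, the next receive
    // attempt sends the message to the DLQ rather than to our lambda
    if (receives >= maxReceiveCount) {
      return { outcome: "dead-lettered", receives };
    }
    receives++;
    if (succeeded) {
      // Successful processing means the lambda deletes the message
      return { outcome: "delivered", receives };
    }
  }
  return { outcome: "still-queued", receives };
}

// Iterable is down for four consecutive runs: three failed receives, then the DLQ
console.log(simulateRedrive(3, [false, false, false, false]));
// → { outcome: 'dead-lettered', receives: 3 }
```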

Next up: our API Gateway REST API, its resource, and some related IAM permissions.

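    // REST API
    const restApi = new apigw.RestApi(this, `${id}-gateway`, {
      restApiName: `${id}-gateway`,
      description: "API Gateway for mParticle Iterable API Proxy",
      deployOptions: {
        stageName: "dev",
      },
      // Require an API key (sent in the x-api-key header) on every method by default
      defaultMethodOptions: {
        apiKeyRequired: true,
      },
    });

    // You likely want to add an authentication mechanism of some sort to your API.
    // You can usually configure your 3rd party to send this API key in the webhook configuration.
    const apiKey = restApi.addApiKey("APIKey", {
      apiKeyName: "APIKey",
      value: "2526a244-5766-4eca-aced-65643b080867",
    });

    // API Gateway only accepts a key that belongs to a usage plan covering the deployed stage
    const usagePlan = restApi.addUsagePlan("UsagePlan", {
      apiStages: [{ stage: restApi.deploymentStage }],
    });
    usagePlan.addApiKey(apiKey);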

    // Event batch Rest API resource
    const eventBatchResource = restApi.root.addResource("event-batch");

    // Yes it's IAM again :-)
    const gatewayServiceRole = new iam.Role(this, "api-gateway-role", {
      assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com"),
    });

    // This allows API Gateway to send our event body to our specific queue
    gatewayServiceRole.addToPolicy(
      new iam.PolicyStatement({
        resources: [sqsQueue.queueArn],
        actions: ["sqs:SendMessage"],
      })
    );

OK, now that we have our REST resource provisioned at /event-batch, let's add our SQS proxy integration.

    // A request template that tells API Gateway which SQS action (SendMessage) to call,
    // and which part of the incoming request (the body) to send as the message
    const requestTemplate =
      'Action=SendMessage&MessageBody=$util.urlEncode("$input.body")';

    const awsIntegrationProps: apigw.AwsIntegrationProps = {
      service: "sqs",
      integrationHttpMethod: "POST",
      // Path is where we specify the SQS queue to send to: {account-id}/{queue-name}.
      // Aws.ACCOUNT_ID (imported from aws-cdk-lib) resolves to the deploying account.
      path: `${Aws.ACCOUNT_ID}/${sqsQueue.queueName}`,
      options: {
        passthroughBehavior: apigw.PassthroughBehavior.NEVER,
        credentialsRole: gatewayServiceRole,
        requestParameters: {
          // API Gateway needs to send messages to SQS using content type form-urlencoded.
          // Static values in a parameter mapping must be wrapped in single quotes.
          "integration.request.header.Content-Type":
            "'application/x-www-form-urlencoded'",
        },
        requestTemplates: {
          "application/json": requestTemplate,
        },
        integrationResponses: [
          {
            statusCode: "200",
          },
          {
            statusCode: "500",
            responseTemplates: {
              "text/html": "Error",
            },
            selectionPattern: "500",
          },
        ],
      },
    };

    // Add the REST API method along with the integration,
    // also declaring the method responses that our integration responses map to
    eventBatchResource.addMethod(
      "POST",
      new apigw.AwsIntegration(awsIntegrationProps),
      { methodResponses: [{ statusCode: "200" }, { statusCode: "500" }] }
    );
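It can help to see what that request template actually sends to SQS. The sketch below approximates it in TypeScript with the standard URLSearchParams class: the result is an application/x-www-form-urlencoded SQS Query API body with Action=SendMessage and the URL-encoded JSON payload as MessageBody. The function name is hypothetical, and $util.urlEncode may escape a few characters differently, but both decode to the same values.

```typescript
// Hypothetical helper approximating the API Gateway mapping template
//   Action=SendMessage&MessageBody=$util.urlEncode("$input.body")
// URLSearchParams serializes as application/x-www-form-urlencoded
// (e.g. spaces become "+"), matching the Content-Type header we set.
function toSendMessageFormBody(jsonBody: string): string {
  return new URLSearchParams({
    Action: "SendMessage",
    MessageBody: jsonBody,
  }).toString();
}

// The same test payload we send with curl in this post
const formBody = toSendMessageFormBody('{ "testEventData": "test" }');
console.log(formBody);
// → Action=SendMessage&MessageBody=%7B+%22testEventData%22%3A+%22test%22+%7D
```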

Great! Now, with a simple cdk deploy, we have our API Gateway REST API, the SQS service integration, and the SQS queue itself. A simple test with curl:

curl --location --request POST 'https://{YOUR_API_ID}.execute-api.us-east-1.amazonaws.com/dev/event-batch' \
--header 'Content-Type: application/json' \
--header 'X-API-Key: 2526a244-5766-4eca-aced-65643b080867' \
--data-raw '{ "testEventData": "test" }'

And we should be able to see our message in the queue.

We then configured an mParticle connection to our webhook API and started receiving analytics event batches. All that was left was to provision our event forwarder lambda and an associated EventBridge recurring rule to invoke it.

    // Event forwarder lambda
    const forwarder = new NodejsFunction(this, `${id}-event-forwarder`, {
      runtime: Runtime.NODEJS_16_X,
      functionName: `${id}-event-forwarder`,
      entry: "src/functions/forwarder/forwarder.ts",
      handler: "handler",
      memorySize: 512,
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
      environment: {
        SQS_QUEUE_URL: sqsQueue.queueUrl,
      },
      initialPolicy: [
        new PolicyStatement({
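          // DeleteMessageBatch is authorized by the sqs:DeleteMessage IAM action
          // (IAM has no separate action for SQS batch operations)
          actions: ["sqs:ReceiveMessage", "sqs:DeleteMessage"],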
          resources: [sqsQueue.queueArn],
        }),
      ],
    });

    const lambdaTarget = new targets.LambdaFunction(forwarder);
    new events.Rule(this, "ForwarderScheduleRule", {
      description: "Forward all stored mParticle events to Iterable every hour",
      schedule: events.Schedule.rate(Duration.hours(1)),
      targets: [lambdaTarget],
      // omitting the `eventBus` property puts the rule on the default event bus
    });
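For reference, each time the rule fires, EventBridge invokes the forwarder with a "Scheduled Event" payload shaped roughly like the one below. The forwarder only uses it as a trigger, which is why the handler ignores its argument. The sample values (IDs, account, rule name) are made up, and the type guard is a hypothetical extra you might add if the function ever gains other triggers.

```typescript
// Shape of an EventBridge scheduled event (sample values below are made up)
interface ScheduledEventPayload {
  version: string;
  id: string;
  "detail-type": string;
  source: string;
  account: string;
  time: string;
  region: string;
  resources: string[];
  detail: Record<string, unknown>;
}

const sampleEvent: ScheduledEventPayload = {
  version: "0",
  id: "53dc4d37-cffa-4f76-80c9-8b7d4a4d2eaa",
  "detail-type": "Scheduled Event",
  source: "aws.events",
  account: "123456789012",
  time: "2023-01-01T12:00:00Z",
  region: "us-east-1",
  resources: ["arn:aws:events:us-east-1:123456789012:rule/ForwarderScheduleRule"],
  detail: {},
};

// Hypothetical guard: only drain the queue when invoked by the schedule
function isScheduledEvent(event: unknown): event is ScheduledEventPayload {
  if (typeof event !== "object" || event === null) return false;
  const candidate = event as Partial<ScheduledEventPayload>;
  return candidate.source === "aws.events" && candidate["detail-type"] === "Scheduled Event";
}

console.log(isScheduledEvent(sampleEvent)); // → true
```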

And now the lambda itself:

import { ScheduledEvent } from "aws-lambda";
import {
  DeleteMessageBatchCommand,
  ReceiveMessageCommand,
  SQSClient,
} from "@aws-sdk/client-sqs";
import axios from "axios";
// Hypothetical module: the domain-specific mapping to an Iterable trackBulk payload
import { mapEventsToIterablePayload } from "./mapEventsToIterablePayload";

// The region is picked up from the Lambda environment (AWS_REGION)
const sqsClient = new SQSClient({});
const iterableClient = axios.create({
  baseURL: "https://api.iterable.com/api",
  headers: {
    // Placeholder: supply your Iterable API key (e.g. via an environment variable)
    "Api-Key": "YOUR_ITERABLE_API_KEY",
    "Content-Type": "application/json",
  },
});

export const handler = async (_event: ScheduledEvent): Promise<void> => {
  const receiveMessageCommand = new ReceiveMessageCommand({
    QueueUrl: process.env.SQS_QUEUE_URL,
    MaxNumberOfMessages: 10,
    WaitTimeSeconds: 0,
  });

  let sqsResponse = await sqsClient.send(receiveMessageCommand);
  // Keep draining until a receive comes back empty
  while (sqsResponse.Messages && sqsResponse.Messages.length > 0) {
    const iterablePayload = mapEventsToIterablePayload(sqsResponse.Messages);

    // https://api.iterable.com/api/docs#events_trackBulk
    // axios throws on a non-2xx response, so a failed call leaves this batch in the queue
    await iterableClient.post("/events/trackBulk", iterablePayload);

    const deleteMessageBatchCommand = new DeleteMessageBatchCommand({
      QueueUrl: process.env.SQS_QUEUE_URL,
      Entries: sqsResponse.Messages.map((message) => ({
        Id: message.MessageId,
        ReceiptHandle: message.ReceiptHandle,
      })),
    });
    await sqsClient.send(deleteMessageBatchCommand);

    sqsResponse = await sqsClient.send(receiveMessageCommand);
  }
};
  • We set the MaxNumberOfMessages to 10, which is the max, to maximize throughput of the lambda and minimize the number of `ReceiveMessage` calls we'll need to make.

  • The WaitTimeSeconds parameter is arguably redundant given our CDK queue's receiveMessageWaitTime, as it does the same job from the SDK's side (and takes precedence over the queue setting when present). Take your pick!

  • The mapEventsToIterablePayload function is left unimplemented here, since it's very domain-specific. Its output is a JSON object in a schema accepted by the Iterable API.

  • Once we get a successful response from Iterable, we batch delete those now-processed messages from the queue, and then read the next batch.
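Since mapEventsToIterablePayload depends entirely on your own events, here's only a hedged sketch of what it might look like. It assumes each SQS message body is an mParticle event batch with an events array and user_identities (a simplified, partly assumed subset of mParticle's batch format, so check it against real payloads), forwards only screen and session events, and returns the events array that Iterable's trackBulk endpoint expects. The field choices (event_id as the Iterable id, customer_id as userId) and the event type names are our assumptions.

```typescript
// Minimal local types so the sketch is self-contained.
// SqsMessage mirrors the two fields we use from @aws-sdk/client-sqs's Message.
interface SqsMessage {
  MessageId?: string;
  Body?: string;
}

// ASSUMED subset of an mParticle event batch; verify against your real payloads.
interface MParticleBatch {
  user_identities?: { customer_id?: string; email?: string };
  events?: Array<{
    event_type: string;
    data?: { event_id?: string | number; [key: string]: unknown };
  }>;
}

interface IterableEvent {
  eventName: string;
  id: string;
  userId?: string;
  email?: string;
  dataFields?: Record<string, unknown>;
}

// Only the event types the mParticle/Iterable connection doesn't forward (assumed names)
const FORWARDED_EVENT_TYPES = new Set(["screen_view", "session_start", "session_end"]);

function mapEventsToIterablePayload(messages: SqsMessage[]): { events: IterableEvent[] } {
  const events: IterableEvent[] = [];
  for (const message of messages) {
    if (!message.Body) continue;
    const batch = JSON.parse(message.Body) as MParticleBatch;
    const userId = batch.user_identities?.customer_id;
    const email = batch.user_identities?.email;
    // Iterable needs an email or userId, so skip anonymous batches
    if (!userId && !email) continue;

    (batch.events ?? []).forEach((event, index) => {
      if (!FORWARDED_EVENT_TYPES.has(event.event_type)) return;
      events.push({
        eventName: event.event_type,
        // A stable id means re-sending the same event overwrites rather than duplicates
        id: String(event.data?.event_id ?? `${message.MessageId}-${index}`),
        userId,
        email,
        dataFields: event.data,
      });
    });
  }
  return { events };
}
```

The returned object is what the forwarder would post to /events/trackBulk.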

Once everything was up and running, we saw our analytics events arrive in Iterable over the course of the day, with minimal Lambda and SQS costs.

What about idempotency or partial failure?

Great question, as well-behaved lambdas should take this into account. In this case we get a free pass, because all the analytics events have a messageId. If Iterable receives multiple events with the same ID, it doesn't create a new event but overwrites the existing one. As events are effectively immutable, this is a safe operation, so if some of our messages were reprocessed (for example, after a failed trackBulk call or a failed batch delete), it wouldn't be the end of the world.
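To see why that free pass matters, here's a small in-memory simulation of the forwarder's drain loop (hypothetical names, no AWS calls). The "Iterable" side is a Map keyed by event id, mimicking the overwrite-on-same-ID behaviour described above. The second forward records its batch and then throws, as if Iterable accepted it but the response never arrived; that batch is never deleted, reappears once the visibility timeout expires, and is sent again on the next run. The fake queue is deliberately simplified (real SQS, for instance, issues a new receipt handle on every receive).

```typescript
// In-memory simulation of the forwarder's drain loop (no AWS SDK involved).
interface QueueMessage {
  id: string; // stands in for the mParticle messageId we forward to Iterable
  receiptHandle: string;
}

interface Queue {
  receive(max: number): Promise<QueueMessage[]>;
  deleteBatch(receiptHandles: string[]): Promise<void>;
}

type Sink = (ids: string[]) => Promise<void>;

// Same shape as the handler: receive up to 10, forward, delete, repeat until empty.
// If the sink throws, the current batch is never deleted and the error propagates.
async function drain(queue: Queue, sink: Sink): Promise<number> {
  let forwarded = 0;
  for (;;) {
    const batch = await queue.receive(10);
    if (batch.length === 0) return forwarded;
    await sink(batch.map((message) => message.id));
    await queue.deleteBatch(batch.map((message) => message.receiptHandle));
    forwarded += batch.length;
  }
}

// Simplified queue: received messages stay hidden ("in flight") until deleted,
// or until expireVisibility() simulates the visibility timeout running out.
class InMemoryQueue implements Queue {
  private visible: QueueMessage[];
  private inFlight = new Map<string, QueueMessage>();

  constructor(ids: string[]) {
    this.visible = ids.map((id) => ({ id, receiptHandle: `rh-${id}` }));
  }

  async receive(max: number): Promise<QueueMessage[]> {
    const batch = this.visible.splice(0, max);
    batch.forEach((message) => this.inFlight.set(message.receiptHandle, message));
    return batch;
  }

  async deleteBatch(receiptHandles: string[]): Promise<void> {
    receiptHandles.forEach((handle) => this.inFlight.delete(handle));
  }

  expireVisibility(): void {
    this.visible.push(...Array.from(this.inFlight.values()));
    this.inFlight.clear();
  }
}

// Demo: the second forward reaches "Iterable" but its response is lost
const iterableStore = new Map<string, number>(); // event id -> times written
let sinkCalls = 0;
const flakySink: Sink = async (ids) => {
  sinkCalls++;
  ids.forEach((id) => iterableStore.set(id, (iterableStore.get(id) ?? 0) + 1));
  if (sinkCalls === 2) throw new Error("response lost");
};

const queue = new InMemoryQueue(Array.from({ length: 25 }, (_, i) => `evt-${i}`));
const demo = drain(queue, flakySink)
  .catch(() => {
    // Next scheduled run, after the unacknowledged batch's visibility timeout expired
    queue.expireVisibility();
    return drain(queue, flakySink);
  })
  .then((forwarded) => {
    console.log(forwarded, iterableStore.size, iterableStore.get("evt-10"));
    return forwarded;
  });
// → 15 25 2
```

Events evt-10 to evt-19 were sent twice, yet the store still holds exactly 25 events, which is the property that makes at-least-once delivery from SQS acceptable here.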

Thanks for reading!

See also

Full code for this solution is on Github:

https://github.com/markgibaud/mparticle-iterable-api-proxy-sqs-integration

Jeremy Daly introduces the Storage-First pattern way back in this 2020 blog post:

https://www.jeremydaly.com/the-storage-first-pattern/

More recently, Robert Bulmer deploys a similar pattern but using S3:

https://awstip.com/storage-first-pattern-in-aws-with-api-gateway-part-1-using-s3-216e20b08353

Also check out the AWS Solutions Construct CDK template for a much more fully-baked solution:

https://docs.aws.amazon.com/solutions/latest/constructs/aws-apigateway-sqs.html
