Preventing Kinesis Firehose Data Exfiltration

 2020-02-08 /  1349 words /  7 minutes

Amazon Web Services provides some great capabilities, and Amazon Kinesis Firehose is one example. Firehose makes it easy to take data from an Amazon Kinesis stream and copy it to an Amazon Simple Storage Service (S3) bucket, an Amazon Redshift database, an Amazon Elasticsearch Service cluster, or Splunk. This ability, however, can expose a company to data exfiltration risk. A rogue employee or an attacker could create a Firehose and send data to a resource under their control, so it’s important to consider how to prevent Kinesis Firehose data exfiltration.

Tools like AWS Identity and Access Management (IAM) policies can restrict data movement to resources based on their account number, or even their AWS Organizations ID, but an S3 bucket is particularly difficult to lock down because the Amazon Resource Name (ARN) for an S3 bucket does not contain the account number.
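
To make the ARN difference concrete, here is a minimal sketch that pulls the account field out of an ARN using only the standard library. The helper name accountFromARN and both example ARNs are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// accountFromARN returns the account ID field of an ARN, or "" when the
// field is empty or the string is not shaped like an ARN.
// ARN layout: arn:partition:service:region:account-id:resource
func accountFromARN(arn string) string {
	parts := strings.SplitN(arn, ":", 6)
	if len(parts) < 6 || parts[0] != "arn" {
		return ""
	}
	return parts[4]
}

func main() {
	// Hypothetical ARNs for illustration only.
	firehoseARN := "arn:aws:firehose:us-east-1:012345678901:deliverystream/example-stream"
	bucketARN := "arn:aws:s3:::example-bucket"

	fmt.Printf("firehose account: %q\n", accountFromARN(firehoseARN))
	fmt.Printf("bucket account:   %q\n", accountFromARN(bucketARN))
}
```

The Firehose ARN carries the account number in its fifth field, while the same field in the bucket ARN is empty, which is why an ARN-based condition alone cannot tell whose bucket it is.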

One solution is to use Amazon CloudWatch Events in conjunction with AWS Lambda. A Lambda function can be triggered when CloudWatch Events sees that a Kinesis Data Firehose is created. The Lambda function can then check whether the destination S3 bucket belongs to this account and, if it does not, delete the Firehose. In a complex organization we may want to allow delivery to S3 buckets in other accounts within the organization, but in this example we restrict the validation to the local account.

Setting up for the Solution

There’s one complication, and we also need a prerequisite. First, we cannot delete a Firehose while it is still in the ‘CREATING’ state, so the function has to poll the status until it changes. Second, we need an AWS IAM role allowing the Lambda function to call AWS services. When creating the role, we always want to follow the Principle of Least Privilege. The Lambda function needs to read the list of S3 buckets, check the status of the Firehose and delete it, and, perhaps surprisingly, write to CloudWatch Logs so we can capture log output from our Lambda function.

First we need an assume role policy document:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      }
    }
  ]
}

This file can be saved as lambda-assume-role-policy-document.json. Then, we create the role:

$ aws iam create-role --role-name kfhchk-role --assume-role-policy-document file://lambda-assume-role-policy-document.json
{
  "Role": {
    "AssumeRolePolicyDocument": {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          }
        }
      ]
    },
    "RoleId": "AROASXMJAC2FVWAMP5RBK",
    "CreateDate": "2020-02-09T01:07:52Z",
    "RoleName": "kfhchk-role",
    "Path": "/",
    "Arn": "arn:aws:iam::012345678901:role/kfhchk-role"
  }
}

We also need to create a policy. Here’s the policy document which contains the bare minimum access required for the Lambda function to work:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "firehose:DeleteDeliveryStream",
                "firehose:DescribeDeliveryStream"
            ],
            "Resource": "*"
        }
    ]
}

Save this as policy-document.json, and then run the command:

$ aws iam create-policy --policy-name kfhchk-policy --policy-document file://policy-document.json
{
    "Policy": {
        "PolicyName": "kfhchk-policy",
        "PermissionsBoundaryUsageCount": 0,
        "CreateDate": "2020-02-09T01:46:58Z",
        "AttachmentCount": 0,
        "IsAttachable": true,
        "PolicyId": "ANPASXMJAC2F2JMGR5IAI",
        "DefaultVersionId": "v1",
        "Path": "/",
        "Arn": "arn:aws:iam::012345678901:policy/kfhchk-policy",
        "UpdateDate": "2020-02-09T01:46:58Z"
    }
}

Finally, we need to tie the policy to the role. Run this command, substituting the correct account number:

$ aws iam attach-role-policy --role-name kfhchk-role --policy-arn arn:aws:iam::012345678901:policy/kfhchk-policy

The Lambda Function

The Lambda function is listed below. We’ll break it down into sections to explain what’s going on.

At the beginning of every Go program is a package declaration. Then, we import dependencies. Some are part of Go itself, and some are provided by AWS. Finally, we define a variable used for debugging: when set to true, it forces deletion of the Kinesis Firehose even if the S3 bucket was found.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/firehose"
	"github.com/aws/aws-sdk-go/service/s3"
)

var alwaysDelete = false

Next we have the first function, bucketExists. It connects to S3, lists the buckets owned by the account, and checks whether the bucket ARN passed in as a parameter ends with the name of one of them.

func bucketExists(bucketARN string) bool {

	rc := false

	// We can't use HeadBucket here. We need to know we own the bucket
	svc := s3.New(session.New())
	input := &s3.ListBucketsInput{}

	result, err := svc.ListBuckets(input)
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			default:
				fmt.Println(aerr.Error())
			}
		} else {
			// Print the error, cast err to awserr.Error to get the Code and
			// Message from an error.
			fmt.Println(err.Error())
		}
	} else {
		// fmt.Printf("S3 ListBuckets Result: %+v\n", result)
		for index, element := range result.Buckets {
			bucketName := *(element.Name)
			bucketNameSuffix := ":" + bucketName
			if strings.HasSuffix(bucketARN, bucketNameSuffix) {
				fmt.Println("FOUND", index, "=>", bucketName)
				rc = true
				break
			} else {
				fmt.Println("MISSED", index, "=>", bucketName)
			}
		}
	}

	return rc
}
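
The ownership check can also be written as a pure function that is testable without calling S3. The sketch below is one possible variant, not the code used above: it strips the S3 ARN prefix and compares names exactly. The names s3ARNPrefix, bucketNameFromARN and ownsBucket are hypothetical, and the prefix assumes the standard "aws" partition.

```go
package main

import (
	"fmt"
	"strings"
)

// s3ARNPrefix assumes the standard "aws" partition.
const s3ARNPrefix = "arn:aws:s3:::"

// bucketNameFromARN strips the S3 ARN prefix; ok is false for any string
// that does not start with the prefix or has nothing after it.
func bucketNameFromARN(bucketARN string) (name string, ok bool) {
	if !strings.HasPrefix(bucketARN, s3ARNPrefix) {
		return "", false
	}
	name = strings.TrimPrefix(bucketARN, s3ARNPrefix)
	return name, name != ""
}

// ownsBucket reports whether bucketARN names one of the owned buckets.
func ownsBucket(bucketARN string, owned []string) bool {
	name, ok := bucketNameFromARN(bucketARN)
	if !ok {
		return false
	}
	for _, b := range owned {
		if b == name {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical bucket names standing in for a ListBuckets result.
	owned := []string{"example-data", "example-logs"}

	fmt.Println(ownsBucket("arn:aws:s3:::example-logs", owned))
	fmt.Println(ownsBucket("arn:aws:s3:::someone-elses-bucket", owned))
}
```

In the Lambda function, the owned slice would be filled from the names returned by ListBuckets.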

The second function deletes the Firehose. It expects that the Firehose is in a state where it can be deleted.

func deleteKinesisFirehose(firehoseArn string) {
	fmt.Printf("Firehose to delete: %+v\n", firehoseArn)

	idx := strings.LastIndex(firehoseArn, "/")
	firehoseName := firehoseArn[idx+1:]
	fmt.Printf("Firehose stream name: %+v\n", firehoseName)

	deleteDeliveryStreamInput := &firehose.DeleteDeliveryStreamInput{}
	deleteDeliveryStreamInput.SetDeliveryStreamName(firehoseName)

	svc := firehose.New(session.New())
	deleteDeliveryStreamOutput, err := svc.DeleteDeliveryStream(deleteDeliveryStreamInput)

	if err != nil {
		fmt.Printf("Error deleting stream: %+v\n ", err)
	} else {
		fmt.Printf("Deleting stream: %+v\n", deleteDeliveryStreamOutput)
	}
}
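
The stream name is taken from the ARN with strings.LastIndex, which silently returns the whole string when no slash is present. A slightly stricter sketch is shown below, assuming delivery stream ARNs end in ":deliverystream/" followed by the name. The helper streamNameFromARN is hypothetical and is not part of the AWS SDK.

```go
package main

import (
	"fmt"
	"strings"
)

// streamNameFromARN extracts the delivery stream name from a Firehose ARN
// and returns an error when the ARN does not have the expected shape.
func streamNameFromARN(firehoseARN string) (string, error) {
	const marker = ":deliverystream/"
	idx := strings.LastIndex(firehoseARN, marker)
	if idx < 0 {
		return "", fmt.Errorf("not a delivery stream ARN: %q", firehoseARN)
	}
	name := firehoseARN[idx+len(marker):]
	if name == "" {
		return "", fmt.Errorf("delivery stream ARN has no name: %q", firehoseARN)
	}
	return name, nil
}

func main() {
	// Hypothetical ARN for illustration only.
	name, err := streamNameFromARN("arn:aws:firehose:us-east-1:012345678901:deliverystream/example-stream")
	fmt.Println(name, err)

	_, err = streamNameFromARN("")
	fmt.Println(err)
}
```

Returning an error lets the caller skip the delete call instead of sending an empty or malformed stream name to the Firehose API.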

The third function, waitForNotCreating, polls the Firehose status until it is no longer ‘CREATING’, at which point the Firehose can be deleted.

func waitForNotCreating(firehoseArn string) {

	svc := firehose.New(session.New())

	idx := strings.LastIndex(firehoseArn, "/")
	firehoseName := firehoseArn[idx+1:]
	fmt.Printf("Firehose stream name: %+v\n", firehoseName)

	describeDeliveryStreamInput := &firehose.DescribeDeliveryStreamInput{}
	describeDeliveryStreamInput.DeliveryStreamName = &firehoseName

	for {
		describeDeliveryStreamOutput, err := svc.DescribeDeliveryStream(describeDeliveryStreamInput)
		if err != nil {
			// Without a description there is no status to inspect, so stop waiting.
			fmt.Printf("Error describing stream: %+v\n", err)
			return
		}
		fmt.Printf("Stream: %+v\n", describeDeliveryStreamOutput)

		if *describeDeliveryStreamOutput.DeliveryStreamDescription.DeliveryStreamStatus != "CREATING" {
			break
		}

		time.Sleep(100 * time.Millisecond)
	}
}

Finally, we have the Lambda entry point. This function decomposes the CloudWatch event to pull out the eventSource, eventName and firehoseArn. It then calls bucketExists, and if the bucket wasn’t found (or alwaysDelete is true), it calls waitForNotCreating followed by deleteKinesisFirehose.

func HandleRequest(ctx context.Context, event events.CloudWatchEvent) (string, error) {

	fmt.Printf("Context: %+v\n", ctx)
	fmt.Printf("Event: %+v\n", event)
	fmt.Printf("RawDetails: %s\n", event.Detail)

	var detail = new(map[string]interface{})
	json.Unmarshal(event.Detail, detail)
	eventSource := (*detail)["eventSource"]
	eventName := (*detail)["eventName"]

	// responseElements is null for some Firehose events, so use checked
	// assertions that yield zero values instead of panicking.
	responseElements, _ := (*detail)["responseElements"].(map[string]interface{})
	fmt.Printf("    response: %+v\n", responseElements)

	firehoseArn, _ := responseElements["deliveryStreamARN"].(string)
	fmt.Printf("    firehoseARN: %+v\n", firehoseArn)

	fmt.Printf("Details: %+v\n", *detail)
	fmt.Printf("    eventSource: %+v\n", eventSource)
	fmt.Printf("    eventName: %+v\n", eventName)

	if eventSource == "firehose.amazonaws.com" && eventName == "CreateDeliveryStream" {
		var requestParams map[string]interface{}
		requestParams = (*detail)["requestParameters"].(map[string]interface{})

		var extendedS3DestinationConfiguration map[string]interface{}
		extendedS3DestinationConfiguration = requestParams["extendedS3DestinationConfiguration"].(map[string]interface{})

		bucketARN := extendedS3DestinationConfiguration["bucketARN"].(string)
		fmt.Printf("    bucketARN: %+v\n", bucketARN)

		if !bucketExists(bucketARN) || alwaysDelete {

			waitForNotCreating(firehoseArn)
			deleteKinesisFirehose(firehoseArn)
		}

	} else {
		fmt.Printf("Skipping...\n")
	}

	return fmt.Sprintf("%+v", event), nil
}
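
The event detail can also be decoded into a typed struct instead of nested maps, which removes the chain of type assertions. This is a sketch rather than the code used above: the type createStreamDetail and the helper parseDetail are hypothetical, only the fields the handler reads are modelled, and the sample payloads are trimmed, made-up examples.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// createStreamDetail models only the fields the handler reads.
type createStreamDetail struct {
	EventSource       string `json:"eventSource"`
	EventName         string `json:"eventName"`
	RequestParameters struct {
		ExtendedS3 struct {
			BucketARN string `json:"bucketARN"`
		} `json:"extendedS3DestinationConfiguration"`
	} `json:"requestParameters"`
	ResponseElements struct {
		DeliveryStreamARN string `json:"deliveryStreamARN"`
	} `json:"responseElements"`
}

// isCreate reports whether the event is a Firehose CreateDeliveryStream call.
func (d createStreamDetail) isCreate() bool {
	return d.EventSource == "firehose.amazonaws.com" && d.EventName == "CreateDeliveryStream"
}

// parseDetail decodes the raw detail; absent or null fields stay empty.
func parseDetail(raw []byte) (createStreamDetail, error) {
	var d createStreamDetail
	err := json.Unmarshal(raw, &d)
	return d, err
}

func main() {
	// Trimmed, hypothetical detail payload for illustration only.
	raw := []byte(`{
		"eventSource": "firehose.amazonaws.com",
		"eventName": "CreateDeliveryStream",
		"requestParameters": {
			"extendedS3DestinationConfiguration": {"bucketARN": "arn:aws:s3:::example-bucket"}
		},
		"responseElements": {
			"deliveryStreamARN": "arn:aws:firehose:us-east-1:012345678901:deliverystream/example-stream"
		}
	}`)

	d, err := parseDetail(raw)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(d.isCreate(), d.RequestParameters.ExtendedS3.BucketARN, d.ResponseElements.DeliveryStreamARN)
}
```

With this shape, an event that lacks responseElements or an S3 destination simply yields empty strings, which the handler can test for before deciding to delete anything.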

Last, the standard main function, which registers HandleRequest with the Lambda runtime.

func main() {
	fmt.Println("main() started")
	// lambda.Start blocks while the function serves invocations.
	lambda.Start(HandleRequest)
	fmt.Println("main() completed")
}

Deploying the Solution

We must build the Go program so it can be used in a Lambda function. On a Mac, we need to explicitly build for a Linux system, output the program as main, and zip it up. We then create the Lambda function.

$ GOOS=linux go build -o main
$ zip -v deployment.zip main
updating: main .	(in=17139676) (out=7429168) (deflated 57%)
total bytes=17139676, compressed=7429168 -> 57% savings
$ aws --region us-east-1 lambda create-function --function-name kfhchk --zip-file fileb://./deployment.zip --runtime go1.x --role arn:aws:iam::012345678901:role/kfhchk-role --handler main
{
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "CodeSha256": "T9DRFv1YQse8SUOxUMSfiGk/GUsi/vxg0si3+6WgRSY=",
    "FunctionName": "kfhchk",
    "CodeSize": 7429326,
    "RevisionId": "98a117c1-5b8a-46b6-94af-470c59afe35c",
    "MemorySize": 128,
    "FunctionArn": "arn:aws:lambda:us-east-1:012345678901:function:kfhchk",
    "Version": "$LATEST",
    "Role": "arn:aws:iam::012345678901:role/kfhchk-role",
    "Timeout": 3,
    "LastModified": "2020-02-09T02:30:59.363+0000",
    "Handler": "main",
    "Runtime": "go1.x",
    "Description": ""
}

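We then create a CloudWatch Events rule and attach the Lambda function to it. When the rule is created from the CLI rather than the console, CloudWatch Events also needs permission to invoke the function, which can be granted with aws lambda add-permission and the principal events.amazonaws.com.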

$ aws events put-rule --name "kfhchk-rule" --event-pattern "{\"source\":[\"aws.firehose\"]}"
{
    "RuleArn": "arn:aws:events:us-east-1:012345678901:rule/kfhchk-rule"
}
$ aws events put-targets --rule kfhchk-rule --targets "Id"="1","Arn"="arn:aws:lambda:us-east-1:012345678901:function:kfhchk"
{
    "FailedEntries": [],
    "FailedEntryCount": 0
}
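
The rule above matches every event whose source is aws.firehose, so the function is also invoked for calls it will skip. A narrower pattern is sketched below, assuming the events arrive as CloudTrail API calls, which is what the eventSource and eventName fields read by the handler suggest. The function name createStreamPattern is hypothetical; the program only prints a JSON string that could be passed to --event-pattern.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// createStreamPattern builds an event pattern that matches only
// CreateDeliveryStream calls recorded by CloudTrail.
func createStreamPattern() (string, error) {
	pattern := map[string]interface{}{
		"source":      []string{"aws.firehose"},
		"detail-type": []string{"AWS API Call via CloudTrail"},
		"detail": map[string]interface{}{
			"eventSource": []string{"firehose.amazonaws.com"},
			"eventName":   []string{"CreateDeliveryStream"},
		},
	}
	b, err := json.Marshal(pattern)
	return string(b), err
}

func main() {
	p, err := createStreamPattern()
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(p)
}
```

Filtering in the rule keeps the check inside the handler as a second line of defence while cutting down on needless invocations.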

We can check the results using the AWS Command Line Interface (CLI):

$ aws events list-rules
{
    "Rules": [
        {
            "EventPattern": "{\"source\":[\"aws.firehose\"]}",
            "State": "ENABLED",
            "Name": "kfhchk-rule",
            "Arn": "arn:aws:events:us-east-1:012345678901:rule/kfhchk-rule"
        }
    ]
}
$ aws events list-targets-by-rule --rule kfhchk-rule
{
    "Targets": [
        {
            "Id": "1",
            "Arn": "arn:aws:lambda:us-east-1:012345678901:function:kfhchk"
        }
    ]
}

Conclusions

After everything is deployed, CloudWatch Events invokes the Lambda function whenever it sees an event from the Kinesis Data Firehose service. Any newly created Firehose that delivers to an S3 bucket outside the account is then deleted, which is how this approach prevents Kinesis Firehose data exfiltration.


Tags:  Kinesis  Firehose  Lambda  S3  CloudWatch  Redshift  Elasticsearch  Splunk  Go  GoLang
Categories:  AWS  Amazon Kinesis  Amazon Kinesis Firehose  Amazon Kinesis Data Firehose  AWS Lambda  Amazon Simple Storage Service (S3)  Amazon CloudWatch  Amazon Redshift  Amazon Elasticsearch Service
