
datadog-serverless-functions's Introduction

datadog-serverless-functions

This repository contains our serverless functions that process streams and send data to Datadog.

datadog-serverless-functions's People

Contributors

achntrl, agocs, ajacquemot, ashwingandhi-ddog, ava-silver, claudiadadamo, czechh, daniellanger, darcyraynerdd, gaetan-deputier, ge0aja, hghotra, ian28223, iotmani, jcstorms1, jvanbrie, klivan, ktmq, le-he-hoo, nbparis, nhinsch, noisomepossum, parsons90, raphaelallier, sabiurr, saleelsaptarshi-dd, sfirrin, tianchu, tmichelet, yasuharu519


datadog-serverless-functions's Issues

[Forwarder] Private link support (VPC configuration support through CF template)

https://docs.datadoghq.com/agent/guide/private-link/?tab=logs#client-configuration

The above documentation is a little misleading, since the default CloudFormation template doesn't seem to support enabling a VPC, subnets, or security groups. It also can't be added by brute force unless you attach the missing AWSLambdaVPCAccessExecutionRole to the role.

I could be missing something, but it's not entirely obvious. Are you planning on adding these config parameters at some point? I can send a PR, but this will introduce some pretty obnoxious branching/parameter collecting that you are going to be able to pump out a lot faster and cleaner.

AWS::Serverless::Function would need branching for VpcConfig:
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-function-vpcconfig.html

Also, the execution role will need the AWS-managed AWSLambdaVPCAccessExecutionRole policy.

I got it working, but I'm just drifting the stack and brute-forcing my way there, which is not a long-term solution.

Azure Blob-triggered function fails with "blobContent.trim is not a function" for WebApp Application and Server Diagnostic logs

Describe what happened:
Once a WebApp application or server diagnostic log has been added to the storage account, the function errors with Exception: TypeError: blobContent.trim is not a function

2019-05-01T02:44:48  Welcome, you are now connected to log-streaming service.
2019-05-01T02:45:31.401 [Information] Executing 'Functions.sitecore_cd_server_and_application_logs' (Reason='New blob detected: logs/PROD-CD-APP/2019/05/01/02/efbf18.log', Id=4f01a2c1-a616-4351-8a0b-6806fea15632)
2019-05-01T02:45:32.270 [Error] Executed 'Functions.sitecore_cd_server_and_application_logs' (Failed, Id=4f01a2c1-a616-4351-8a0b-6806fea15632)
Result: Failure
Exception: TypeError: blobContent.trim is not a function
Stack: TypeError: blobContent.trim is not a function
    at module.exports (D:\home\site\wwwroot\sitecore_cd_server_and_application_logs\index.js:44:28)
    at WorkerChannel.invocationRequest (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:16077:26)
    at ClientDuplexStream.WorkerChannel.eventStream.on (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:15967:30)
    at ClientDuplexStream.emit (events.js:182:13)
    at addChunk (_stream_readable.js:283:12)
    at readableAddChunk (_stream_readable.js:264:11)
    at ClientDuplexStream.Readable.push (_stream_readable.js:219:10)
    at Object.onReceiveMessage (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19692:19)
    at InterceptingListener.module.exports.InterceptingListener.recvMessageWithContext (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19007:19)
    at D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19106:14
2019-05-01T02:45:32.701 [Information] Executing 'Functions.sitecore_cd_server_and_application_logs' (Reason='New blob detected: logs/PROD-CD-APP/2019/05/01/02/efbf18.log', Id=457e7a33-8df1-464c-9714-46eb843018a4)
2019-05-01T02:45:32.913 [Error] Executed 'Functions.sitecore_cd_server_and_application_logs' (Failed, Id=457e7a33-8df1-464c-9714-46eb843018a4)
Result: Failure
Exception: TypeError: blobContent.trim is not a function
Stack: TypeError: blobContent.trim is not a function
    at module.exports (D:\home\site\wwwroot\sitecore_cd_server_and_application_logs\index.js:44:28)
    at WorkerChannel.invocationRequest (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:16077:26)
    at ClientDuplexStream.WorkerChannel.eventStream.on (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:15967:30)
    at ClientDuplexStream.emit (events.js:182:13)
    at addChunk (_stream_readable.js:283:12)
    at readableAddChunk (_stream_readable.js:264:11)
    at ClientDuplexStream.Readable.push (_stream_readable.js:219:10)
    at Object.onReceiveMessage (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19692:19)
    at InterceptingListener.module.exports.InterceptingListener.recvMessageWithContext (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19007:19)
    at D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19106:14
2019-05-01T02:45:33.198 [Information] Executing 'Functions.sitecore_cd_server_and_application_logs' (Reason='New blob detected: logs/PROD-CD-APP/2019/05/01/02/efbf18.log', Id=e9a4ec0f-4d00-4f2a-bf77-57faafbbd5b3)
2019-05-01T02:45:33.571 [Error] Executed 'Functions.sitecore_cd_server_and_application_logs' (Failed, Id=e9a4ec0f-4d00-4f2a-bf77-57faafbbd5b3)
Result: Failure
Exception: TypeError: blobContent.trim is not a function
Stack: TypeError: blobContent.trim is not a function
    at module.exports (D:\home\site\wwwroot\sitecore_cd_server_and_application_logs\index.js:44:28)
    at WorkerChannel.invocationRequest (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:16077:26)
    at ClientDuplexStream.WorkerChannel.eventStream.on (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:15967:30)
    at ClientDuplexStream.emit (events.js:182:13)
    at addChunk (_stream_readable.js:283:12)
    at readableAddChunk (_stream_readable.js:264:11)
    at ClientDuplexStream.Readable.push (_stream_readable.js:219:10)
    at Object.onReceiveMessage (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19692:19)
    at InterceptingListener.module.exports.InterceptingListener.recvMessageWithContext (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19007:19)
    at D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19106:14
2019-05-01T02:45:34.633 [Information] Executing 'Functions.sitecore_cd_server_and_application_logs' (Reason='This function was programmatically called via the host APIs.', Id=8928a007-3185-43d0-917b-fa97561b3274)
2019-05-01T02:45:34.900 [Error] Executed 'Functions.sitecore_cd_server_and_application_logs' (Failed, Id=8928a007-3185-43d0-917b-fa97561b3274)
Result: Failure
Exception: TypeError: blobContent.trim is not a function
Stack: TypeError: blobContent.trim is not a function
    at module.exports (D:\home\site\wwwroot\sitecore_cd_server_and_application_logs\index.js:44:28)
    at WorkerChannel.invocationRequest (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:16077:26)
    at ClientDuplexStream.WorkerChannel.eventStream.on (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:15967:30)
    at ClientDuplexStream.emit (events.js:182:13)
    at addChunk (_stream_readable.js:283:12)
    at readableAddChunk (_stream_readable.js:264:11)
    at ClientDuplexStream.Readable.push (_stream_readable.js:219:10)
    at Object.onReceiveMessage (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19692:19)
    at InterceptingListener.module.exports.InterceptingListener.recvMessageWithContext (D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19007:19)
    at D:\Program Files (x86)\SiteExtensions\Functions\2.0.12427\32bit\workers\node\worker-bundle.js:19106:14
2019-05-01T02:46:48  No new trace in the past 1 min(s).

Describe what you expected:
That the newly detected file would be read and data sent to my Datadog account.

Steps to reproduce the issue:
Enable WebApp diagnostic logs to blob storage. Create the Datadog function app with a blob trigger.

Pulling AWS CloudWatch logs from an S3 bucket.

Describe what happened:

At our company, we have a system that uses a Kinesis Data Stream to take logs from a CloudWatch group and dump them into an S3 bucket. It is from this S3 bucket that we want our logs to be sent to Datadog. An example file in our S3 bucket looks like this:

{
  "messageType":"DATA_MESSAGE",
  "owner":"***",
  "logGroup":"OverlapNetwork-VpcFlowLogs-191J53EMBBCRQ-LogGroup-AW0BZ8VQNM3V",
  "logStream":"eni-0b637efff203cbe0b-all",
  "subscriptionFilters":["****"],
  "logEvents": [
{"id":"35322448436314467953163414326907908605993876997091885056","timestamp":1583913368000,"message":"2 unknown eni-0b637efff203cbe0b - - - - - - - 1583913368 1583913475 - NODATA"}
  ]
}
{
 "messageType":"DATA_MESSAGE",
  "owner":"261154102564",
  "logGroup":"/ecs/*****/",
  "logStream":"datadog/datadog/a7e3f0a1-4b8a-49da-af64-a40a2e4ab21e",
  "subscriptionFilters":["****"],"
  logEvents":[
{"id":"35322454655657792645376249155738670693078760903753662464","timestamp":1583913646885,"message":"2020-03-11 08:00:46 UTC | TRACE | INFO | (pkg/trace/info/stats.go:108 in LogStats) | [lang:python lang_version:3.6.10 interpreter:CPython tracer_version:0.34.0] -> traces received: 12, traces filtered: 0, traces amount: 30824 bytes, events extracted: 0, events sampled: 0"}
  ]
}

Describe what you expected:

We expected that the Datadog Log Collection Lambda in this repo would be able to parse these objects like CloudWatch logs, just as if the Datadog Log Collection Lambda were triggered from CloudWatch directly.

Steps to reproduce the issue:

  1. Save CloudWatch logs to S3.
  2. Trigger the Datadog Log Collection Lambda upon new ObjectCreate events in the S3 bucket.
  3. View the logs in the Datadog Log console.

TL;DR: we'd like to use Datadog for log analysis, and S3 (and Glacier) for long-term log storage. We thought we could achieve both with this setup. Is there another way you'd recommend?
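
For anyone in the same situation, here is a minimal sketch (not the forwarder's current behaviour) of how the concatenated CloudWatch payloads in such an S3 object could be walked, assuming body is the decoded text of the object; iter_cloudwatch_records is a hypothetical helper name:

import json

def iter_cloudwatch_records(body):
    """Yield individual log events from an S3 object that contains
    concatenated CloudWatch subscription payloads like the example above."""
    decoder = json.JSONDecoder()
    body = body.strip()
    idx = 0
    while idx < len(body):
        payload, end = decoder.raw_decode(body, idx)
        for event in payload.get("logEvents", []):
            yield {
                "message": event.get("message"),
                "logGroup": payload.get("logGroup"),
                "logStream": payload.get("logStream"),
            }
        # skip any whitespace between back-to-back JSON objects
        idx = end
        while idx < len(body) and body[idx].isspace():
            idx += 1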

Thank you,

Datadog AWS Lambda setting `DD_TAGS=env:none`

# Default `env` to `none` and `service` to the function name,
# for correlation with the APM env and service.
metadata[DD_SERVICE] = function_name
metadata[DD_CUSTOM_TAGS] += ",env:none"

Why is the env tag set to none? There is no mechanism provided to set it to something else, and here it's being concatenated after DD_TAGS, effectively overriding it.

metadata[DD_CUSTOM_TAGS] = ",".join(
    filter(
        None,
        [
            DD_TAGS,
            ",".join(
                ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()]
            ),
        ],
    )
)

What is the correct way to pass an 'environment' tag (e.g. prod)?
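
To make the intent concrete, here is a sketch of the behaviour we would expect instead, using a hypothetical append_env_tag helper: only fall back to env:none when DD_TAGS does not already carry an env tag.

def append_env_tag(custom_tags, dd_tags):
    # Hypothetical helper: keep env:none as the default, but let a
    # user-supplied env:... in DD_TAGS win instead of being overridden.
    has_env = any(t.strip().startswith("env:") for t in dd_tags.split(",") if t)
    if not has_env:
        custom_tags += ",env:none"
    return custom_tags

# append_env_tag("forwardername:foo", "env:prod,team:bar") -> "forwardername:foo"
# append_env_tag("forwardername:foo", "team:bar")          -> "forwardername:foo,env:none"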

Python code is generating a DeprecationWarning

Describe what happened:
This Python code is generating a DeprecationWarning because it's relying on a vendored requests package.

https://github.com/DataDog/datadog-serverless-functions/blob/master/aws/logs_monitoring/lambda_function.py#L14

/opt/python/lib/python2.7/site-packages/botocore/vendored/requests/api.py:67: DeprecationWarning: You are using the get() function from 'botocore.vendored.requests'. This is not a public API in botocore and will be removed in the future. Additionally, this version of requests is out of date. We recommend you install the requests package, 'import requests' directly, and use the requests.get() function instead.

Describe what you expected:
As described in the above message, this code should be using a non-vendored requests package or a different package for generating HTTP requests.

Steps to reproduce the issue:
It should be easily reproducible any time this function is executed.
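
A sketch of the change we had in mind, assuming the requests package is bundled with the deployment package (or provided through a Lambda layer) rather than taken from botocore's vendored copy:

try:
    import requests  # preferred: silences the DeprecationWarning
except ImportError:
    # legacy fallback if requests is not bundled with the function
    from botocore.vendored import requests

The rest of the code (requests.get, requests.Session) keeps working unchanged.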

Lambda Log forwarder: Metadata is shared between different logs when using Kinesis as event_type

Describe what happened:
When Lambda logs arrive from Kinesis, each record in the event shares the same metadata object as the others. This results in some logs getting tagged with multiple functionname tags.

Describe what you expected:
I expect each log to have the proper functionname tag.

Steps to reproduce the issue:
Test the handler with one Kinesis event containing multiple Lambda logs. See that the resulting logs in Datadog have multiple functionname tags.
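
A sketch of the fix we had in mind, reusing the handler names from the forwarder: give every Kinesis record its own copy of the metadata so functionname tags from one record cannot leak into the next.

import copy

def kinesis_awslogs_handler(event, context, metadata):
    def reformat_record(record):
        return {"awslogs": {"data": record["kinesis"]["data"]}}

    for record in event["Records"]:
        # deep-copy the metadata per record instead of sharing one dict
        record_metadata = copy.deepcopy(metadata)
        for log in awslogs_handler(reformat_record(record), context, record_metadata):
            yield log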

Feature Request: Capture tags from the AWS resource

For the AWS logging Lambda, it would be awesome if it could capture the tags from the AWS resource it's logging for.

For example, for logs of other Lambdas, it could call Describe* on the Lambda to get its tags, and then apply those tags to all of the logs. That way, if we have specific tags on the Lambda resource, those also make it through to our logs.

The same goes for ALBs, ELBs, CloudFront, etc.

I would expect this to be disableable via an environment variable flag; some folks may not want to add the Describe* permissions to their Lambda.
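
A rough sketch of what we are imagining, with DD_FETCH_RESOURCE_TAGS as a hypothetical opt-in flag (none of this exists in the forwarder today):

import os
import boto3

FETCH_RESOURCE_TAGS = os.getenv("DD_FETCH_RESOURCE_TAGS", "false").lower() == "true"

def get_lambda_tags(function_arn):
    # Requires lambda:ListTags on the monitored functions; skipped when the
    # opt-in flag is off so no extra permissions are needed by default.
    if not FETCH_RESOURCE_TAGS:
        return {}
    try:
        return boto3.client("lambda").list_tags(Resource=function_arn).get("Tags", {})
    except Exception:
        return {}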

[RDS] Issue if the Lambda function is created in a different region from the KMS key

With the Enhanced version of the RDS integration, there is an issue if the Lambda function is created in a different region from the KMS key.

It seems our Lambda code automatically initializes a KMS client for the default region (the same region the Lambda runs in).

So the user must create another key in the same region as the Lambda function and regenerate the encrypted config.
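
A possible workaround sketch: point the KMS client at the key's region explicitly (DD_KMS_REGION is a hypothetical variable here, not something the forwarder reads today):

import base64
import os
import boto3

# Decrypt the API key with a KMS client pinned to the key's region,
# falling back to the Lambda's own region when no override is given.
kms_region = os.getenv("DD_KMS_REGION", os.getenv("AWS_REGION"))
kms = boto3.client("kms", region_name=kms_region)
DD_API_KEY = kms.decrypt(
    CiphertextBlob=base64.b64decode(os.environ["DD_KMS_API_KEY"])
)["Plaintext"]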

Connection refused, ending in Lambda timeout

Describe what happened:
We're monitoring all Lambda errors in our account, and thus we monitor the Datadog Lambda (version 1.5.0) as well. We see recurring failures, at least once a day.

Looking through the CloudWatch logs, this is what I see:

[Errno 111] Connection refused: error
Traceback (most recent call last):
File "/opt/python/lib/python2.7/site-packages/datadog_lambda/wrapper.py", line 58, in __call__
return self.func(event, context)
File "/var/task/lambda_function.py", line 380, in datadog_forwarder
forward_logs(filter_logs(logs))
File "/var/task/lambda_function.py", line 402, in forward_logs
with DatadogClient(cli) as client:
File "/var/task/lambda_function.py", line 193, in __enter__
self._client.__enter__()
File "/var/task/lambda_function.py", line 242, in __enter__
self._connect()
File "/var/task/lambda_function.py", line 215, in _connect
sock.connect((self.host, self.port))
File "/usr/lib64/python2.7/ssl.py", line 864, in connect
self._real_connect(addr, False)
File "/usr/lib64/python2.7/ssl.py", line 851, in _real_connect
socket.connect(self, addr)
File "/usr/lib64/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

This is followed by a REPORT log entry saying the Lambda consumed all of the allocated time.

Describe what you expected:
I expect the Lambda not to fail, or at least not at such a high rate.

Steps to reproduce the issue:

Given the high failure rate, I presume this is something that happens to everyone.
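
Not a fix, but a sketch of the kind of retry we would expect around the TCP connect (using the same ssl.wrap_socket call the forwarder already uses), so a transient refusal does not burn the whole Lambda timeout:

import socket
import ssl
import time

def connect_with_retry(host, port, attempts=3, backoff=1):
    # Retry a refused/failed TLS connection a few times with exponential
    # backoff before giving up and letting the invocation fail fast.
    for attempt in range(attempts):
        try:
            sock = ssl.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
            sock.connect((host, port))
            return sock
        except socket.error:
            if attempt == attempts - 1:
                raise
            time.sleep(backoff * (2 ** attempt))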

template.yaml fails cfn-lint check

Describe what happened:
It's common to deploy templates in a CI/CD pipeline, where templates are linted by cfn-lint.

Describe what you expected:
template.yaml to pass cfn-lint checks. Currently, template.yaml yields the following cfn-lint warning and error:

[cfn-lint] W2510: Lambda Memory Size parameters should use MinValue, MaxValue or AllowedValues at Parameters/MemorySize
template.yaml:22:3

[cfn-lint] E3012: Property Resources/Forwarder/Properties/Environment/Variables/DD_ENHANCED_METRICS should be of type String
template.yaml:269:11

Steps to reproduce the issue:
Run

  • cfn-lint --template template.yaml

Suggested Resolution:

  • Add MinValue/MaxValue to the MemorySize parameter for the current Lambda boundaries, so the template passes lint checks without errors/warnings.
  • Change DD_ENHANCED_METRICS to a string.

Getting error in lambda_function.py

Describe what happened:
Creating a new Lambda function and trying to test it, I get the error below:

Response:
{
  "stackTrace": [
    [
      "/var/task/lambda_function.py",
      156,
      "lambda_handler",
      "\"You must configure your API key before starting this lambda function (see #Parameters section)\""
    ]
  ],
  "errorType": "Exception",
  "errorMessage": "You must configure your API key before starting this lambda function (see #Parameters section)"
}

If I remove the code below and test, I get null in the response.

    if DD_API_KEY == "<your_api_key>" or DD_API_KEY == "":
        raise Exception(
            "You must configure your API key before starting this lambda function (see #Parameters section)"
        )
    # Check if the API key is the correct number of characters
    if len(DD_API_KEY) != 32:
        raise Exception(
            "The API key is not the expected length. Please confirm that your API key is correct"
        )

Describe what you expected:
It should be executed successfully without any error.

Steps to reproduce the issue:
I have configured the same Datadog API key in the two lines below:

  1. https://github.com/DataDog/datadog-serverless-functions/blob/master/aws/logs_monitoring/lambda_function.py#L46
  2. https://github.com/DataDog/datadog-serverless-functions/blob/master/aws/logs_monitoring/lambda_function.py#L154

Question:

  • I believe it is failing because I am passing the same Datadog API key in both lines. So, what should the value of 'your_api_key' be?

Thanks.
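
For reference, the forwarder is designed to read the key from the environment, so the "<your_api_key>" placeholder in the source never needs to be edited. The relevant lines from the forwarder's lambda_function.py look like this (abridged):

# DD_API_KEY: Datadog API Key
DD_API_KEY = "<your_api_key>"  # placeholder, overridden below
if "DD_KMS_API_KEY" in os.environ:
    ENCRYPTED = os.environ["DD_KMS_API_KEY"]
    DD_API_KEY = boto3.client("kms").decrypt(
        CiphertextBlob=base64.b64decode(ENCRYPTED)
    )["Plaintext"]
elif "DD_API_KEY" in os.environ:
    DD_API_KEY = os.environ["DD_API_KEY"]

In other words, set DD_API_KEY (or DD_KMS_API_KEY) as a Lambda environment variable and leave the placeholder string in the code alone.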

MariaDB logs negate rds_regex, no enrichment

Describe what happened:

After we hooked up the MariaDB Audit Log outputs for an RDS instance named "rateanalytics-mariadb-m01-live" to CloudWatch => Datadog, we expected some items to be set automatically, such as the host being extracted from the log group name, and hoped to have a Datadog-built pipeline activate for our ingested logs.

Instead, the source got set to "mariadb" and no Datadog-built pipeline activated for our logs. This is due to the presence of the "mariadb" string in our RDS instance name.

Describe what you expected:

Looking through the code, we expected the following code to run which would set the host tag:

# When parsing rds logs, use the cloudwatch log group name to derive the
# rds instance name, and add the log name of the stream ingested
if metadata[DD_SOURCE] == "rds":
    match = rds_regex.match(logs["logGroup"])
    if match is not None:
        metadata[DD_HOST] = match.group("host")
        metadata[DD_CUSTOM_TAGS] = (
            metadata[DD_CUSTOM_TAGS] + ",logname:" + match.group("name")
        )
        # We can intuit the sourcecategory in some cases
        if match.group("name") == "postgresql":
            metadata[DD_CUSTOM_TAGS] + ",sourcecategory:" + match.group("name")

The reason for this is that "mariadb" is higher in the for-loop than "/aws/rds", meaning that the rds source is not set and thus the custom regex logic is not applied:

for source in [
    "codebuild",
    "lambda",
    "redshift",
    "cloudfront",
    "kinesis",
    "mariadb",
    "mysql",
    "apigateway",
    "route53",
    "vpc",
    "/aws/rds",
    "sns",
    "waf",
    "docdb",
    "fargate",
Confusingly, we are also unable to set the host tag ourselves from the "logStream" attribute, which contains our instance name.

Steps to reproduce the issue:

With an RDS instance that has the word "mariadb" in its name, forward MariaDB Audit Logs to CloudWatch and hook up the Log Group to your Datadog Lambda.

You should see service and source set to mariadb, with no host set and no instance name extracted.
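
A sketch of the ordering fix we would expect (simplified and abridged): check the RDS log group prefix before the engine-name substrings, so an instance name containing "mariadb" no longer shadows "/aws/rds".

def parse_event_source_sketch(key):
    # Abridged source list, for illustration only
    if key.startswith("/aws/rds"):
        return "rds"
    for source in ["mariadb", "mysql", "lambda", "cloudfront"]:
        if source in key:
            return source
    return "aws"

# parse_event_source_sketch("/aws/rds/instance/rateanalytics-mariadb-m01-live/audit")
# -> "rds", so the rds_regex enrichment above would run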

Grok parser rule for lambda log pipeline

Please add the following Grok parser rules to the Lambda log pipeline so that console.log(...) messages get parsed too.

#### JSON Rules

#2018-08-22T11:05:02.637Z	4d8b3785-079a-5f47-9b09-647242079aed {"level": "INFO", "message": "that's an interesting message", "user": "john"}
log_json_rule %{date("yyyy-MM-dd'T'HH:mm:ss.SSSZ"):timestamp}\s+%{notSpace:lambda.request_id}\s+%{regex("\\{.*\\}")::json}

#### Plain Text Rules

#2018-08-22T11:05:02.637Z	4d8b3785-079a-5f47-9b09-647242079aed INFO: that's an interesting message
log_level_msg_rule %{date("yyyy-MM-dd'T'HH:mm:ss.SSSZ"):timestamp}\s+%{notSpace:lambda.request_id}\s+%{regex("/(?i)ERROR|WARN|INFO|DEBUG|TRACE/"):level}:?\s+%{data:message}

#2018-08-22T11:05:02.637Z	4d8b3785-079a-5f47-9b09-647242079aed that's an interesting message
log_msg_rule %{date("yyyy-MM-dd'T'HH:mm:ss.SSSZ"):timestamp}\s+%{notSpace:lambda.request_id}\s+%{data:message}

Latest lambda log forwarder changes function name to lowercase

We just implemented the latest version of the Lambda log forwarder and discovered that it is now changing the function name to lowercase in the metadata forwarded to Datadog. This was a breaking change for us because the previous version of the Lambda function wasn't doing this, and now the new logs are indexed differently from the old logs.

Is there a reason the function name needs to be lowercase? What is the benefit? We have lots of functions with camel-case names like "test-messaging-transformDataFromSomewhere". Now when a developer goes into Datadog, they can no longer copy and paste the function name from AWS when doing searches, because the name is different.
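
A sketch of the behaviour we would prefer, with preserve_case as a hypothetical opt-out (the current forwarder always lowercases the name):

def build_function_tags(log_group, preserve_case=True):
    # Derive the function name from the CloudWatch log group without
    # forcing it to lowercase, so it matches what AWS shows.
    name = log_group.split("/lambda/")[-1]
    if not preserve_case:
        name = name.lower()
    return {"functionname": name, "service": name}

# build_function_tags("/aws/lambda/test-messaging-transformDataFromSomewhere")
# -> keeps the camel-case name searchable exactly as it appears in AWS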

parse_event_source misidentifies some ELB logs

AWS ELB logs have a random string in the filename. If this random string contains the characters vpc, rds, or sns, the source is misidentified in Log.lambda_function.parse_event_source:

def parse_event_source(event, key):
    for source in [
        "lambda",
        "redshift",
        "cloudfront",
        "kinesis",
        "mariadb",
        "mysql",
        "apigateway",
        "route53",
        "vpc",
        "rds",
        "sns",
    ]:
        if source in key:
            return source
    if "elasticloadbalancing" in key:
        return "elb"
    if is_cloudtrail(str(key)):
        return "cloudtrail"
    if "awslogs" in event:
        return "cloudwatch"
    if "Records" in event and len(event["Records"]) > 0:
        if "s3" in event["Records"][0]:
            return "s3"
    return "aws"

A potential fix would be to move the more selective conditionals for elasticloadbalancing, cloudtrail, and awslogs so that they precede the less-selective matchers.
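
A sketch of that reordering, keeping the original structure and only moving the filename-based checks for elasticloadbalancing and cloudtrail ahead of the substring loop:

def parse_event_source(event, key):
    # More selective, filename-based matchers first
    if "elasticloadbalancing" in key:
        return "elb"
    if is_cloudtrail(str(key)):
        return "cloudtrail"
    # Broad substring matchers afterwards
    for source in [
        "lambda", "redshift", "cloudfront", "kinesis", "mariadb",
        "mysql", "apigateway", "route53", "vpc", "rds", "sns",
    ]:
        if source in key:
            return source
    if "awslogs" in event:
        return "cloudwatch"
    if "Records" in event and len(event["Records"]) > 0:
        if "s3" in event["Records"][0]:
            return "s3"
    return "aws"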

Support for CloudWatch -> Kinesis -> S3 Log storage flow

Describe what happened:
We store all of our logs on S3 via a Kinesis stream from CloudWatch. Because of this, we must use S3 bucket watching as our flow for importing logs into Datadog. However, the default serverless lambda_handler does not support this workflow very effectively. It doesn't know how to parse the file (double-gzipped), parse the separate lines, or split the aggregated logs into separate events.

Describe what you expected:
A clean log flow from events imported from S3 via Kinesis

Steps to reproduce the issue:
Set up a Kinesis Firehose subscription to a CloudWatch event stream. Set that Kinesis Firehose to store into an S3 bucket using GZIP.

I was able to get this working with some minor adjustments to the script, attached below. Happy to make a PR if wanted. I added inline comments where I changed the contents (s3_handler and the regex creation).

# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2018 Datadog, Inc.

from __future__ import print_function

import base64
import gzip
import json
import os
import re
import socket
from botocore.vendored import requests
import time
import ssl
import six.moves.urllib as urllib  # for Python 2.7 urllib.unquote_plus
import itertools
from io import BytesIO, BufferedReader

import boto3

try:
    # Datadog Lambda layer is required to forward metrics
    from datadog_lambda.wrapper import datadog_lambda_wrapper
    from datadog_lambda.metric import lambda_stats
    DD_FORWARD_METRIC = True
except ImportError:
    # For backward-compatibility
    DD_FORWARD_METRIC = False


# Set this variable to `False` to disable log forwarding.
# E.g., when you only want to forward metrics from logs.
DD_FORWARD_LOG = os.getenv("DD_FORWARD_LOG", default="true").lower() == "true"


# Change this value to change the underlying network client (HTTP or TCP),
# by default, use the TCP client.
DD_USE_TCP = os.getenv("DD_USE_TCP", default="true").lower() == "true"


# Define the destination endpoint to send logs to
DD_SITE = os.getenv("DD_SITE", default="datadoghq.com")
if DD_USE_TCP:
    DD_URL = os.getenv("DD_URL", default="lambda-intake.logs." + DD_SITE)
    try:
        if "DD_SITE" in os.environ and DD_SITE == "datadoghq.eu":
            DD_PORT = int(os.environ.get("DD_PORT", 443))
        else:
            DD_PORT = int(os.environ.get("DD_PORT", 10516))
    except Exception:
        DD_PORT = 10516
else:
    DD_URL = os.getenv("DD_URL", default="lambda-http-intake.logs." + DD_SITE)


class ScrubbingRuleConfig(object):
    def __init__(self, name, pattern, placeholder):
        self.name = name
        self.pattern = pattern
        self.placeholder = placeholder


# Scrubbing sensitive data
# Option to redact all patterns that look like an IP address / email address / custom pattern
SCRUBBING_RULE_CONFIGS = [
    ScrubbingRuleConfig(
        "REDACT_IP", "\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", "xxx.xxx.xxx.xxx"
    ),
    ScrubbingRuleConfig(
        "REDACT_EMAIL",
        "[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
        "[email protected]",
    ),
    ScrubbingRuleConfig(
        "DD_SCRUBBING_RULE", 
        os.getenv("DD_SCRUBBING_RULE", default=None), 
        os.getenv("DD_SCRUBBING_RULE_REPLACEMENT", default="xxxxx")
    )
]

# Use for include, exclude, and scrubbing rules
def compileRegex(rule, pattern):
    if pattern is not None:
        if pattern == "":
            # If pattern is an empty string, raise exception
            raise Exception("No pattern provided:\nAdd pattern or remove {} environment variable".format(rule))
        try:
            return re.compile(pattern)
        except Exception:
            raise Exception("could not compile {} regex with pattern: {}".format(rule, pattern))


# Filtering logs 
# Option to include or exclude logs based on a pattern match
INCLUDE_AT_MATCH = os.getenv("INCLUDE_AT_MATCH", default=None)
include_regex = compileRegex("INCLUDE_AT_MATCH", INCLUDE_AT_MATCH)

EXCLUDE_AT_MATCH = os.getenv("EXCLUDE_AT_MATCH", default=None)
exclude_regex = compileRegex("EXCLUDE_AT_MATCH", EXCLUDE_AT_MATCH)


# DD_API_KEY: Datadog API Key
DD_API_KEY = "<your_api_key>"
if "DD_KMS_API_KEY" in os.environ:
    ENCRYPTED = os.environ["DD_KMS_API_KEY"]
    DD_API_KEY = boto3.client("kms").decrypt(
        CiphertextBlob=base64.b64decode(ENCRYPTED)
    )["Plaintext"]
elif "DD_API_KEY" in os.environ:
    DD_API_KEY = os.environ["DD_API_KEY"]

# Strip any trailing and leading whitespace from the API key
DD_API_KEY = DD_API_KEY.strip()

# DD_API_KEY must be set
if DD_API_KEY == "<your_api_key>" or DD_API_KEY == "":
    raise Exception(
        "You must configure your Datadog API key using "
        "DD_KMS_API_KEY or DD_API_KEY"
    )
# Check if the API key is the correct number of characters
if len(DD_API_KEY) != 32:
    raise Exception(
        "The API key is not the expected length. "
        "Please confirm that your API key is correct"
    )
# Validate the API key
validation_res = requests.get(
    "https://api.{}/api/v1/validate?api_key={}".format(DD_SITE, DD_API_KEY)
)
if not validation_res.ok:
    raise Exception("The API key is not valid.")


# DD_MULTILINE_LOG_REGEX_PATTERN: Datadog Multiline Log Regular Expression Pattern
DD_MULTILINE_LOG_REGEX_PATTERN = None
if "DD_MULTILINE_LOG_REGEX_PATTERN" in os.environ:
    DD_MULTILINE_LOG_REGEX_PATTERN = os.environ["DD_MULTILINE_LOG_REGEX_PATTERN"]
    # JO: Kinesis doesn't split logs on newlines, so the required newline component of this regex had to go
    try:
        multiline_regex = re.compile(
            "(?={})".format(DD_MULTILINE_LOG_REGEX_PATTERN)
        )
    except Exception:
        raise Exception("could not compile multiline regex with pattern: {}".format(DD_MULTILINE_LOG_REGEX_PATTERN))
    multiline_regex_start_pattern = re.compile(
        "^{}".format(DD_MULTILINE_LOG_REGEX_PATTERN)
    )

rds_regex = re.compile("/aws/rds/instance/(?P<host>[^/]+)/(?P<name>[^/]+)")

DD_SOURCE = "ddsource"
DD_CUSTOM_TAGS = "ddtags"
DD_SERVICE = "service"
DD_HOST = "host"
DD_FORWARDER_VERSION = "1.5.1"

# Pass custom tags as environment variable, ensure comma separated, no trailing comma in envvar!
DD_TAGS = os.environ.get("DD_TAGS", "")


class RetriableException(Exception):
    pass


class ScrubbingException(Exception):
    pass


class DatadogClient(object):
    """
    Client that implements exponential retry logic to send a batch of logs.
    """

    def __init__(self, client, max_backoff=30):
        self._client = client
        self._max_backoff = max_backoff

    def send(self, logs):
        backoff = 1
        while True:
            try:
                self._client.send(logs)
                return
            except RetriableException:
                time.sleep(backoff)
                if backoff < self._max_backoff:
                    backoff *= 2
                continue

    def __enter__(self):
        self._client.__enter__()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        self._client.__exit__(ex_type, ex_value, traceback)


class DatadogTCPClient(object):
    """
    Client that sends a batch of logs over TCP.
    """

    def __init__(self, host, port, api_key, scrubber):
        self.host = host
        self.port = port
        self._api_key = api_key
        self._scrubber = scrubber
        self._sock = None

    def _connect(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock = ssl.wrap_socket(sock)
        sock.connect((self.host, self.port))
        self._sock = sock

    def _close(self):
        if self._sock:
            self._sock.close()

    def _reset(self):
        self._close()
        self._connect()

    def send(self, logs):
        try:
            frame = self._scrubber.scrub(
                "".join(
                    ["{} {}\n".format(self._api_key, log) for log in logs]
                )
            )
            self._sock.sendall(frame.encode("UTF-8"))
        except ScrubbingException:
            raise Exception("could not scrub the payload")
        except Exception:
            # most likely a network error, reset the connection
            self._reset()
            raise RetriableException()

    def __enter__(self):
        self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        self._close()


class DatadogHTTPClient(object):
    """
    Client that sends a batch of logs over HTTP.
    """

    _POST = "POST"
    _HEADERS = {"Content-type": "application/json"}

    def __init__(self, host, api_key, scrubber, timeout=10):
        self._url = "https://{}/v1/input/{}".format(host, api_key)
        self._scrubber = scrubber
        self._timeout = timeout
        self._session = None

    def _connect(self):
        self._session = requests.Session()
        self._session.headers.update(self._HEADERS)

    def _close(self):
        self._session.close()

    def send(self, logs):
        """
        Sends a batch of logs; only retries on server and network errors.
        """
        try:
            resp = self._session.post(
                self._url,
                data=self._scrubber.scrub("[{}]".format(",".join(logs))),
                timeout=self._timeout,
            )
        except ScrubbingException:
            raise Exception("could not scrub the payload")
        except Exception:
            # most likely a network error
            raise RetriableException()
        if resp.status_code >= 500:
            # server error
            raise RetriableException()
        elif resp.status_code >= 400:
            # client error
            raise Exception(
                "client error, status: {}, reason {}".format(
                    resp.status_code, resp.reason
                )
            )
        else:
            # success
            return

    def __enter__(self):
        self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        self._close()


class DatadogBatcher(object):
    def __init__(self, max_log_size_bytes, max_size_bytes, max_size_count):
        self._max_log_size_bytes = max_log_size_bytes
        self._max_size_bytes = max_size_bytes
        self._max_size_count = max_size_count

    def _sizeof_bytes(self, log):
        return len(log.encode("UTF-8"))

    def batch(self, logs):
        """
        Returns an array of batches.
        Each batch contains at most max_size_count logs and
        is not strictly greater than max_size_bytes.
        All logs strictly greater than max_log_size_bytes are dropped.
        """
        batches = []
        batch = []
        size_bytes = 0
        size_count = 0
        for log in logs:
            log_size_bytes = self._sizeof_bytes(log)
            if size_count > 0 and (
                size_count >= self._max_size_count
                or size_bytes + log_size_bytes > self._max_size_bytes
            ):
                batches.append(batch)
                batch = []
                size_bytes = 0
                size_count = 0
            # all logs exceeding max_log_size_bytes are dropped here
            if log_size_bytes <= self._max_log_size_bytes:
                batch.append(log)
                size_bytes += log_size_bytes
                size_count += 1
        if size_count > 0:
            batches.append(batch)
        return batches


class ScrubbingRule(object):
    def __init__(self, regex, placeholder):
        self.regex = regex
        self.placeholder = placeholder


class DatadogScrubber(object):
    def __init__(self, configs):
        rules = []
        for config in configs:
            if config.name in os.environ:
                rules.append(
                    ScrubbingRule(
                        compileRegex(config.name, config.pattern), 
                        config.placeholder
                    )
                )
        self._rules = rules

    def scrub(self, payload):
        for rule in self._rules:
            try:
                payload = rule.regex.sub(rule.placeholder, payload)
            except Exception:
                raise ScrubbingException()
        return payload


def datadog_forwarder(event, context):
    """The actual lambda function entry point"""
    events = parse(event, context)
    metrics, logs = split(events)
    if DD_FORWARD_LOG:
        forward_logs(filter_logs(logs))
    if DD_FORWARD_METRIC:
        forward_metrics(metrics)


if DD_FORWARD_METRIC:
    # Datadog Lambda layer is required to forward metrics
    lambda_handler = datadog_lambda_wrapper(datadog_forwarder)
else:
    lambda_handler = datadog_forwarder


def forward_logs(logs):
    """Forward logs to Datadog"""
    scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
    if DD_USE_TCP:
        batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1)
        cli = DatadogTCPClient(DD_URL, DD_PORT, DD_API_KEY, scrubber)
    else:
        batcher = DatadogBatcher(128 * 1000, 1 * 1000 * 1000, 25)
        cli = DatadogHTTPClient(DD_URL, DD_API_KEY, scrubber)

    with DatadogClient(cli) as client:
        for batch in batcher.batch(logs):
            try:
                client.send(batch)
            except Exception as e:
                print("Unexpected exception: {}, batch: {}".format(str(e), batch))


def parse(event, context):
    """Parse Lambda input to normalized events"""
    metadata = generate_metadata(context)
    try:
        # Route to the corresponding parser
        event_type = parse_event_type(event)
        if event_type == "s3":
            events = s3_handler(event, context, metadata)
        elif event_type == "awslogs":
            events = awslogs_handler(event, context, metadata)
        elif event_type == "events":
            events = cwevent_handler(event, metadata)
        elif event_type == "sns":
            events = sns_handler(event, metadata)
        elif event_type == "kinesis":
            events = kinesis_awslogs_handler(event, context, metadata)
    except Exception as e:
        # Logs through the socket the error
        err_message = "Error parsing the object. Exception: {} for event {}".format(
            str(e), event
        )
        events = [err_message]
    return normalize_events(events, metadata)


def generate_metadata(context):
    metadata = {
        "ddsourcecategory": "aws",
        "aws": {
            "function_version": context.function_version,
            "invoked_function_arn": context.invoked_function_arn,
        },
    }
    # Add custom tags here by adding new value with the following format "key1:value1, key2:value2"  - might be subject to modifications
    dd_custom_tags_data = {
        "forwardername": context.function_name.lower(),
        "memorysize": context.memory_limit_in_mb,
        "forwarder_version": DD_FORWARDER_VERSION,
    }
    metadata[DD_CUSTOM_TAGS] = ",".join(
        filter(
            None,
            [
                DD_TAGS,
                ",".join(
                    ["{}:{}".format(k, v) for k, v in dd_custom_tags_data.items()]
                ),
            ],
        )
    )

    return metadata


def extract_metric(event):
    """Extract metric from an event if possible"""
    try:
        metric = json.loads(event['message'])
        required_attrs = {'m', 'v', 'e', 't'}
        if all(attr in metric for attr in required_attrs):
            return metric
        else:
            return None
    except Exception:
        return None


def split(events):
    """Split events to metrics and logs"""
    metrics, logs = [], []
    for event in events:
        metric = extract_metric(event)
        if metric:
            metrics.append(metric)
        else:
            logs.append(json.dumps(event))
    return metrics, logs


# should only be called when INCLUDE_AT_MATCH and/or EXCLUDE_AT_MATCH exist
def filter_logs(logs):
    """
    Applies log filtering rules.
    If no filtering rules exist, return all the logs.
    """
    if INCLUDE_AT_MATCH is None and EXCLUDE_AT_MATCH is None:
        # convert to strings
        return logs 
    # Add logs that should be sent to logs_to_send
    logs_to_send = []
    # Test each log for exclusion and inclusion, if the criteria exist
    for log in logs:
        try:
            if EXCLUDE_AT_MATCH is not None:
                # if an exclude match is found, do not add log to logs_to_send
                if re.search(exclude_regex, log):
                    continue
            if INCLUDE_AT_MATCH is not None:
                # if no include match is found, do not add log to logs_to_send 
                if not re.search(include_regex, log):
                    continue
            logs_to_send.append(log)
        except ScrubbingException:
            raise Exception("could not filter the payload")
    return logs_to_send


def forward_metrics(metrics):
    """
    Forward custom metrics submitted via logs to Datadog in a background thread
    using `lambda_stats` that is provided by the Datadog Python Lambda Layer.
    """
    for metric in metrics:
        try:
            lambda_stats.distribution(
                metric['m'],
                metric['v'],
                timestamp=metric['e'],
                tags=metric['t']
            )
        except Exception as e:
            print("Unexpected exception: {}, metric: {}".format(str(e), metric))


# Utility functions


def normalize_events(events, metadata):
    normalized = []
    for event in events:
        if isinstance(event, dict):
            normalized.append(merge_dicts(event, metadata))
        elif isinstance(event, str):
            normalized.append(merge_dicts({"message": event}, metadata))
        else:
            # drop this log
            continue
    return normalized


def parse_event_type(event):
    if "Records" in event and len(event["Records"]) > 0:
        if "s3" in event["Records"][0]:
            return "s3"
        elif "Sns" in event["Records"][0]:
            return "sns"
        elif "kinesis" in event["Records"][0]:
            return "kinesis"

    elif "awslogs" in event:
        return "awslogs"

    elif "detail" in event:
        return "events"
    raise Exception("Event type not supported (see #Event supported section)")


# Handle S3 events
def s3_handler(event, context, metadata):
    s3 = boto3.client("s3")

    # Get the object from the event and show its content type
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(event["Records"][0]["s3"]["object"]["key"])

    source = parse_event_source(event, key)
    metadata[DD_SOURCE] = source
    ##default service to source value
    metadata[DD_SERVICE] = source
    ##Get the ARN of the service and set it as the hostname
    hostname = parse_service_arn(source, key, bucket, context)
    if hostname:
        metadata[DD_HOST] = hostname

    # Extract the S3 object
    response = s3.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    data = body.read()

    # If the name has a .gz extension, then decompress the data
    if key[-3:] == ".gz":
        data = gzip.decompress(data)
        # JO: kinesis data stored in s3 is doubly gzipped (TODO: look into this)
        if source == "kinesis":
            data = gzip.decompress(data)

    if is_cloudtrail(str(key)):
        cloud_trail = json.loads(data)
        for event in cloud_trail["Records"]:
            # Create structured object and send it
            structured_line = merge_dicts(
                event, {"aws": {"s3": {"bucket": bucket, "key": key}}}
            )
            yield structured_line
    else:
        # Check if using multiline log regex pattern
        # and determine whether line or pattern separated logs
        data = data.decode("utf-8")
        if DD_MULTILINE_LOG_REGEX_PATTERN and multiline_regex_start_pattern.match(data):
            split_data = multiline_regex.split(data)
        else:
            split_data = data.splitlines()

        # Send lines to Datadog
        for line in split_data:
            if not line:
                continue
            
            # JO: edits here to support better metadata on kinesis events
            if source == "kinesis":
                json_line = json.loads(line)
                for log_event in json_line["logEvents"]:
                    log_event["owner"] = json_line["owner"]
                    log_event["logGroup"] = json_line["logGroup"]
                    log_event["logStream"] = json_line["logStream"]
                    metadata[DD_SOURCE] = json_line["logStream"].split('-')[0]
                    metadata[DD_SERVICE] = json_line["logStream"].split('-')[0]
                
                    # Create structured object and send it
                    structured_line = {
                        "aws": {"s3": {"bucket": bucket, "key": key}},
                        "message": log_event,
                    }
                    yield structured_line
                    
            else:
                # Create structured object and send it
                structured_line = {
                    "aws": {"s3": {"bucket": bucket, "key": key}},
                    "message": log_event,
                }
                yield structured_line


# Handle CloudWatch logs from Kinesis
def kinesis_awslogs_handler(event, context, metadata):
    def reformat_record(record):
        return {
            "awslogs": {
                "data": record["kinesis"]["data"]
            }
        }
        
    return itertools.chain.from_iterable(awslogs_handler(reformat_record(r), context, metadata) for r in event["Records"])


# Handle CloudWatch logs
def awslogs_handler(event, context, metadata):
    # Get logs
    with gzip.GzipFile(
        fileobj=BytesIO(base64.b64decode(event["awslogs"]["data"]))
    ) as decompress_stream:
        # Reading line by line avoids a bug where gzip would take a very long
        # time (>5 min) for files around 60 MB gzipped
        data = b"".join(BufferedReader(decompress_stream))
    logs = json.loads(data)

    # Set the source on the logs
    source = logs.get("logGroup", "cloudwatch")
    metadata[DD_SOURCE] = parse_event_source(event, source)

    # Default service to source value
    metadata[DD_SERVICE] = metadata[DD_SOURCE]

    # Build aws attributes
    aws_attributes = {
        "aws": {
            "awslogs": {
                "logGroup": logs["logGroup"],
                "logStream": logs["logStream"],
                "owner": logs["owner"],
            }
        }
    }

    # Set host as log group where cloudwatch is source
    if metadata[DD_SOURCE] == "cloudwatch":
        metadata[DD_HOST] = aws_attributes["aws"]["awslogs"]["logGroup"]

    # When parsing rds logs, use the cloudwatch log group name to derive the
    # rds instance name, and add the log name of the stream ingested
    if metadata[DD_SOURCE] == "rds":
        match = rds_regex.match(logs["logGroup"])
        if match is not None:
            metadata[DD_HOST] = match.group("host")
            metadata[DD_CUSTOM_TAGS] = (
                    metadata[DD_CUSTOM_TAGS] + ",logname:" + match.group("name")
            )
            # We can intuit the sourcecategory in some cases
            if match.group("name") == "postgresql":
                metadata[DD_CUSTOM_TAGS] + ",sourcecategory:" + match.group("name")

    # For Lambda logs we want to extract the function name,
    # then rebuild the arn of the monitored lambda using that name.
    # Start by splitting the log group to get the function name
    if metadata[DD_SOURCE] == "lambda":
        log_group_parts = logs["logGroup"].split("/lambda/")
        if len(log_group_parts) > 1:
            function_name = log_group_parts[1].lower()
            # Split the arn of the forwarder to extract the prefix
            arn_parts = context.invoked_function_arn.split("function:")
            if len(arn_parts) > 0:
                arn_prefix = arn_parts[0]
                # Rebuild the arn by replacing the function name
                arn = arn_prefix + "function:" + function_name
                # Add the arn as a log attribute
                arn_attributes = {"lambda": {"arn": arn}}
                aws_attributes = merge_dicts(aws_attributes, arn_attributes)
                # Add the function name as tag
                metadata[DD_CUSTOM_TAGS] += ",functionname:" + function_name
                # Set the arn as the hostname
                metadata[DD_HOST] = arn
                # Default `env` to `none` and `service` to the function name,
                # for correlation with the APM env and service.
                metadata[DD_SERVICE] = function_name
                metadata[DD_CUSTOM_TAGS] += ",env:none"

    # Create and send structured logs to Datadog
    for log in logs["logEvents"]:
        yield merge_dicts(log, aws_attributes)


# Handle Cloudwatch Events
def cwevent_handler(event, metadata):

    data = event

    # Set the source on the log
    source = data.get("source", "cloudwatch")
    service = source.split(".")
    if len(service) > 1:
        metadata[DD_SOURCE] = service[1]
    else:
        metadata[DD_SOURCE] = "cloudwatch"
    ##default service to source value
    metadata[DD_SERVICE] = metadata[DD_SOURCE]

    yield data


# Handle Sns events
def sns_handler(event, metadata):

    data = event
    # Set the source on the log
    metadata[DD_SOURCE] = parse_event_source(event, "sns")

    for ev in data["Records"]:
        # Create structured object and send it
        structured_line = ev
        yield structured_line


def merge_dicts(a, b, path=None):
    if path is None:
        path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_dicts(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass  # same leaf value
            else:
                raise Exception(
                    "Conflict while merging metadatas and the log entry at %s"
                    % ".".join(path + [str(key)])
                )
        else:
            a[key] = b[key]
    return a


cloudtrail_regex = re.compile(
    "\d+_CloudTrail_\w{2}-\w{4,9}-\d_\d{8}T\d{4}Z.+.json.gz$", re.I
)


def is_cloudtrail(key):
    match = cloudtrail_regex.search(key)
    return bool(match)


def parse_event_source(event, key):
    if "elasticloadbalancing" in key:
        return "elb"
    for source in [
        "lambda",
        "redshift",
        "cloudfront",
        "kinesis",
        "mariadb",
        "mysql",
        "apigateway",
        "route53",
        "vpc",
        "rds",
        "sns",
        "waf",
        "docdb",
        "fargate"
    ]:
        if source in key:
            return source
    if "API-Gateway" in key or "ApiGateway" in key:
        return "apigateway"
    if is_cloudtrail(str(key)) or ('logGroup' in event and event['logGroup'] == 'CloudTrail'):
        return "cloudtrail"
    if "awslogs" in event:
        return "cloudwatch"
    if "Records" in event and len(event["Records"]) > 0:
        if "s3" in event["Records"][0]:
            return "s3"

    return "aws"


def parse_service_arn(source, key, bucket, context):
    if source == "elb":
        # For ELB logs we parse the filename to extract parameters in order to rebuild the ARN
        # 1. We extract the region from the filename
        # 2. We extract the loadbalancer name and replace the "." by "/" to match the ARN format
        # 3. We extract the id of the loadbalancer
        # 4. We build the arn
        idsplit = key.split("/")
        # If there is a prefix on the S3 bucket, idsplit[1] will be "AWSLogs"
        # Remove the prefix before splitting the key
        if len(idsplit) > 1 and idsplit[1] == "AWSLogs":
            idsplit = idsplit[1:]
            keysplit = "/".join(idsplit).split("_")
        # If no prefix, split the key
        else:
            keysplit = key.split("_")        
        if len(keysplit) > 3:
            region = keysplit[2].lower()
            name = keysplit[3]
            elbname = name.replace(".", "/")
            if len(idsplit) > 1:
                idvalue = idsplit[1]
                return "arn:aws:elasticloadbalancing:{}:{}:loadbalancer/{}".format(
                    region, idvalue, elbname
                )
    if source == "s3":
        # For S3 access logs we use the bucket name to rebuild the arn
        if bucket:
            return "arn:aws:s3:::{}".format(bucket)
    if source == "cloudfront":
        # For Cloudfront logs we need to get the account and distribution id from the lambda arn and the filename
        # 1. We extract the cloudfront id  from the filename
        # 2. We extract the AWS account id from the lambda arn
        # 3. We build the arn
        namesplit = key.split("/")
        if len(namesplit) > 0:
            filename = namesplit[len(namesplit) - 1]
            # (distribution-ID.YYYY-MM-DD-HH.unique-ID.gz)
            filenamesplit = filename.split(".")
            if len(filenamesplit) > 3:
                distributionID = filenamesplit[len(filenamesplit) - 4].lower()
                arn = context.invoked_function_arn
                arnsplit = arn.split(":")
                if len(arnsplit) == 7:
                    awsaccountID = arnsplit[4].lower()
                    return "arn:aws:cloudfront::{}:distribution/{}".format(
                        awsaccountID, distributionID
                    )
    if source == "redshift":
        # For redshift logs we leverage the filename to extract the relevant information
        # 1. We extract the region from the filename
        # 2. We extract the account-id from the filename
        # 3. We extract the name of the cluster
        # 4. We build the arn: arn:aws:redshift:region:account-id:cluster:cluster-name
        namesplit = key.split("/")
        if len(namesplit) == 8:
            region = namesplit[3].lower()
            accountID = namesplit[1].lower()
            filename = namesplit[7]
            filesplit = filename.split("_")
            if len(filesplit) == 6:
                clustername = filesplit[3]
                return "arn:aws:redshift:{}:{}:cluster:{}:".format(
                    region, accountID, clustername
                )
    return

Lambda timeout logs should be treated as errors

If a Lambda function runs into a timeout, it logs a message like this:
2018-08-26T02:13:13.948Z 3364d63a-b18c-5ed6-97c7-85c6bf57800e Task timed out after 30.02 seconds

Unfortunately, there is no "Error" word in this log, so it's not possible to set the status field to Error just by parsing with Grok.

I came up with the following workaround; you may want to include something like this in the default Lambda log integration pipeline.

Step One
Add a Grok parser to create a new field called error_message:

lambda_timeout_rule  %{date("yyyy-MM-dd'T'HH:mm:ss.SSSZ"):timestamp}\s+%{notSpace:lambda.request_id}\s+%{regex("Task timed out .*"):error_message}

Step Two
Add a Category Processor to set the status field to Error if an error_message field is present:

Review added entries
Name: ERROR
MATCHING QUERY: @error_message:*
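
If it helps, the equivalent match is straightforward to do in the forwarder itself; a sketch (not current behaviour) that stamps a status attribute on timeout messages before they are shipped:

import re

TIMEOUT_RE = re.compile(r"Task timed out after [\d.]+ seconds")

def add_timeout_status(log_event):
    # Mark Lambda timeout messages as errors so the pipeline does not
    # need a separate grok rule to find them.
    if TIMEOUT_RE.search(log_event.get("message", "")):
        log_event["status"] = "error"
    return log_event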

Clarification request on memory/timeout for 'aws/logs_monitoring/lambda_function.py'

I find three separate values for the recommended memory/timeout settings for the AWS logs collection Lambda function.

  • Recommended function timeout ranges from 10 seconds to 120 seconds
  • Recommended memory ranges from 128 to "the highest possible value".

Can you clarify which of these is preferred? Obviously, "more memory and longer timeout" is safer, but I'm curious if you can provide any guidance on memory usage as it relates to log volume. I'd prefer not to allocate the Lambda function a lot of memory that will never be used.

README in Github project

https://github.com/DataDog/datadog-serverless-functions/tree/00e49919a1e1c453db5bb157f813d4b549be5f1e/aws/logs_monitoring

Set the memory to the highest possible value.
Set also the timeout limit. We recommends 120 seconds to deal with big files.

Default values in log-sam-template.yaml

https://github.com/DataDog/datadog-serverless-functions/blob/00e49919a1e1c453db5bb157f813d4b549be5f1e/aws/logs_monitoring/log-sam-template.yaml

These values are considerably lower than what is recommended in the README.md

Resources:
  loglambdaddfunction:
    Type: 'AWS::Serverless::Function'
    Properties:
      Description: Pushes logs from S3 and CloudWatch Logs to Datadog.
      Handler: lambda_function.lambda_handler
      MemorySize: 128
      Runtime: python2.7
      Timeout: 10

Datadog documentation

https://docs.datadoghq.com/integrations/amazon_web_services/?tab=allpermissions#provide-the-code-and-configure-the-lambda

(screenshot of the recommended memory/timeout settings from the Datadog documentation)

Thanks!

Cloudfront logs parsing issues

Hi!

I've tried to set up the log forwarder function to parse Cloudfront logs. I used an S3 trigger according to the documentation (manual triggering).

After the setup, the logs were forwarded correctly, but with an s3 source. I figured out that you look for the keyword cloudfront in the s3 filename.

https://github.com/DataDog/dd-aws-lambda-functions/blob/master/Log/lambda_function.py#L295-L296

My workaround was to add the keyword cloudfront to the prefix in the Cloudfront logging configuration, and it worked fine. Could you update the documentation to make this clearer?
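
For anyone else hitting this: the detection is roughly equivalent to the sketch below (simplified; the actual forwarder checks for several source keywords in the object key):

def parse_event_source(key):
    # Infer the log source from the S3 object key
    if "cloudfront" in key.lower():
        return "cloudfront"
    return "s3"

parse_event_source("my-logs/cloudfront/E123ABC.2019-01-01-00.abcd1234.gz")  # "cloudfront"
parse_event_source("my-logs/E123ABC.2019-01-01-00.abcd1234.gz")             # "s3"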

Forwarder Lambda Function does not have AWSLambdaBasicExecutionRole policy

Describe what happened:
Log groups are empty for the Datadog forwarder lambda in 3.1.0. A log group is created, but it looks like the Policies property assigned to the Forwarder function does not include at least the AWSLambdaBasicExecutionRole needed to write to CloudWatch.

Describe what you expected:
Logs to appear in the Forwarder function log group.

Steps to reproduce the issue:
Hopefully I didn't make an error here - but just look in the log group for the Forwarder function. Nothing appears.

AttributeError: 'module' object has no attribute 'compress'

Describe what happened:

A fresh setup of serverlessrepo-Datadog-Log-Forwarder now results in an error message when using it with a CloudWatch trigger.

[ERROR]	2020-02-05T19:04:31.104Z Exception while forwarding logs in batch 
...
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 544, in forward_logs
    client.send(batch)
  File "/var/task/lambda_function.py", line 295, in send
    self._client.send(logs)
  File "/var/task/lambda_function.py", line 395, in send
    data = compress_logs(data, DD_COMPRESSION_LEVEL)
  File "/var/task/lambda_function.py", line 475, in compress_logs
    return gzip.compress(bytes(batch, 'utf-8'), compression_level)
AttributeError: 'module' object has no attribute 'compress'

Searching for a solution to this came up with no results. Upon reading #190 and sifting through the Lambda code, it turns out that setting DD_USE_COMPRESSION=false is currently required for an out-of-the-box installation to work (gzip.compress only exists on Python 3, so this error points at the function running on a python2.7 runtime).

Describe what you expected:

A fresh setup of the log forwarder Lambda should work with a simple configuration.

Steps to reproduce the issue:

Spin up a new Lambda and set up a CloudWatch log group trigger.
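
Until the function is moved to a Python 3 runtime, a backwards-compatible compression helper would be one way around this. A minimal sketch, not the forwarder's actual code:

import gzip
import io

def compress_logs(batch, compression_level):
    # gzip.compress only exists on Python 3; fall back to GzipFile on Python 2.7
    payload = batch.encode("utf-8")
    if hasattr(gzip, "compress"):
        return gzip.compress(payload, compression_level)
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb", compresslevel=compression_level) as f:
        f.write(payload)
    return buf.getvalue()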

Update version requires deploying new stack

Describe what you expected:
Some way to update the lambda code for the forwarder without destroying and creating a new one, because I need to attach the log subscriptions to a specific ARN and I'd have to redeploy a lot of lambdas to do this.

Maybe work with the code on S3 and/or Terraform instead of just destroying and creating again. I think this affects a lot of people using Datadog, so I'd like to check whether I'm missing anything or whether something like this is already planned to improve the update process.

[Flow Logs] error: invalid literal for int() with base 10: '-': ValueError

context

error

invalid literal for int() with base 10: '-': ValueError
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 337, in lambda_handler
    process_message(message, tags, timestamp, node_ip)
  File "/var/task/lambda_function.py", line 53, in process_message
    process_packets(packets, detailed_tags, timestamp)
  File "/var/task/lambda_function.py", line 231, in process_packets
    stats.histogram("packets.per_request", int(packets), tags=tags, timestamp=timestamp)
ValueError: invalid literal for int() with base 10: '-'

metrics

(screenshot of the affected packets.per_request metrics omitted)
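
The "-" values come from flow log records with no data (NODATA/SKIPDATA), so a defensive conversion along these lines would avoid the crash. An illustrative sketch, not the function's actual fix:

def safe_int(value, default=0):
    # Flow log fields such as packets/bytes are "-" for NODATA/SKIPDATA records
    try:
        return int(value)
    except (TypeError, ValueError):
        return default

safe_int("42")  # 42
safe_int("-")   # 0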

Python 3 support

Python 3 is almost 10 years old now, and Python 2.7 will be unsupported in a few months' time. All the library dependencies of this lambda support Python 3, so there should be nothing blocking this upgrade.

Function does not handle s3 events delivered via SNS.

Describe what happened:
If you need s3 events to flow to multiple lambda functions, you need to send the s3 events to an SNS topic and then subscribe each lambda to the SNS topic.

This function just handles the SNS payload as structured data, not as an s3 event.
Describe what you expected:
This function should detect the s3 event inside the SNS payload and handle the object.

Steps to reproduce the issue:
Send s3 event to SNS topic. Subscribe lambda to SNS topic.
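
One way to detect the wrapped event, as a sketch of the idea rather than the forwarder's implementation:

import json

def extract_s3_event(event):
    # If the notification came through SNS, the S3 event is a JSON string
    # inside Records[0].Sns.Message; otherwise assume a plain S3 event.
    records = event.get("Records", [])
    if records and "Sns" in records[0]:
        inner = json.loads(records[0]["Sns"]["Message"])
        if inner.get("Records") and "s3" in inner["Records"][0]:
            return inner
    return event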

aws/logs_monitoring Adding parameters to SAM template to allow easier automated deployment of the log forwarder

I've managed to set up a near-complete "infrastructure as code" deployment of the SAM template, which essentially replicates the manual process of using the one-click deployment inside the AWS dashboard but instead executes automatically as part of our terraform initialisation.

The issue I've encountered is that while I can automatically deploy the Lambda & new role to my AWS environments, I still have to manually set our DD_API_KEY and update the execution role linked to the Lambda.

Ideally, instead of this, two parameters could be provided to the SAM template:

  • DD_API_KEY which would (if set) be used as an environment variable inside the Lambda
  • EXECUTION_ROLE_ARN (if set) would override the default execution role which is attached to the Lambda

Both would completely remove the need for any manual setup if, like our team, infrastructure changes are all managed through IaC.

If this sounds like a good addition I'm happy to do all the work and submit a PR, please let me know!

[Request] Consider uploading forwarder zip to (public) S3 bucket

Please consider uploading the production release of the aws-dd-forwarder zip-file to a public S3 bucket (read-only of course).

Users can then directly source the zip in AWS instead of having to either clone and package the repo or pull down the zip from Releases.

Preferably without the "noise" (tests directory, shell scripts, etc) 😊

Error flushing metrics and events

This issue is regarding aws/logs_monitoring.
Describe what happened:

(screenshot of the "Error flushing metrics and events" log output omitted)

Describe what you expected:

a clean log with no errors.

Steps to reproduce the issue:
Used the Python 3.7 runtime on a Lambda running lambda_function.py and enhanced_lambda_metrics.py.

Fetch API Key from AWS SSM

Describe what happened:
I have stored my Datadog API key as a SecureString in AWS SSM and would like to have the Datadog Forwarder Lambda function fetch from there.

Describe what you expected:
An environment variable (similar to the secrets manager var) that tells the function to fetch the key, and where from.
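
For illustration, the lookup itself is small; the sketch below assumes a hypothetical DD_API_KEY_SSM_NAME variable holding the parameter name, and the execution role would also need ssm:GetParameter plus kms:Decrypt for the key used by the SecureString:

import os

import boto3

def get_api_key_from_ssm(parameter_name):
    # Fetch and decrypt a SecureString parameter from SSM Parameter Store
    ssm = boto3.client("ssm")
    response = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
    return response["Parameter"]["Value"]

DD_API_KEY = get_api_key_from_ssm(os.environ["DD_API_KEY_SSM_NAME"])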

Log lambda function creates unnecessary costs on CloudWatch

The template provided by Datadog to export logs from AWS creates a huge amount of logs on CloudWatch. Every event that is sent to the log API is printed to the console, and hence ends up in CloudWatch.

See lambda_function.py:215.

This extra logging is not relevant by default and generates unnecessary costs. It should be removed, or enabled only for debugging purposes.
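
For example, per-event logging could be gated behind a debug flag (the DD_LOG_LEVEL variable name here is just an assumption for the sketch):

import logging
import os

logger = logging.getLogger()
# Only emit per-event logs when debugging is explicitly enabled
debug_enabled = os.environ.get("DD_LOG_LEVEL", "").lower() == "debug"
logger.setLevel(logging.DEBUG if debug_enabled else logging.INFO)

def log_forwarded_event(event):
    logger.debug("Sending event: %s", event)  # dropped unless debug is enabled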

Forward tags from upstream sources

I see there's the DD_FETCH_LAMBDA_TAGS env var that can add the service tags to the aws.lambda.enhanced.* metrics, but I don't see the ability to do this for log tagging yet. We'd love to be able to pull the tags from our AWS services sending logs to Datadog and have those tags show up on the logs as searchable fields. For instance, we use the Service:<service_name> tag on many of our lambdas. It'd be great if that tag was applied to the logs from those lambdas, along with any other tags on the lambdas.
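
A sketch of how the forwarder could look those tags up for the log-emitting function (function and client names are illustrative); in practice the results would need caching to stay within API rate limits:

import boto3

def fetch_resource_tags(function_arn):
    # Look up tags through the Resource Groups Tagging API
    client = boto3.client("resourcegroupstaggingapi")
    response = client.get_resources(ResourceARNList=[function_arn])
    tags = []
    for mapping in response.get("ResourceTagMappingList", []):
        for tag in mapping.get("Tags", []):
            tags.append("{}:{}".format(tag["Key"].lower(), tag["Value"].lower()))
    return tags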

[RDS] Add custom RDS tags on RDS metrics

We currently send the RDS enhanced metrics with a few tags (dbinstanceidentifier being the identifier). This is a problem because people could have RDS instances with the same identifier in different regions. In that case, the resulting metrics would be an aggregation of the metrics of both instances.

A quick fix for that would be to add instanceResourceId as a tag, as this may be unique.

However, a better solution would be to have the endpoint accessible in the payload, as the endpoint is currently used as the unique identifier of an RDS instance within Datadog. If we had access to it, we could link these metrics with the RDS hosts within Datadog and add all the custom tags.

Another way would be to have the instanceResourceId available via the regular RDS API, to use it as the unique identifier instead of the endpoint.
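
The quick fix could look something like this sketch, built from the keys already present in the enhanced monitoring payload (tag names are illustrative):

def build_rds_tags(message):
    # dbinstanceidentifier can collide across regions; instanceResourceID should not
    tags = ["dbinstanceidentifier:{}".format(message.get("instanceID", "").lower())]
    resource_id = message.get("instanceResourceID")
    if resource_id:
        tags.append("instanceresourceid:{}".format(resource_id.lower()))
    return tags

build_rds_tags({"instanceID": "mydb", "instanceResourceID": "db-ABC123"})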

[RDS] MSSQL Metrics causing lambda to fail due to missing keys/metrics

Describe what happened:
If the RDS instance is MSSQL, the RDS lambda is unable to parse the event due to missing keys.
Examples of these missing keys are loadAverageMinute and processList.id.

Describe what you expected:
The lambda to parse events regardless of RDS engine.

Steps to reproduce the issue:
Process event from RDS Engine: SqlServer
{ "engine": "SqlServer", "instanceID": "instanceID", "instanceResourceID": "db-resourceID", "timestamp": "2019-01-15T14:48:12Z", "version": 1, "uptime": "118 days, 04:25:12", "numVCPUs": 4, "cpuUtilization": { "idle": 99.22, "kern": 0.29, "user": 0.49 }, "memory": { "commitTotKb": 5907972, "commitLimitKb": 17301104, "commitPeakKb": 6611268, "physTotKb": 16776816, "physAvailKb": 11120468, "sysCacheKb": 8085436, "kernTotKb": 309488, "kernPagedKb": 212980, "kernNonpagedKb": 96508, "sqlServerTotKb": 4117792, "pageSize": 4096 }, "system": { "handles": 18148, "threads": 754, "processes": 41 }, "disks": [ { "name": "rdsdbdata", "totalKb": 31324096, "usedKb": 1238528, "usedPc": 3.95, "availKb": 30085568, "availPc": 96.05, "rdCountPS": 0, "rdBytesPS": 0, "wrCountPS": 0.6, "wrBytesPS": 2286.93 } ], "network": [ { "interface": "Ethernet 4", "rdBytesPS": 35552.47, "wrBytesPS": 26602.47 } ], "processList": [ { "name": "OS processes", "cpuUsedPc": 0, "memUsedPc": 0.66, "workingSetKb": 280000, "workingSetPrivKb": 110228, "workingSetShareableKb": 169772, "virtKb": 53690518976 }, { "name": "RDS processes", "cpuUsedPc": 0.18, "memUsedPc": 1.87, "workingSetKb": 437872, "workingSetPrivKb": 314516, "workingSetShareableKb": 123356, "virtKb": 2183729268 } ]

Please update the Readme

Could you please update the Readme to reflect the current Lambda interface/features?

For instance, I can't find the #Parameters anywhere, as this is the whole code of lambda_function.py:

def lambda_handler(event, context):
    # TODO implement
    return 'Hello from Lambda'

Please use github releases

It would be great if you guys could post lambda function updates with a new git tag or release, so that we can detect it and automagically pull down the latest code into our environment. Thanks!

Python 3 not compatible with DD_KMS_API_KEY

Describe what happened:

Lambda function reports "invalid API key"

Describe what you expected:

No error.

Steps to reproduce the issue:

Use Python 3.7 with DD_KMS_API_KEY

The string interpolation does not work with the binary value that comes from the decrypted DD_KMS_API_KEY. On Python 3, KMS decryption returns bytes, so the value needs to be decoded to a string before it is used.
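
A sketch of the decode step that addresses this, assuming the key is stored as base64-encoded KMS ciphertext in the DD_KMS_API_KEY environment variable:

import base64
import os

import boto3

def decrypt_kms_api_key(encrypted_value):
    # KMS returns Plaintext as bytes on Python 3; decode it before using it in strings
    kms = boto3.client("kms")
    plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted_value))["Plaintext"]
    return plaintext.decode("utf-8")

DD_API_KEY = decrypt_kms_api_key(os.environ["DD_KMS_API_KEY"])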

DataDog Forwarder template doesn't allow adding additional log groups

The documentation for the log forwarder describes how to manually add additional (non-Lambda) CloudWatch Log groups as triggers for its Lambda function (https://docs.datadoghq.com/integrations/amazon_web_services/?tab=allpermissions#collecting-logs-from-cloudwatch-log-group). However, that feature doesn't seem to be supported by the SAM template yet. It'd be great if there were support for it, to avoid having to manually edit the configuration of the AWS Lambda function of the Datadog Forwarder.
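
Until the template supports it, the subscriptions can be scripted outside the stack. A boto3 sketch (names are placeholders):

import boto3

def subscribe_log_group(log_group_name, forwarder_arn):
    # Equivalent to manually adding the log group as a trigger of the forwarder;
    # the forwarder also needs a resource-based permission allowing
    # logs.amazonaws.com to invoke it.
    logs = boto3.client("logs")
    logs.put_subscription_filter(
        logGroupName=log_group_name,
        destinationArn=forwarder_arn,
        filterName="datadog-forwarder",  # illustrative filter name
        filterPattern="",                # forward everything
    )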
