Code Monkey home page Code Monkey logo

amazon-elasticsearch-lambda-samples's Introduction

Streaming Data to Amazon Elasticsearch Service

Using AWS Lambda: Sample Node.js Code

Introduction

It is often useful to stream data, as it gets generated, for indexing in an Amazon Elasticsearch Service domain. This helps fresh data to be available for search or analytics. To do this requires:

  1. Knowing when new data is available
  2. Code to pick up and parse the data into JSON documents, and add them to an Amazon Elasticsearch (henceforth, ES for short) domain.
  3. Scalable and fully managed infrastructure to host this code

Lambda is an AWS service that takes care of these requirements. Put simply, it is an "event handling" service in the cloud. Lambda lets us implement the event handler (in Node.js or Java), which it hosts and invokes in response to an event.

The handler can be triggered by a "push" or a "pull" approach. Certain event sources (such as S3) push an event notification to Lambda. Others (such as Kinesis) require Lambda to poll for events and pull them when available.

For more details on AWS Lambda, please see the documentation.

This package contains sample Lambda code (in Node.js) to stream data to ES from two common AWS data sources: S3 and Kinesis. The S3 sample takes apache log files, parses them into JSON documents and adds them to ES. The Kinesis sample reads JSON data from the stream and adds them to ES.

Note that the sample code has been kept simple for reasons for clarity. It does not handle ES document batching, or eventual consistency issues for S3 updates, etc.

Setup Overview

While some detailed instructions are covered later in this file and elsewhere (in the Lambda documentation), this section aims to show the larger picture that the individual steps work to accomplish. We assume that the data source (an S3 bucket or a Kinesis stream, in this case) and an ES domain are already set up.

  1. Deployment Package: The "Deployment Package" is the event handler code files and its dependencies packaged as a zip file. The first step in creating a new Lambda function is to prepare and upload this zip file.

  2. Lambda Configuration:

    1. Handler: The name of the main code file in the deployment package, with the file extension replaced with a .handler suffix.
    2. Memory: The memory limit, based on which the EC2 instance type to use is determined. For now, the default should do.
    3. Timeout: The default timeout value (3 seconds) is quite low for our use-case. 10 seconds might work better, but please adjust based on your testing.
  3. Authorization: Since there is a need here for various AWS services making calls to each other, appropriate authorization is required. This takes the form of configuring an IAM role, to which various authorization policies are attached. This role will be assumed by the Lambda function when running.

Note:

  • The AWS Console is simpler to use for configuration than other methods.
  • Lambda is currently available only in a few regions (us-east-1, us-west-2, eu-west-1, ap-northeast-1).
  • Once the setup is complete and tested, enable the data source in the Lambda console, so that data may start streaming in.
  • The code is kept simple for purposes of illustration. It doesn't batch documents when loading the ES domain, or (for S3 updates) handle eventual consistency cases.

Deployment Package Creation

  1. On your development machine, download and install Node.js.

  2. Anywhere, create a directory structure similar to the following:

    eslambda (place sample code here)
    |
    +-- node_modules (dependencies will go here)
    
  3. Modify the sample code with the correct ES endpoint, region, index and document type.

  4. Install each dependency imported by the sample code (with the require() call), as follows:

    npm install <dependency>
    

    Verify that these are installed within the node_modules subdirectory.

  5. Create a zip file to package the code and the node_modules subdirectory

    zip -r eslambda.zip *
    

The zip file thus created is the Lambda Deployment Package.

S3-Lambda-ES

Set up the Lambda function and the S3 bucket as described in the Lambda-S3 Walkthrough. Please keep in mind the following notes and configuration overrides:

  • The walkthrough uses the AWS CLI for configuration, but it's probably more convenient to use the AWS Console (web UI)

  • The S3 bucket must be created in the same region as Lambda is, so that it can push events to Lambda.

  • When registering the S3 bucket as the data-source in Lambda, add a filter for files having .log suffix, so that Lambda picks up only apache log files.

  • The following authorizations are required:

    1. Lambda permits S3 to push event notification to it
    2. S3 permits Lambda to fetch the created objects from a given bucket
    3. ES permits Lambda to add documents to the given domain

    The Lambda console provides a simple way to create an IAM role with policies for (1). For (2), when creating the IAM role, choose the "S3 execution role" option; this will load the role with permissions to read from the S3 bucket. For (3), add the following access policy to permit ES operations to the role.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "es:*"
                ],
                "Effect": "Allow",
                "Resource": "*"
            }
        ]
    }
    

Kinesis-Lambda-ES

Set up the Lambda function and the Kinesis stream as described in the Lambda-Kinesis Walkthrough. Please keep in mind the following notes and configuration overrides:

  • The walkthrough uses the AWS CLI, but it's probably more convenient to use the AWS Console (web UI) for Lambda configuration.

  • To the IAM role assigned to the Lambda function, add the following access policy to permit ES operations.

      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Action": [
                      "es:*"
                  ],
                  "Effect": "Allow",
                  "Resource": "*"
              }
          ]
      }
    
  • For testing: If you have a Kinesis client, use it to stream a record to Lambda. If not, the AWS CLI could be used to push a JSON document to Lambda.

    aws kinesis put-record --stream-name <lambda name> --data "<JSON document>" --region <region> --partition-key shardId-000000000000
    

Copyright

Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0

amazon-elasticsearch-lambda-samples's People

Contributors

hyandell avatar markatwood avatar picwelltimjones avatar srisub-amzn avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amazon-elasticsearch-lambda-samples's Issues

allow passing values from variables defined in lambda function

Hi,

I am trying to make a deployment of lambda as dynamic and smooth as possible but I've noticed that there is no way to pass custom variables into the zip file.
For example I would like to be able to assign cluster name via variable being passed on the console, (or using Terraform or Cloudformation or cli).

I've tested this but seems that the value is not taken.

To push .log.gz (Application ELB) Logs to ElasticSearch

Could you please give code to Push Application ELB logs ( Extention will be .log.gz) to ElasticSearch using lambda, and I try to find out in any form but no luck.
Could you please help me out to what change is necessary for this code to work with .log.gz file.
Thanks in Advance.
Regards,
JB

Error in ElasticSearch is not mapped to errorCallback of NodeHttpClient.handleRequest

I'm using the postToES method almost unchanged.
When i'm posting to ElasticSearch and the index mapping in ES is wrong for any reason i do not get an error in errorCallback of handleRequest. Instead i get an error response in the "normal" callback. Thats not what i expected and i do not want to parse the responseBody for checking errors. Is this a bug or am I missing something?

Here is the essential code part and the corresponding log output:

  var send = new AWS.NodeHttpClient();
  send.handleRequest(req, null, function(httpResp) 
  {
      var respBody = '';
      httpResp.on('data', function (chunk) 
      {
          respBody += chunk;
      });
      httpResp.on('end', function (chunk)
      {
        console.log(respBody);
        callback(null, respBody);
      });
  }, function(err) 
  {
    console.log('Error: ' + err);
    callback(err);
  });

console output at httpResp.on:
{"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse [xyz]"}],"type":"mapper_parsing_exception","reason":"failed to parse [xyz]","caused_by":{"type":"number_format_exception","reason":"For input string: "any_string""}},"status":400}

Suggested IAM Policy for Lambda functions is too broad.

The docs suggest a policy for ElasticSearch for the Lambda function that is too broad: i.e. it allows all actions on ElasticSearch, including deleting the domain:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

I suggest scoping this down to just the POST action on a specific index in the domain:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "es:ESHttpPost"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:es:region:aws-account-id:domain/domain-name/test-index"
        }
    ]
}

Getting "Error: write after end" on every other log file that is ingested from s3

Thank you for writing this. It's exactly what I needed. However, I'm running into an error with every other log file that is ingested from s3. This is the error as written to the CloudWatch logs:

2015-11-24T19:04:06.486Z fe5810ee-92dd-11e5-8a9f-8909a00ad51d Error: write after end
at writeAfterEnd (_stream_writable.js:133:12)
at LineStream.Writable.write (_stream_writable.js:181:5)
at write (_stream_readable.js:602:24)
at flow (_stream_readable.js:611:7)
at PassThrough.pipeOnReadable (stream_readable.js:643:5)
at PassThrough.emit (events.js:92:17)
at emitReadable
(_stream_readable.js:427:10)
at emitReadable (_stream_readable.js:423:5)
at readableAddChunk (_stream_readable.js:166:9)
at PassThrough.Readable.push (_stream_readable.js:128:10)

Does the NodeHttpClient need to close it's session or something?

Access Denied Error

I followed a couple of tutorials and setup a Kinesis Agent > Kinesis > Lambda > ES (EKK instead of ELK stack) and I ran into the following response using the code.

Response:
Unable to determine service/operation name to be authorized

After hours of searching, I found this thread ( http://www.philliphaydon.com/2016/01/03/using-aws-api-gateway-and-sqs-to-recieve-sendgrid-event-hooks/ ) that said to send the payload as a form. Sure enough, the error went away and I could talk to ES.

CloudTrial to ElasticSearch

Hi Team,

We having nearly 20 AWS accounts and we centralised CloudTrail logs to a single management account. We have a custom build ElasticSearch (5.6) in the EC2 in the same account.

Now we are trying to push all CloudTrail data from S3 bucket to ES (with put object as event trigger for lambda) using a lambda function. When using this we are facing some challenges while pushing the data to ES.

While going through lambda logs it is identified that some event response data (in json) is not maintaining proper structures. We are using python codes in lambda functions

Could you please help us to find a way to put all the CloudTrail events to ElasticSearch with out any failure.

kinesis -> lambda -> ES only posting some records

I setup a simple config to test this out:

logstash-forwarder -> logstash -> kinesis -> lambda -> ES

For batches of size 1, this works fine, log lines are pushed to ES in a few seconds.

For batch size > 1, I am seeing inconsistent behavior, with one of some of my records being pushed. It's often about 5 records that make it through but it's not consistent.

In my lambda logs I sometimes see Error: Error: socket hang up a number of times, corresponding to the number of missing records, but this too is inconsistent.

I've confirmed that the expected # of the records appear in the kinesis stream and are transmitted to the lambda job.

What am I doing wrong? Is my kinesis stream too small? My lambda timeout too short?

Java Example

Do you have any hints on how to setup the same example with the java sdk? I'm trying to created the signed requests so I can write to Elastic. If you can even point to the classes in the java sdk, that would be a help. I would actually help write back an example in a PR.

Elasticsearch 6.x compatibility issue

It does not work with the latest AWS Elasticsearch service. It silently fails without returning any error messages. Has anyone else tried with 6.x?

js folder exposed to anonymous users

I am new to web development and used this tutorial to build my first static website with AWS. I successfully completed all modules and have a working webpage with user auth using AWS Cognito.

I added another webpage that uses AWS Elasticsearch to search IMDB movies. I put the search.js, which includes my api gateway endpoint in the js folder.

Now a user can use my s3endpointwebsite.com/js/search.js and see my api gateway endpoint and my cognito ids.

How do I prevent someone from missing my api gateway endpoint?

Sign error

i am getting the below error any idea?

{
"errorMessage": "Not a string or buffer",
"errorType": "TypeError",
"stackTrace": [
"Hash.update (crypto.js:240:17)",
"Object.util.crypto.hash (/var/task/node_modules/aws-sdk/lib/util.js:509:24)",
"Object.sha256 (/var/task/node_modules/aws-sdk/lib/util.js:461:26)",
"V4.hash as hexEncodedHash",
"V4.hexEncodedBodyHash (/var/task/node_modules/aws-sdk/lib/signers/v4.js:195:19)",
"V4.canonicalString (/var/task/node_modules/aws-sdk/lib/signers/v4.js:140:21)",
"V4.stringToSign (/var/task/node_modules/aws-sdk/lib/signers/v4.js:127:41)",
"V4.signature (/var/task/node_modules/aws-sdk/lib/signers/v4.js:109:56)",
"V4.authorization (/var/task/node_modules/aws-sdk/lib/signers/v4.js:86:36)",
"V4.addAuthorization (/var/task/node_modules/aws-sdk/lib/signers/v4.js:36:12)"
]
}

context.succeed would be called multiple times

I'm a little confused how the kinesis_lambda_es.js example is supposed to work. Documentation says that context.succeed, when called, will terminate the lambda.

However, the example would call context.succeed for every HTTP response callback of AWS ElasticSearch. Which is correct? These two sources seem to contradict each other. If the docs.aws.amazon.com site is correct, then wouldn't the context.succeed callback need to occur after ALL ES calls had their callbacks called?

Unable to import module es

My name of the handler on console is es.handler.
With S3-Lambda-ES I am getting the following error

START RequestId: d6b5b2d1-672d-11e6-a9ef-1dccc79021cb Version: $LATEST
Unable to import module 'ES': Error at Function.Module._resolveFilename (module.js:325:15) at Function.Module._load (module.js:276:25) at Module.require (module.js:353:17) at require (internal/module.js:12:17) at Object. (/var/task/ES.js:19:18) at Module._compile (module.js:409:26) at Object.Module._extensions..js (module.js:416:10) at Module.load (module.js:343:32) at Function.Module._load (module.js:300:12) at Module.require (module.js:353:17)

I am clueless, any inputs will be appreciated.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.