
suro's Introduction

Suro: Netflix's Data Pipeline

Suro is a data pipeline service for collecting, aggregating, and dispatching large volumes of application events, including log data. It has the following features:

  • It is distributed and can be horizontally scaled.
  • It supports streaming data flow, large number of connections, and high throughput.
  • It allows dynamically dispatching events to different locations with flexible dispatching rules.
  • It has a simple and flexible architecture to allow users to add additional data destinations.
  • It fits well into the NetflixOSS ecosystem.
  • It is a best-effort data pipeline with support for flexible retries and store-and-forward to minimize message loss.

Learn more about Suro on the Suro Wiki and the Netflix TechBlog post where Suro was introduced.

Build

Suro is built via Gradle (www.gradle.org). To build from the command line:

./gradlew build

See the build.gradle file for other Gradle targets, such as distTar, distZip, installApp, and runServer.

Running the server

You can run the server locally by just running ./gradlew runServer.

For more advanced usage, you may wish to run ./gradlew installApp and then:

cd suro-server
java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json

To enable basic logging, you can download slf4j-simple-1.7.7.jar, copy it into suro-server, and then run:

cd suro-server
java -cp "build/install/suro-server/lib/*:slf4j-simple-1.7.7.jar" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json

Support

We will use the Google Group, Suro Users, to discuss issues: https://groups.google.com/forum/#!forum/suro-users

suro's Issues

Injected message producer

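import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.inject.Inject;

public class SomeService {
    private final MessageProducer producer;

    @Inject
    public SomeService(MessageProducer producer) {
        this.producer = producer;
    }

    public void doSomething() throws InterruptedException, ExecutionException {
        // Proposed API: OutgoingMessage.builder() is a hypothetical builder entry point.
        Future<Boolean> future = producer.produce(OutgoingMessage.builder().withBody("blah blah").build());
        future.get(); // block until the produce call completes
    }
}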

Change StatusServer entry points

"/healthcheck" is too common and conflicts with a potential annotation scan of the URI template "/healthcheck(/.*)".

Let's change "/healthcheck" to "/surohealthcheck" and "/sinkstat" to "/surosinkstat"

Guice based SuroPluginModule

Treat all sink and source types as independent modules that are enabled by adding them to withModules during the injector creation step. The various implementations will be made available using a Guice MapBinder.

TestKafkaSink fails

We didn't find the root cause of this unit test failure, but switching from ZkClient's test ZooKeeper server to curator-test looks like a better option.

Adding alias to the routing

We have a requirement to route messages under a different topic name. An 'alias' property will be added to the routing map configuration. If that property is valid, messages will be routed with that 'alias' instead of the original routingKey.
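A minimal sketch of how alias resolution could work, assuming a hypothetical `RouteConfig` that carries an optional alias per routing key; the real routing map schema and class names may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-route configuration holding an optional alias.
final class RouteConfig {
    private final String alias; // null when no alias is configured

    RouteConfig(String alias) {
        this.alias = alias;
    }

    String getAlias() {
        return alias;
    }
}

// Hypothetical resolver: decides the key under which a message is forwarded.
final class AliasResolver {
    private final Map<String, RouteConfig> routes = new HashMap<>();

    void addRoute(String routingKey, RouteConfig config) {
        routes.put(routingKey, config);
    }

    // Returns the alias when one is configured and non-empty,
    // otherwise falls back to the original routing key.
    String resolve(String routingKey) {
        RouteConfig config = routes.get(routingKey);
        if (config != null && config.getAlias() != null && !config.getAlias().isEmpty()) {
            return config.getAlias();
        }
        return routingKey;
    }
}
```

Unknown routing keys and routes without an alias pass through unchanged, so existing routing map entries would keep their current behavior.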

shutdown timeout problem ???

'''
2014. 5. 23 4:11:40 PM org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
SEVERE: The web application [] appears to have started a thread named [NFLoadBalancer-PingTimer-suroClient] but has failed to stop it. This is very likely to create a memory leak.
...
...
2014. 5. 23 4:11:40 PM org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
SEVERE: The web application [] appears to have started a thread named [AsyncSuroClientPoller-0] but has failed to stop it. This is very likely to create a memory leak.
2014. 5. 23 4:11:40 PM org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
...
...
INFO: Stopping Coyote HTTP/1.1 on http-10004
Exception in thread "NFLoadBalancer-PingTimer-suroClient" java.lang.NullPointerException
at com.netflix.loadbalancer.BaseLoadBalancer$PingTask.run(BaseLoadBalancer.java:583)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)
'''

EventBus filter

The EventBus filter is now part of Netflix Common. Remove the EventBus filter source code and add a dependency on Netflix Common instead.

Thrift server error

After building suro-client and starting the server by following the instructions on the wiki, we get the following error:

ERROR org.apache.thrift.server.TNonblockingServer - Read an invalid frame size of 0. Are you using TFramedTransport on the client side?
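The error text itself points at a transport mismatch: a framed transport prefixes every message with a 4-byte big-endian length, and the non-blocking server treats a non-positive length as invalid. The sketch below only illustrates that framing with `java.nio.ByteBuffer`; `FrameCodec` is a hypothetical helper, not part of Thrift's API.

```java
import java.nio.ByteBuffer;

// Illustrative only: mimics length-prefix framing (4-byte big-endian
// length followed by the payload). This is not Thrift's own class; it
// shows what a framed server expects to read first on each message.
final class FrameCodec {
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Reads the declared frame size from the first four bytes.
    static int readFrameSize(byte[] framed) {
        return ByteBuffer.wrap(framed, 0, 4).getInt();
    }
}
```

If a client writes unframed bytes, the server still interprets the first four bytes as a length, which is how a mismatch can surface as an "invalid frame size" error.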

Incorrect use of System.exit()

Both SuroService and ThriftServer call System.exit(). They shouldn't. Only the Suro driver should decide whether to call System.exit().
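One way to apply this rule, sketched with hypothetical class names (`ExampleService`, `ExampleDriver`): components report failure by throwing, and only the driver's `main()` turns that into a process exit status.

```java
// Hypothetical component: signals failure by throwing, never by exiting.
final class ExampleService {
    void start(boolean failOnStart) {
        if (failOnStart) {
            throw new IllegalStateException("service failed to start");
        }
    }
}

// Hypothetical driver: the only place that decides on the exit status.
final class ExampleDriver {
    // Returns an exit code instead of exiting, so the logic stays testable.
    static int run(ExampleService service, boolean failOnStart) {
        try {
            service.start(failOnStart);
            return 0;
        } catch (IllegalStateException e) {
            System.err.println("Startup failed: " + e.getMessage());
            return 1;
        }
    }

    public static void main(String[] args) {
        int code = run(new ExampleService(), args.length > 0);
        if (code != 0) {
            System.exit(code); // only the driver terminates the JVM
        }
    }
}
```

Keeping `System.exit()` out of the services also lets them run embedded (for example in a servlet container or a unit test) without killing the host JVM.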

Disruptor for MessageQueue

Implement a Disruptor-based MessageQueue. Part of this work will also be to determine whether the Disruptor actually makes sense here.

Implement dynamic RemotePrefixFormatter

Define reserved keywords first

  • date
  • static
  • property

Users can combine them to generate the remote prefix.

  • 'date' needs a date format string.
  • 'static' is a static string value.
  • 'property' is replaced by the property value from ConfigurationManager.getConfigInstance().getProperty().
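A minimal sketch of a formatter built from the three keywords. Assumptions: the class name `PrefixFormatterSketch`, the '/' separator, and a plain `Map` standing in for the `ConfigurationManager` property lookup are all hypothetical; dates use `java.time` patterns.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical prefix formatter combining 'date', 'static', and 'property' parts.
// The Map stands in for ConfigurationManager.getConfigInstance() (assumption).
final class PrefixFormatterSketch {
    private final List<Function<ZonedDateTime, String>> parts = new ArrayList<>();
    private final Map<String, String> properties;

    PrefixFormatterSketch(Map<String, String> properties) {
        this.properties = properties;
    }

    // 'date': formats the given time with the supplied pattern.
    PrefixFormatterSketch date(String pattern) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern(pattern);
        parts.add(now -> now.format(fmt));
        return this;
    }

    // 'static': a fixed string value.
    PrefixFormatterSketch staticValue(String value) {
        parts.add(now -> value);
        return this;
    }

    // 'property': looked up at format time, so changed values are picked up.
    PrefixFormatterSketch property(String name) {
        parts.add(now -> properties.getOrDefault(name, ""));
        return this;
    }

    // Joins all parts, each followed by '/', to form the remote prefix.
    String format(ZonedDateTime now) {
        StringBuilder sb = new StringBuilder();
        for (Function<ZonedDateTime, String> part : parts) {
            sb.append(part.apply(now)).append('/');
        }
        return sb.toString();
    }
}
```

For example, `date("yyyyMMdd")`, `staticValue("logs")`, and `property("region")` with region set to `us-east-1` would format 2014-05-23 as `20140523/logs/us-east-1/`.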

Artifact 'com.google.code.findbugs:jsr305:1.3.9@jar' not found.

[root@localhost suro]# ./gradlew clean build

FAILURE: Build failed with an exception.

  • What went wrong:
    Could not resolve all dependencies for configuration ':classpath'.

    Artifact 'com.google.code.findbugs:jsr305:1.3.9@jar' not found.

  • Try:
    Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

Total time: 5.438 secs
[root@localhost suro]#

MessageQueue

Create abstraction for MessageQueue so we can try various implementations instead of just LinkedBlockingQueue.
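A minimal sketch of such an abstraction, with a default implementation that delegates to `LinkedBlockingQueue`. The interface name and method set are hypothetical; they mirror the `BlockingQueue` operations a sink is likely to need.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical queue abstraction so alternative implementations can be swapped in.
interface MessageQueueSketch<E> {
    boolean offer(E e);
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    int drainTo(Collection<? super E> target, int maxElements);
    int size();
}

// Default implementation preserving the current LinkedBlockingQueue behavior.
final class LinkedBlockingMessageQueue<E> implements MessageQueueSketch<E> {
    private final BlockingQueue<E> queue;

    LinkedBlockingMessageQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    @Override
    public boolean offer(E e) {
        return queue.offer(e); // false when the queue is full
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public int drainTo(Collection<? super E> target, int maxElements) {
        return queue.drainTo(target, maxElements);
    }

    @Override
    public int size() {
        return queue.size();
    }
}
```

A Disruptor-based or fork/join-based queue would then be another implementation of the same interface, selected by configuration.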

./gradlew clean build fails on Amazon Linux

Steps:

Bring up a new Amazon Linux instance

sudo yum -y update
sudo yum -y install git

git clone https://github.com/Netflix/suro.git
cd suro
./gradlew build

Errors:

:suro-server

com.netflix.suro.server.TestSuroControl > testOnlyExitCommandWorks FAILED
    java.net.ConnectException at TestSuroControl.java:67

This error is intermittent; retrying the build sometimes succeeds. It may be caused by the findAvailablePort method returning a port that is already in use.

The source line is:

Socket client = new Socket("127.0.0.1", port);
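For context, a common way to implement a "find an available port" helper is to bind to port 0 and let the OS choose. The sketch below (hypothetical `PortFinder`, not necessarily how Suro's findAvailablePort works) shows why that is racy: the port is released when the probe socket closes, so something else can take it before the server under test binds it.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper illustrating the usual ephemeral-port pattern.
final class PortFinder {
    static int findAvailablePort() throws IOException {
        // Port 0 asks the OS for any free ephemeral port.
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } // the port is released here, which opens a race window
    }
}
```

A more robust test setup would have the server itself bind port 0 and expose the port it actually bound, and have the client wait until the server is listening before connecting.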

:suro-client

com.netflix.suro.connection.TestConnectionOutPool > testOutPool FAILED
    java.lang.AssertionError at TestConnectionOutPool.java:67

Source:

final ConnectionPool pool = injector.getInstance(ConnectionPool.class);
assertEquals(pool.getPoolSize(), 1);

Remove hadoop dependency from suro-client

The Suro message uses Hadoop for its Writable implementation, but this looks like an extra dependency because the Writable implementation is only used for writing messages to Hadoop sequence files. Let's remove it, although this can break backward compatibility because the value class of the Hadoop sequence file changes.
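Hadoop's `Writable` methods take `java.io.DataOutput` and `java.io.DataInput`, so the same serialization can be written with the JDK alone. A minimal sketch, assuming a hypothetical `PlainMessage` with a routing key and a byte[] payload; the byte layout here is illustrative and is not guaranteed to match the existing Writable format, which is exactly the compatibility concern above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical message with Writable-style methods but no Hadoop dependency.
final class PlainMessage {
    private String routingKey;
    private byte[] payload;

    PlainMessage() { }

    PlainMessage(String routingKey, byte[] payload) {
        this.routingKey = routingKey;
        this.payload = payload;
    }

    // Layout: UTF routing key, int payload length, payload bytes.
    void write(DataOutput out) throws IOException {
        out.writeUTF(routingKey);
        out.writeInt(payload.length);
        out.write(payload);
    }

    void readFields(DataInput in) throws IOException {
        routingKey = in.readUTF();
        payload = new byte[in.readInt()];
        in.readFully(payload);
    }

    String getRoutingKey() { return routingKey; }

    byte[] getPayload() { return payload; }

    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        write(dos);
        dos.flush();
        return bos.toByteArray();
    }

    static PlainMessage fromBytes(byte[] bytes) throws IOException {
        PlainMessage m = new PlainMessage();
        m.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        return m;
    }
}
```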

Annotation-based Consumer API

import javax.inject.Inject;
import javax.inject.Named;

class SomeService {
    private final MessageConsumer consumer;

    // Optional injection of MessageConsumer to stop/start at will
    @Inject
    public SomeService(@Named("someconsumerid") MessageConsumer consumer) {
        this.consumer = consumer;
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        consumer.stop();
    }

    // This is how events will be consumed. Must ack() to delete from SQS.
    @Consume("someconsumerid")
    public void consume(IncomingMessage message) {
        // ... do something
        message.ack();
    }
}

Got an IOException

[2014-05-30 05:05:02.929] [Thread-5] WARN  org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:542) - Got an IOException in internalRead!
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_05]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_05]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_05]
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_05]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375) ~[na:1.8.0_05]
    at org.apache.thrift.transport.TNonblockingSocket.read(TNonblockingSocket.java:141) ~[libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:537) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.read(AbstractNonblockingServer.java:338) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:203) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.select(TNonblockingServer.java:202) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.run(TNonblockingServer.java:158) [libthrift-0.9.1.jar:0.9.1]
[2014-05-30 05:05:02.929] [Thread-5] WARN  org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:542) - Got an IOException in internalRead!
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_05]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_05]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_05]
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_05]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375) ~[na:1.8.0_05]
    at org.apache.thrift.transport.TNonblockingSocket.read(TNonblockingSocket.java:141) ~[libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:537) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.read(AbstractNonblockingServer.java:338) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:203) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.select(TNonblockingServer.java:202) [libthrift-0.9.1.jar:0.9.1]
    at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.run(TNonblockingServer.java:158) [libthrift-0.9.1.jar:0.9.1]
[2014-05-30 05:05:02.930] [Thread-5] WARN  org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:542) - Got an IOException in internalRead!

Implement row id in suro client

Adding a row ID would help with de-duplication and with finding lost messages.

The Suro client can set the row ID explicitly; otherwise, an MD5 hash of the byte[] payload will be added automatically.
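A minimal sketch of that rule using the JDK's `MessageDigest`; `RowIds` is a hypothetical helper name, and rendering the hash as lowercase hex is an assumption about the ID format.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: explicit row ID if given, else MD5 of the payload in hex.
final class RowIds {
    static String rowId(String explicitId, byte[] payload) {
        if (explicitId != null) {
            return explicitId;
        }
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(payload);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString(); // 32 lowercase hex characters
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support MD5.
            throw new IllegalStateException(e);
        }
    }
}
```

Because the fallback ID depends only on the payload, two genuinely distinct messages with identical bytes get the same ID; that is what makes de-duplication work, but it also means such messages cannot be told apart without an explicit ID.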

FileQueue4Sink drains same messages multiple times

FileQueue4Sink is used in QueuedSink. Its run() method calls drainTo() and checks whether the message list is full or expired. FileQueue4Sink doesn't autocommit by default, and as a result the same messages are drained multiple times.
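The failure mode can be reproduced with a toy model. `ToyCommitQueue` below is hypothetical and does not reflect FileQueue4Sink's actual internals; it only shows that when drainTo() reads from the last committed position and nothing commits, every drain returns the same batch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Toy model (not FileQueue4Sink): drains start at the committed offset,
// so without commit() the same elements are handed out again.
final class ToyCommitQueue<E> {
    private final List<E> log = new ArrayList<>();
    private final boolean autoCommit;
    private int committed = 0; // index of the first uncommitted element
    private int pending = 0;   // size of the last drained batch

    ToyCommitQueue(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    void offer(E e) {
        log.add(e);
    }

    int drainTo(Collection<? super E> target, int max) {
        int end = Math.min(log.size(), committed + max);
        int count = end - committed;
        target.addAll(log.subList(committed, end));
        pending = count;
        if (autoCommit) {
            commit();
        }
        return count;
    }

    // Marks the last drained batch as consumed.
    void commit() {
        committed += pending;
        pending = 0;
    }
}
```

In this model the fix is either to enable autocommit or to commit explicitly after each drained batch has been handled.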

TagKey contains metrics name

The following fields should not be part of TagKey.java. Maybe we should put all the counters into a single utility class along with all the convenience methods.

public static final String SENT_COUNT = "sentMessageCount";
public static final String RECV_COUNT = "receivedMessageCount";
public static final String LOST_COUNT = "lostMessageCount";
public static final String RESTORED_COUNT = "restoredMessageCount";
public static final String RETRIED_COUNT = "retriedCount";
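A minimal sketch of such a utility class. The class name `MessageCounters` and the in-memory `AtomicLong` counters are hypothetical; a real version would presumably register these names with the project's metrics library instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical home for the counter names currently in TagKey,
// plus simple thread-safe convenience methods.
final class MessageCounters {
    public static final String SENT_COUNT = "sentMessageCount";
    public static final String RECV_COUNT = "receivedMessageCount";
    public static final String LOST_COUNT = "lostMessageCount";
    public static final String RESTORED_COUNT = "restoredMessageCount";
    public static final String RETRIED_COUNT = "retriedCount";

    private static final Map<String, AtomicLong> COUNTERS = new ConcurrentHashMap<>();

    private MessageCounters() { }

    // Increments the named counter, creating it on first use.
    static long increment(String name) {
        return COUNTERS.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
    }

    // Returns the current value, or 0 if the counter was never incremented.
    static long get(String name) {
        AtomicLong counter = COUNTERS.get(name);
        return counter == null ? 0L : counter.get();
    }
}
```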

Implement various inputs

Currently, Suro's only input is the Thrift server. We need to support various inputs, including at least a Kafka consumer.

There will be one more dynamically configurable property, SuroServer.inputConfig, in JSON format.

If one of the inputConfig elements is 'thrift', all of the SuroServer configuration should match the previous configuration, i.e. the properties starting with SuroServer.
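A minimal sketch of dispatching on the input type once inputConfig has been parsed. Assumptions: each element is represented as a `Map<String, String>` with a "type" key, and the class names and the "port"/"topic" keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical input abstraction.
interface SuroInputSketch {
    String getId();
}

final class ThriftInputSketch implements SuroInputSketch {
    private final int port;

    ThriftInputSketch(Map<String, String> config) {
        this.port = Integer.parseInt(config.get("port"));
    }

    public String getId() { return "thrift:" + port; }
}

final class KafkaInputSketch implements SuroInputSketch {
    private final String topic;

    KafkaInputSketch(Map<String, String> config) {
        this.topic = config.get("topic");
    }

    public String getId() { return "kafka:" + topic; }
}

// Builds inputs from parsed inputConfig entries, keyed by their "type" field.
final class InputFactorySketch {
    private final Map<String, Function<Map<String, String>, SuroInputSketch>> factories = new HashMap<>();

    InputFactorySketch() {
        factories.put("thrift", ThriftInputSketch::new);
        factories.put("kafka", KafkaInputSketch::new);
    }

    List<SuroInputSketch> create(List<Map<String, String>> inputConfig) {
        List<SuroInputSketch> inputs = new ArrayList<>();
        for (Map<String, String> entry : inputConfig) {
            Function<Map<String, String>, SuroInputSketch> factory = factories.get(entry.get("type"));
            if (factory == null) {
                throw new IllegalArgumentException("Unknown input type: " + entry.get("type"));
            }
            inputs.add(factory.apply(entry));
        }
        return inputs;
    }
}
```

The type-to-factory map plays the same role a Guice MapBinder would in the plugin-module proposal: new input types register themselves without changing the dispatch code.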

ForkJoin MessageQueue

A fork/join message queue as an alternative to the simple LinkedBlockingQueue. This should improve performance.
