
tink_io's People

Contributors

back2dos, benmerckx, gama11, grosmar, jasononeil, kevinresol


tink_io's Issues

Notify when sink ended

Trying to rewrite this code to use OutputSink + BytesOutput.

The idea is to return a sink from the function for the user to pipeTo; when the sink is ended, I collect the bytes and send them to an API which does not support streaming.

The problem is that after handing the sink to the outside, there seems to be no way to know that the sink has ended.
I think I can extend BytesOutput and add a Future that gets triggered when its close() is called. Or would it be better to add something to Sink itself?
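
For reference, a minimal sketch of the extension described above; the class name and whenClosed property are hypothetical, not part of tink_io:

import haxe.io.Bytes;
import haxe.io.BytesOutput;
import tink.CoreApi;

// Hypothetical subclass: a BytesOutput whose close() resolves a Future with
// everything that was written to it.
class ClosableBytesOutput extends BytesOutput {
    var done:FutureTrigger<Bytes>;

    public var whenClosed(get, never):Future<Bytes>;
    function get_whenClosed() return done.asFuture();

    public function new() {
        super();
        done = Future.trigger();
    }

    override public function close() {
        super.close();
        done.trigger(getBytes()); // hand over everything written so far
    }
}

The sink handed to the caller can then wrap this output (e.g. via Sink.ofOutput), while whenClosed is what the collecting side handles to send the accumulated bytes to the non-streaming API.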

Uncoupling from Buffer

Currently, much of the library depends on Buffer. In particular, Source.read and Sink.write could just accept (bytes, offset, max) instead of the buffer, which might make it easier to integrate with other libraries. Now that things are starting to work correctly, I don't want to break everything again, but when Haxe 4 comes out, it might be a good chance to reevaluate this possibility.
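
For illustration, the Buffer-free signatures could look roughly like this (a sketch only: the interface names are made up, Surprise and Error come from tink_core, and the Int result stands for the number of bytes actually read or written):

import haxe.io.Bytes;
import tink.CoreApi;

// Hypothetical Buffer-free variants of the two methods mentioned above.
interface BufferlessSource {
    function read(into:Bytes, offset:Int, max:Int):Surprise<Int, Error>;
}

interface BufferlessSink {
    function write(from:Bytes, offset:Int, max:Int):Surprise<Int, Error>;
}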

New release?

I've been trying to write a parser but wasn't getting anywhere: the parsing steps worked, but I never got the result. When I tried the latest git version, everything I'd been trying suddenly worked. So, might it be time to update the haxelib release?

Streaming stdout/stderr

While reworking the tink_http tests I'm trying to use tink_io to stream the stdout/stderr of the server process while the tests are running, to ease debugging. I need to do this in an async way, so I figured I only need to add tink_runloop for this to work. Basically I'm seeing examples like the one below fail. It works when compiled with only -lib tink_io, but fails when I add -lib tink_runloop or -lib tink_runloop -D concurrent. The trace of the exit code always works, but nothing gets piped to the terminal. Is there something I'm missing?

import sys.io.Process;
import tink.io.Sink;
import tink.io.Source;

class Main {
    static function main() {
        var process = new Process('ls', []);
        Source.ofInput('process stdout', process.stdout).pipeTo(Sink.stdout);
        Source.ofInput('process stderr', process.stderr).pipeTo(Sink.stdout);
        trace('Exit: '+process.exitCode());
    }
}

About thread safety

This is more a question than an issue.

I would like to clarify what the behaviour is if multiple threads try to, say, pipeTo the same sink at the same time.

InputSource: Can't read from stdin

using tink.io.Source;
using tink.CoreApi;

class Main {
    static function main() {
        var source = Source.ofInput('stdin', Sys.stdin());
        source.limit(1).all().handle(function(o) trace(o.sure()));
    }
}

This code blocks forever here.

Split after split skips one item

f703868 is a failing test, but I have no idea how to fix it.

I guess it is something related to streams, because the iteration doesn't seem to halt after the handler emits a Finish.

Slice of WrappedBuffer

Copy-paste of my Gitter monologue so I don't forget:

trace(part.length);
trace(remaining.length);
var sliced = part.slice(0, part.length - remaining.length);
trace(sliced.length);

If part and remaining are Chunks, the following shouldn't happen, right?

src/smtpmailer/MultipartEncoder.hx:52: 3292
src/smtpmailer/MultipartEncoder.hx:53: 1
src/smtpmailer/MultipartEncoder.hx:55: 3

remaining is a tink.io.nodejs.WrappedBuffer, part is a CompoundChunk over two of those. I think this happens because CompoundChunk's slice method expects its child chunks' slice implementations to handle from and to values that may lie outside their own start/length bounds (at least that's what it looks like from ByteChunk), but as far as I can tell WrappedBuffer doesn't handle that situation.

If I copy ByteChunk's way of dealing with slices into WrappedBuffer I get the expected results. I'll see if I can open a PR with this.
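
For the record, the bounds handling in question amounts to clamping roughly like the helper below (purely illustrative, not the actual ByteChunk code):

class SliceBounds {
    // Clamp a requested slice range into [0, length] before cutting, so that
    // out-of-range from/to values yield an empty or truncated slice instead of
    // a bogus length like the 3 traced above.
    public static function clamp(from:Int, to:Int, length:Int):{from:Int, to:Int} {
        if (from < 0) from = 0;
        if (to > length) to = length;
        if (to < from) to = from;
        return { from: from, to: to };
    }
}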

Splitting sources should result in true sources

Right now, when splitting a source on a delimiter, the first part gets completely buffered and then wrapped into a source. It should be a real stream that terminates when the delimiter comes up.

As a result, multipart chunks have to be read completely before being processed. For big files, this is rather expensive.

Signals the "exhaustion" of a Source

In Node.js one can subscribe to the end event on a readable, so that one gets notified when the source is completely consumed (by anyone). I have been playing with Java io/tcp recently, but Java doesn't support that (while tink_tcp's allowHalfOpen needs it). Can we have the equivalent built into tink_io itself?
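
For comparison, this is the Node.js behaviour referred to above, written from Haxe (a minimal sketch assuming the hxnodejs externs and an arbitrary file path):

import js.node.Fs;

class Main {
    static function main() {
        var readable = Fs.createReadStream('some-file.txt'); // hypothetical path
        // 'end' fires once the readable has been completely consumed.
        readable.on('end', function() trace('source fully consumed'));
        readable.resume(); // let the data flow so 'end' can actually fire
    }
}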

Order of arguments for ofInput/ofOutput and ofNodeStream

I think it might not be too late to give these methods the same argument order.
Right now I'm writing stuff like this:

#if nodejs
input = Source.ofNodeStream(socket, 'socket input');
output = Sink.ofNodeStream(socket, 'socket output');
#else
input = Source.ofInput('socket input', socket.input);
output = Sink.ofOutput('socket output', socket.output);
#end

It feels like the name should consistently come either first or last in all of these methods.

Compiler complains about inline...

tink/io/Progress.hx:23: characters 22-58 : Inline variable initialization must be a constant value.

Not sure which version of Haxe I'm running at the moment. Ping me and I'll dig in and look.
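
For context, that error appears whenever an inline variable is initialized with something the compiler cannot fold to a constant. A minimal reproduction (unrelated to the actual Progress.hx code):

class InlineRepro {
    // Fails with "Inline variable initialization must be a constant value",
    // because the initializer is a function call rather than a literal.
    static inline var NONE = make();
    static function make() return 0;
}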

StreamParser eof

4c4dc3e#diff-9c9d45e7ba3bb2121d3006aeb61d4d9bR59

The above change was to mitigate a situation where consume is called more than once, even if it returned {resume: false}. It happens when the stream has already been depleted by the time consume is called.

stepwise:

  1. Parser parses the last chunk in the stream and emits Done
  2. consume() is called
  3. forEach ends with Depleted instead of Halted (even though the handler emits Finish)
  4. consume is called again

I'm not sure the null check is a proper fix. Maybe something should be done in the doParse function instead.

Allow early closing of sources and make them auto shutdown per default.

File descriptors in particular must be closed.

If part of a file source is piped somewhere and the rest is never read, it should be possible to close the source (and thus the descriptor) without having to pipe the rest into a black hole or the like.
To keep open file descriptors from accumulating, sources that have been idle for long enough (30 seconds?) should be closed automatically (with an opt-out, of course).

Splitter issue

I wonder why it subtracts delim.length here.

If the bytes come in like | -- message -- | -- delim -- |, the code in question only reads up to the end of the message, and it never makes progress because it never encounters the delimiter at all.

API changes.

Expanding on #7, I am contemplating some slightly "drastic" API changes.

The pooled circular buffers are really nice and they help keep memory usage very low, but the more I move forward, the more I feel this is an implementation detail that the user should not have to be bothered with.

So I am proposing the following simplifications:

  1. The Sink will have a write method defined like so: function write(bytes:Bytes, start:Int, length:Int):Surprise<Progress, Error>;. The reason is that it's not so easy to get at the data in a buffer, and I'm also not 100% sure the current buffer is the best approach. The Linux kernel seems to use a circular buffer of chunks (one advantage being that the producer can write to one chunk while the consumer reads from another), and Java's nio also does something different (although I haven't quite understood how it actually works^^). Going down an abstraction layer here allows trying different approaches.
  2. The read method will be removed from Source, and what remains is the pipeTo method. This greatly reduces the requirements on sources: no one gets to read from a source directly, and if you want data, you have to ask the source to pipe it to you. So, for example, parsers will always be sinks. The kind of reading there is now can still be achieved with a Sink that accumulates some data and then lets you read it out the way it works today. A rough sketch of both shapes follows below.
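
To make the proposal concrete, the two points above could translate into roughly the following shapes (a sketch only: Progress here is a stand-in so the snippet is self-contained, and none of the names are final):

import haxe.io.Bytes;
import tink.CoreApi;

// Stand-in for tink_io's Progress type, just for this sketch.
enum Progress {
    Written(bytes:Int);
    Ended;
}

// Hypothetical shapes: sinks expose a raw write, sources expose only pipeTo,
// and all reading goes through a sink.
interface SinkObject {
    function write(bytes:Bytes, start:Int, length:Int):Surprise<Progress, Error>;
}

interface SourceObject {
    function pipeTo(sink:SinkObject):Surprise<Progress, Error>;
}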

This is no small change, but I think it will be worthwhile.

Any comments @kevinresol @benmerckx ?
