laserdisc-io / fs2-aws
fs2 utilities to interact with AWS
License: MIT License
Hi there!
I am unable to follow these instructions and get the latest version; the snippet with 5.0.0 fails to resolve.
Looking at https://repo1.maven.org/maven2/io/laserdisc/ it doesn't look like there is an fs2-aws artifact. Do the docs need to be updated, or is there an artifact that isn't being published as expected?
Add a Sink that sends SNS messages to a topic
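A minimal sketch of what such a sink could look like under cats-effect 3 with the AWS SDK v2 async client (the function name and shape are hypothetical, not existing library API):
import cats.effect.IO
import fs2.Pipe
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.services.sns.model.PublishRequest
// Hypothetical sink: publish each element of the stream to the given topic.
def snsPublishPipe(client: SnsAsyncClient, topicArn: String): Pipe[IO, String, Unit] =
  _.evalMap { msg =>
    IO.fromCompletableFuture(
      IO(client.publish(PublishRequest.builder().topicArn(topicArn).message(msg).build()))
    ).void
  }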
package fs2
package aws
import cats.effect.{Blocker, ContextShift, IO}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import fs2.aws.internal.S3Client
import fs2.aws.s3._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.concurrent.ExecutionContext
class S3Spec extends AnyFlatSpec with Matchers {
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val s3Client: S3Client[IO] = fs2.aws.utils.s3TestClient
val provider = new AWSStaticCredentialsProvider(
new BasicAWSCredentials("...","...")
)
val client = AmazonS3ClientBuilder
.standard
.withCredentials(provider)
.withRegion("eu-west-1") // or whatever your region is
.build
val reader: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
readS3FileMultipart[IO]("...", "text", 25, client)
.through(fs2.text.utf8Decode)
.through(fs2.text.lines)
.intersperse("\n")
.through(fs2.text.utf8Encode)
.through(io.file.writeAll(java.nio.file.Paths.get("output.txt"), blocker))
}
reader.compile.drain.unsafeRunSync()
}
Getting the following error:
An exception or error caused a run to abort.
java.lang.NullPointerException
at java.io.Reader.<init>(Reader.java:78)
at java.io.InputStreamReader.<init>(InputStreamReader.java:129)
at scala.io.BufferedSource.reader(BufferedSource.scala:26)
at scala.io.BufferedSource.bufferedReader(BufferedSource.scala:27)
at scala.io.BufferedSource.charReader$lzycompute(BufferedSource.scala:37)
at scala.io.BufferedSource.charReader(BufferedSource.scala:35)
at scala.io.BufferedSource.scala$io$BufferedSource$$decachedReader(BufferedSource.scala:64)
at scala.io.BufferedSource.mkString(BufferedSource.scala:93)
at fs2.aws.utils.package$$anon$1.$anonfun$getObjectContentOrError$1(package.scala:27)
Open to suggestions here!
We need to work on the README for sure, maybe some Scaladocs and a GitHub page for the package.
SQS seems to support a fetch message count between 1 and 10, but the library's default is 100.
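For reference, a minimal AWS SDK v2 sketch showing the bound SQS enforces (the queue URL is a hypothetical example):
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
// SQS rejects maxNumberOfMessages outside 1..10, so a default of 100 fails.
val request = ReceiveMessageRequest.builder()
  .queueUrl("https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue")
  .maxNumberOfMessages(10)
  .build()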
WHAT IS WRONG
I use uploadFileMultipart to stream serialized database transactions to an S3 file. But in the edge case where my DB query produces no results, I get the following error:
The XML you provided was not well-formed or did not validate against our published schema (Service: S3, Status Code: 400, Request ID: AXVBZSEAM6S8WJR7, Extended Request ID: luAacBiEwX5Xt8rpN2BvoNdaub+Q3phQTKal5flI0X5GedYxgIRONNXXp77V+6Yd7EZw0PL20t4=)
WHAT IS EXPECTED
I think that the most sensible thing in this case is to just create an empty file.
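A minimal sketch of that behaviour as a wrapper around the existing multipart pipe (the names here are invented for illustration): route empty input to a plain zero-byte PutObject instead of starting a multipart upload, which S3 rejects as malformed XML.
import cats.effect.IO
import fs2.{Pipe, Stream}
// Hypothetical wrapper: if the stream is empty, fall back to `putEmpty`
// (e.g. a plain PutObject with a zero-byte body).
def uploadOrCreateEmpty[A](multipart: Pipe[IO, Byte, A], putEmpty: IO[A]): Pipe[IO, Byte, A] =
  in =>
    in.pull.uncons.flatMap {
      case Some((head, tail)) => tail.cons(head).through(multipart).pull.echo
      case None               => Stream.eval(putEmpty).pull.echo
    }.stream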
Hello, would you be open to a PR to move the SQS integration to its own module?
Thanks for running this project. Interested to try it out with CE3 and was wondering if v4.0.0-RC1 was available prebuilt anywhere? I didn't see it on maven.
The SQS example in the README is out of date; please provide an updated one.
We have an fs2 job that runs in one AWS account and must upload files to a separate AWS account. To make sure the ownership/permissions of the files are correct, we must set bucket-owner-full-control on the upload request. However, the fs2-aws API makes this tricky to do.
To set this property we extend the AbstractAmazonS3 class and implement just enough of the API to support uploading through fs2-aws. The code looks something like this:
import com.amazonaws.services.s3.{AbstractAmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.model._
object S3ClientWithCannedAcl extends AbstractAmazonS3 {
  private val client = AmazonS3ClientBuilder.defaultClient()
  override def initiateMultipartUpload(request: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = {
    // Force bucket-owner-full-control on every multipart upload.
    request.setCannedACL(CannedAccessControlList.BucketOwnerFullControl)
    client.initiateMultipartUpload(request)
  }
  override def uploadPart(request: UploadPartRequest): UploadPartResult =
    client.uploadPart(request)
  override def completeMultipartUpload(request: CompleteMultipartUploadRequest): CompleteMultipartUploadResult =
    client.completeMultipartUpload(request)
}
fs2.aws.s3.uploadS3FileMultipart[IO](bucket, key, chunkSize, amazonS3 = S3ClientWithCannedAcl)
It would be great if the fs2-aws API allowed for setting properties like this in a more direct way.
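One possible shape for such a hook, sketched purely to illustrate the request (the extra parameter and its name are hypothetical, not current fs2-aws API):
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest
// Hypothetical: let callers adjust each InitiateMultipartUploadRequest before it is sent.
def uploadS3FileMultipart[F[_]](
    bucket: String,
    key: String,
    chunkSize: Int,
    modifyRequest: InitiateMultipartUploadRequest => InitiateMultipartUploadRequest = identity
): fs2.Pipe[F, Byte, Unit] = ???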
Hello,
I was playing with Doobie and streaming to S3, and had some problems making it work because I would like to work within the Doobie effect ConnectionIO, which does not implement Concurrent, while the create method of fs2.aws.s3.S3 does require it. I don't see why (yet?):
def create[F[_]: Async: Concurrent](s3: S3AsyncClientOp[F]): F[S3[F]]
Would the following be sufficient (at least it compiles)? In cats-effect 3, Async already extends Concurrent, so the explicit Concurrent bound is redundant there.
def create[F[_]: Async](s3: S3AsyncClientOp[F]): F[S3[F]]
The source is located under the fs2.aws.sqs folder structure, but the actual class declares package sqs. I think the class was misplaced. Would you be open to putting it back in the right place?
Should encrypt a stream of Bytes and transform it into a stream of encrypted Bytes
Apologies if I am behind the times here. I recently upgraded a project of mine to CE3 which uses the excellent fs2-aws-s3 module to stream data from Amazon S3 buckets. In 4.0.0 I only see Kinesis-style streaming. Is there a plan to interface with S3 directly, or is that now gone in favour of Kinesis, and should I be looking to move my data from S3 to a Kinesis stream? I may well have missed something obvious; if so, I would be grateful if someone could demonstrate how to pull from S3 using fs2-aws-kinesis.
This looks like a very interesting library. Can you please provide an example? I have to stream the content of GZ files from S3, and I would be able to get started if there were an example of how to use this library.
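A minimal sketch of one way to do it with the SDK v1 client and cats-effect 2, independent of the library's own readers (the helper name, bucket, and key are hypothetical): wrap the object's content in a GZIPInputStream and lift it with fs2.io.readInputStream.
import java.util.zip.GZIPInputStream
import cats.effect.{Blocker, ContextShift, IO}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
// Hypothetical helper: stream and decompress a gzipped S3 object.
def readGzippedS3File(bucket: String, key: String, blocker: Blocker, chunkSize: Int)(
    implicit cs: ContextShift[IO]
): fs2.Stream[IO, Byte] =
  fs2.io.readInputStream[IO](
    blocker.blockOn(IO {
      val s3 = AmazonS3ClientBuilder.defaultClient()
      new GZIPInputStream(s3.getObject(bucket, key).getObjectContent)
    }),
    chunkSize = chunkSize,
    blocker = blocker,
    closeAfterUse = true
  )
// e.g. readGzippedS3File("my-bucket", "data.gz", blocker, 8192)
//        .through(fs2.text.utf8Decode).through(fs2.text.lines)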
Right now, my build file looks like this:
"io.laserdisc" %% "fs2-aws" % "2.28.39" excludeAll(
ExclusionRule("com.amazonaws", "aws-java-sdk-kinesis"),
ExclusionRule("com.amazonaws", "aws-java-sdk-sqs"),
ExclusionRule("com.amazonaws", "amazon-kinesis-producer"),
ExclusionRule("software.amazon.kinesis", "amazon-kinesis-client"),
ExclusionRule("software.amazon.awssdk", "sts"),
ExclusionRule("com.amazonaws", "aws-java-sdk-sqs"),
ExclusionRule("com.amazonaws", "amazon-sqs-java-messaging-lib"),
),
...because all I'm interested in is the S3 multipart upload functionality.
Is it possible to split s3, kinesis, sts, and sqs into distinct modules so that we're not otherwise forced to download this lot and a whole forest of transitive dependencies unless we actually use them?
After upgrading to v3.0.9, using readFromKinesisStream with the default FanOut retrieval mode returns the following error. The streamName in KinesisConsumerSettings is set properly.
[info] ERROR 2021-03-01 15:09:49,630 s.a.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
[info] java.lang.NullPointerException: StreamName should not be empty
[info] at software.amazon.awssdk.utils.Validate.notEmpty(Validate.java:290)
[info] at software.amazon.kinesis.common.StreamIdentifier.singleStreamInstance(StreamIdentifier.java:85)
[info] at software.amazon.kinesis.retrieval.fanout.FanOutRetrievalFactory.createGetRecordsCache(FanOutRetrievalFactory.java:63)
[info] at software.amazon.kinesis.coordinator.Scheduler.buildConsumer(Scheduler.java:915)
[info] at software.amazon.kinesis.coordinator.Scheduler.createOrGetShardConsumer(Scheduler.java:890)
[info] at software.amazon.kinesis.coordinator.Scheduler.runProcessLoop(Scheduler.java:416)
[info] at software.amazon.kinesis.coordinator.Scheduler.run(Scheduler.java:325)
[info] at fs2.aws.kinesis.consumer$.$anonfun$readChunksFromKinesisStream$7(consumer.scala:239)
The application tries to read from one stream, which has one shard. Switching to Polling retrieval mode works fine.
With an S3 client up and running, basically as described in the README, upgrading from 3.0.7 to 3.0.9 leads to the following compile error:
[error] found : software.amazon.awssdk.services.s3.S3Client
[error] required: io.laserdisc.pure.s3.tagless.S3AsyncClientOp[?]
[error] .eval(S3.create(awsS3SyncClient, blocker))
So we've got a Kinesis consumer that reads messages from a Kinesis stream, decodes them into a custom case class, and then performs some actions according to the decoded information. The method in charge of consuming and decoding the stream is the following:
def consumeCommands: F[Unit] =
kinesis
.readFromKinesisStream(consumerSettings)
.evalMap(cr => decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString))
.compile
.drain
And the methods that decode the message and execute the command are the following:
def decodeAndExecuteCommand(value: String): F[Unit] =
decode[KinesisCommand](value) match {
case Right(command) => logger.info(s"Processing $value") >> executeCommand(command)
case Left(err) => logger.error(s"Error decoding the command: $err")
}
private def executeCommand(command: KinesisCommand): F[Unit] = ???
So, imagine that, for some unexpected reason, executeCommand(command: KinesisCommand) raises an exception:
private def executeCommand(command: KinesisCommand): F[Unit] = {
throw new IllegalStateException("An unexpected problem")
}
How can we handle this exception gracefully in our consumeCommands method so that our Kinesis consumer continues consuming messages? I've tried putting .recoverWith() in several parts of the code in order to recover from the unexpected exception:
def consumeCommands: F[Unit] =
kinesis
.readFromKinesisStream(consumerSettings)
.evalMap(cr => decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString))
.recoverWith { case e: Throwable => fs2.Stream.eval(logger.error(e.getMessage)) }
.compile
.drain
But the consumer still gets the exception, shuts itself down, and stops consuming new messages:
[io-compute-12] ERROR c.g.m.a.Server - An unexpected problem
[prefetch-cache-shardId-000000000000-0000] ERROR s.a.k.r.p.PrefetchRecordsPublisher - meat-grinder-scala-commands:shardId-000000000000 : Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client.
java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:464)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[io-compute-3] INFO s.a.k.c.Scheduler - All record processors have been shutdown successfully.
[io-compute-3] INFO s.a.k.c.Scheduler - Starting worker's final shutdown.
[io-compute-3] INFO s.a.k.m.CloudWatchPublisherRunnable - Shutting down CWPublication thread.
[cw-metrics-publisher] INFO s.a.k.m.CloudWatchPublisherRunnable - CWPublication thread finished.
[io-compute-3] INFO s.a.k.c.Scheduler - Worker loop is complete. Exiting from worker.
So, what is the correct strategy to handle the exception and keep the consumer up and getting messages from the stream?
Kind regards!
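One common strategy, sketched minimally here (assuming logger returns F[Unit] and cats syntax is in scope): materialize the failure inside the per-record effect with .attempt, so the stream itself never fails and the KCL scheduler stays up. Note this only helps if executeCommand suspends its exception in F (e.g. via Sync[F].delay) rather than throwing during construction.
def consumeCommands: F[Unit] =
  kinesis
    .readFromKinesisStream(consumerSettings)
    .evalMap { cr =>
      decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString)
        .attempt // turn the error into a value instead of failing the stream
        .flatMap {
          case Left(e)  => logger.error(s"Command failed: ${e.getMessage}")
          case Right(_) => Applicative[F].unit
        }
    }
    .compile
    .drain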
Where are the latest artifacts published to? I only see up to v3.1.1 on Maven.
In response to a ListObjectsV2Request, if a bucket contains too many objects (more than 1000, in my experience), AWS truncates the response to a reasonable size (1000, as before) and sends along with it a continuation token for the next request.
This behaviour is completely ignored by s3.listFiles, leading to an obvious bug when listing crowded buckets.
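For reference, the AWS SDK v2 paginator follows nextContinuationToken across pages automatically, which is roughly what listFiles would need to do internally (client setup and bucket name below are hypothetical):
import scala.jdk.CollectionConverters._
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
val s3      = S3Client.create()
val request = ListObjectsV2Request.builder().bucket("my-bucket").build()
// contents() lazily requests further pages as the iterable is consumed
val allKeys = s3.listObjectsV2Paginator(request).contents().asScala.map(_.key()).toList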
DynamoDB sink to write items
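A minimal sketch of what such a sink could look like with the SDK v2 async client under cats-effect 3 (the function name and shape are hypothetical):
import cats.effect.IO
import fs2.Pipe
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest}
// Hypothetical sink: put each attribute map into the given table.
def dynamoDbSink(client: DynamoDbAsyncClient, table: String): Pipe[IO, java.util.Map[String, AttributeValue], Unit] =
  _.evalMap { item =>
    IO.fromCompletableFuture(
      IO(client.putItem(PutItemRequest.builder().tableName(table).item(item).build()))
    ).void
  }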
We have an fs2 job that reads a JSON file from S3 and then processes this file with 30 parallel streams to extract different sets of data to write to 30 Parquet files. We find that if there is an error reading the input file, such as the file key being incorrect or the S3 permissions being incorrect, then the job hangs, prints no errors, and produces no output.
However, we also find that if we configure the job to produce fewer output files (which results in fewer parallel streams), then the job doesn't hang and an appropriate exception is produced and shows up in the logs.
The problem appears to be that when the stream from S3 is read, it does not use the Blocker that it should. To work around this we no longer use readS3File, but instead use our own version, which is as follows:
import cats.effect.{Blocker, ContextShift, IO, Sync}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.GetObjectRequest
type ByteStream = fs2.Stream[IO, Byte]
def readFromS3(bucket: String, key: String, blocker: Blocker, chunkSize: Int)(implicit sync: Sync[IO], shift: ContextShift[IO]): ByteStream =
  fs2.io.readInputStream[IO](
    // blockOn runs the blocking getObject call on the blocking pool
    blocker.blockOn(IO(AmazonS3ClientBuilder.defaultClient().getObject(new GetObjectRequest(bucket, key)).getObjectContent)),
    chunkSize = chunkSize,
    blocker = blocker,
    closeAfterUse = true
  )
The first difference from the fs2-aws API is that this readFromS3 method takes a Blocker instead of an ExecutionContext, which seems more correct as we already have a Blocker in context to use for this. The second difference is that blocker.blockOn(...) is used to wrap the IO[InputStream], which is what ultimately appears to fix the hanging problem.
I might be missing something, but when I add fs2-aws:3.0.2 as a dependency I don't get the fs2.aws.sqs package. I also don't see the AWS SQS runtime dependency on Maven, though 2.29.0 works as expected.
checkpoint() has a Unit return type, while it's clearly side-effectful (as is canCheckpoint). Wondering if there are technical reasons for not having F[_]: Sync?
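A minimal sketch of the suggested shape, assuming the underlying KCL v2 checkpointer is what gets wrapped:
import cats.effect.Sync
import software.amazon.kinesis.processor.RecordProcessorCheckpointer
// Suspend the side effect in F; failures surface as F errors instead of thrown exceptions.
def checkpoint[F[_]: Sync](checkpointer: RecordProcessorCheckpointer): F[Unit] =
  Sync[F].delay(checkpointer.checkpoint())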
Hello! I have a requirement to use fs2-aws-s3 within a tracing context. We are using trace4cats, and thus our effect type is Kleisli[F, Span[F], *]. For most of our use cases, changing to this effect type has been either built in or easy to add. For fs2, we have the option of using the translate method. However, because the public fs2-aws-s3 API has a method signature of Pipe[F, Byte, ETag] for uploadFile and uploadFileMultipart, we cannot apply translate and thus cannot easily change the effect type.
A few options could address this; with any of them, a new effect type can easily be applied for all methods.
Happy to make a PR with whichever option is decided by the maintainers. Thanks for your time.
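To illustrate the missing piece: given natural transformations in both directions, a Pipe can be translated by hand (a sketch; for Kleisli the two arrows would be Kleisli.liftK and Kleisli.applyK(span), which is exactly why a built-in hook would be nicer):
import cats.~>
import fs2.{Pipe, Stream}
// Translate a Pipe across effect types using fs2's Stream#translate in both directions.
def translatePipe[F[_], G[_], A, B](pipe: Pipe[F, A, B])(fToG: F ~> G, gToF: G ~> F): Pipe[G, A, B] =
  (in: Stream[G, A]) => pipe(in.translate(gToF)).translate(fToG)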
I was thinking one way we could get traction on the project would be to show some performance improvements over the Java AWS API, so having benchmarking could help.
Originally discovered by @tpon. Currently, if invoked with a path to a file that doesn't exist in S3, an empty stream is returned. Up for discussion, but I feel like an exception should be thrown so the user is aware that the file doesn't exist.
Getting messages from SQS
Hi,
Any interest in having a single module with only the awssdk:s3 version 2 dependency? I adapted all the functions in https://github.com/laserdisc-io/fs2-aws/blob/master/fs2-aws/src/main/scala/fs2/aws/s3/package.scala and I'd be happy to contribute that, but it now lives in a private repo (we are using it in production at the moment).
Let me know what you think.
Kinesis sink and stream
My app is using slf4j-simple as its logging backend, and I'm getting the following warning when running it:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/antonparkhomenko/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.30/slf4j-simple-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/antonparkhomenko/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
As I understand it, logback-classic is the same kind of backend as slf4j-simple, and both are supposed to be facaded by slf4j-api and not bundled by libraries; instead, the app developer should add one as a dependency.
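On the consumer side, one sbt-level workaround (assuming the binding arrives transitively via fs2-aws; the version placeholder is left unfilled):
"io.laserdisc" %% "fs2-aws" % "<version>" excludeAll (
  ExclusionRule("ch.qos.logback", "logback-classic")
)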
Should decrypt a Stream of Client side encrypted bytes
Create an SQS Sink to send messages to SQS
For apps that need to use assume-role or session credentials (see https://docs.aws.amazon.com/AmazonS3/latest/dev/AuthUsingTempSessionTokenJava.html), the current approach does not work. Some possible solutions:
or 2) Make fs2.aws.kinesis.defaultScheduler() public or protected
or 3) Make fs2.aws.kinesis.readFromKinesisStream() public or protected
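For context, session credentials can be built with the SDK v2 STS provider (the role ARN and session name below are hypothetical); the missing piece is a public seam to hand such a client to the library:
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.sts.StsClient
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest
val provider = StsAssumeRoleCredentialsProvider.builder()
  .stsClient(StsClient.create())
  .refreshRequest(
    AssumeRoleRequest.builder()
      .roleArn("arn:aws:iam::123456789012:role/my-role")
      .roleSessionName("fs2-aws-session")
      .build()
  )
  .build()
val kinesisClient = KinesisAsyncClient.builder().credentialsProvider(provider).build()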
I'm using fs2.aws.dynamodb.package$#readFromDynamDBStream to read from a DDB stream, and fs2.aws.dynamodb#checkpointRecords to checkpoint the records after processing.
The following problem arises when there is a delay (in my case, intentional) in checkpointing the records and the software.amazon.kinesis.processor.ShardRecordProcessor#shardEnded event fires. fs2-aws correctly checkpoints the end of the shard, but then subsequent checkpoints of the previously emitted events fail with:
java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: 554022700000000052561897332,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: SHARD_END,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: SHARD_END,SubsequenceNumber: 0}
We need to maintain a list of all in-flight committable records, and evict them when fs2-aws checkpoints a shardEnded event.
Bug in the checkpointer: it requires checking the shutdown reason of the record processor and checkpointing before terminating in case the end of the shard is reached.
Getting items from DynamoDB
The maxConcurrency, stsAssumeRole, and endpoint values in KinesisSettings are not used anywhere else in the code. It looks like when they were added, they were each used for building a KinesisAsyncClient in consumer.scala.
However, it looks like consumer.scala was removed as part of the cats-effect 3 upgrade. Would it make sense to remove these values as well? Or would it make more sense to restore some functionality to build a Kinesis client given some KinesisSettings?
The requested range is not satisfiable (Service: Amazon S3; Status Code: 416; Error Code: InvalidRange
<Error><Code>InvalidRange</Code><Message>The requested range is not satisfiable</Message><RangeRequested>bytes=263-1263</RangeRequested><ActualObjectSize>263</ActualObjectSize>
From README.md:
Stream.emits("test data".getBytes("UTF-8"))
.through(s3.uploadFileMultipart(BucketName("foo"), FileKey("bar"), partSize = 5))
.evalMap(t => IO(println(s"eTag: $t")))
This does not work anymore, since 5 is an Int and not a PartSizeMB. Any pointers on how to create an instance of PartSizeMB?
Update: fixed by copying the definition of PartSizeMB into my codebase, but I'd rather not duplicate it if there's a better way.
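Assuming PartSizeMB is an eu.timepit.refined type (as in fs2-aws's models), a literal can be checked at compile time via refined's auto import rather than copying the definition; a sketch:
import eu.timepit.refined.auto._
// With refined.auto in scope, the literal 5 is validated against
// PartSizeMB's predicate at compile time and converted automatically.
Stream.emits("test data".getBytes("UTF-8"))
  .through(s3.uploadFileMultipart(BucketName("foo"), FileKey("bar"), partSize = 5))
  .evalMap(t => IO(println(s"eTag: $t")))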
Hi,
We are using this library for an app running on EKS with AWS credentials injected via IRSA, and it doesn't seem to read the credentials provided as a web identity token. We're using fs2.aws.kinesis.publisher.
These are provided via the env vars AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE.
Can we have support for this authentication method as well, please?
Thanks
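For reference, the SDK v2 ships a provider that reads exactly those env vars (it requires the sts module on the classpath); a client-injection hook would need to accept something like:
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
// Reads AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE from the environment.
val kinesis = KinesisAsyncClient.builder()
  .credentialsProvider(WebIdentityTokenFileCredentialsProvider.create())
  .build()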
Please add Scala 2.13 support, since all the libraries you are using are available for Scala 2.13. The only issue is the scanamo-circe library, which is also under your maintenance.
Regards