Code Monkey home page Code Monkey logo

flink-training-exercises's Introduction

⚠️ This repository has been archived and is no longer being maintained. For up-to-date content, see https://github.com/ververica/flink-training. ⚠️


This repository contains examples and exercises for Apache Flink.

flink-training-exercises's People

Contributors

aljoscha avatar alpinegizmo avatar eronwright avatar fhueske avatar gjl avatar knaufk avatar mbalassi avatar mxm avatar peterschrott avatar qinjunjerry avatar rmetzger avatar rsalama avatar senorcarbone avatar smarthi avatar uce avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

flink-training-exercises's Issues

Flink Kafka Consumer throws Null Pointer Exception when consuming from Kafka topic

I am using this example Flink CEP where I am separating out the data as I have created one application which is Sending application to Kafka & another application reading from Kafka... I generated the producer for class TemperatureWarning i.e. in Kafka,I was sending data related to TemperatureWarning Following is my code which is consuming data from Kafka...

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
Properties properties=new Properties();
properties.setProperty("bootstrap.servers", "PUBLICDNS:9092");
properties.setProperty("zookeeper.connect", "PUBLICDNS:2181");
properties.setProperty("group.id", "test");

DataStream<TemperatureWarning> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureWarning>("MonitoringEvent", new MonitoringEventSchema(), properties));
  Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
          .next("second")
          .within(Time.seconds(20));


  PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
          dstream.keyBy("rackID"),
          alertPattern);
  DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
      (Map<String, TemperatureWarning> pattern, Collector<TemperatureAlert> out) -> {
          TemperatureWarning first = pattern.get("first");
          TemperatureWarning second = pattern.get("second");

          if (first.getAverageTemperature() < second.getAverageTemperature()) {
              out.collect(new TemperatureAlert(second.getRackID(),second.getAverageTemperature(),second.getTimeStamp()));
          }
      });
dstream.print();
alerts.print();
env.execute("Flink Kafka Consumer");

But when I execute this application,it throws following Exception:
java.lang.NullPointerException at com.yash.source.MonitoringEventSchema.deserialize(MonitoringEventSchema.java:74) at com.yash.source.MonitoringEventSchema.deserialize(MonitoringEventSchema.java:1) at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:466)
Following is my code for TemperatureWarning

public class TemperatureWarning { 
public int rackID;
    public double averageTemperature;
    public long timeStamp;
    public TemperatureWarning(int rackID, double averageTemperature,long timeStamp) {
        this.rackID = rackID;
        this.averageTemperature = averageTemperature;
        this.timeStamp=timeStamp;
    }

    public TemperatureWarning() {
        this(-1, -1,-1);
    }

    public int getRackID() {
        return rackID;
    }

    public void setRackID(int rackID) {
        this.rackID = rackID;
    }

    public double getAverageTemperature() {
        return averageTemperature;
    }

    public void setAverageTemperature(double averageTemperature) {
        this.averageTemperature = averageTemperature;
    }
    public long getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TemperatureWarning) {
            TemperatureWarning other = (TemperatureWarning) obj;

            return rackID == other.rackID && averageTemperature == other.averageTemperature;
        } else {
            return false;
        }
    }
    @Override
    public int hashCode() {
        return 41 * rackID + Double.hashCode(averageTemperature);
    }
    @Override
    public String toString() {
       //return "TemperatureWarning(" + getRackID() + ", " + averageTemperature + ")";
        String str=getRackID()+","+averageTemperature+","+getTimeStamp();
        return str;
        //return "TemperatureWarning(" + getRackID() +","+averageTemperature + ") "+ "," + getTimeStamp(); 
    }
}

Following is my code for MonitoringEventSchema :

public class MonitoringEventSchema implements DeserializationSchema<TemperatureWarning>,SerializationSchema<TemperatureWarning>
{
    //@Override
    public TypeInformation<TemperatureWarning> getProducedType() {
        // TODO Auto-generated method stub
        return TypeExtractor.getForClass(TemperatureWarning.class);
    }
    //@Override
    public byte[] serialize(TemperatureWarning element) {
        // TODO Auto-generated method stub
        return element.toString().getBytes();
    }
    //@Override
    public TemperatureWarning deserialize(byte[] message) throws IOException {
        // TODO Auto-generated method stub
        TemperatureWarning warning=null;
        System.out.println("Before Encoded:"+message);
        if(message!=null)
        {
            String str=new String(message,"UTF-8");
            System.out.println("Message:"+str);
            String []value=str.split(",");
            warning.setRackID(Integer.parseInt(value[0]));
            warning.setAverageTemperature(Double.parseDouble(value[1]));
            warning.setTimeStamp(Long.parseLong(value[2]));
            return warning;
        }
        return null;
    }
    //@Override
    public boolean isEndOfStream(TemperatureWarning nextElement) {
        // TODO Auto-generated method stub
        return false;
    }
}

Can anyone please provide me the solution for this error ?? How can I solve this error ??

No data sinks have been created yet exception in PageRankWithEdgeWeights.scala

After executing the example, an exception is thrown.

Exception in thread "main" java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:929)
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:908)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:81)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:627)
    at com.likya.flink.lectures.PageRankWithEdgeWeights$.main(PageRankWithEdgeWeights.scala:87)
    at com.likya.flink.lectures.PageRankWithEdgeWeights.main(PageRankWithEdgeWeights.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

There are lines comented may be related with the problem :

//Now run the Page Rank algorithm over the weighted graph
//val pageRanks = networkWithWeights.run[String](new PageRank[String, Double, Double, String](DAMPENING_FACTOR, maxIterations))
//pageRanks.writeAsCsv(outputPath, fieldDelimiter = "\t", lineDelimiter = "\n")

Adding dependency for flink-training-exercises not working

Hi,

I am adding the following dependency to my pom.xml -

com.data-artisans flink-training-exercises 0.15.2

Even after I build my Maven project, I'm getting compilation issues when I try to use the TaxiRide and TaxiRideSource classes:

DataStream rides = env.addSource(
new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelayInSeconds, servingSpeed));

Error:(20, 21) java: cannot find symbol
symbol: class TaxiRideSource
location: class Qualys.FimEventProcessor

build failed on Intellij IDEA 2018 with java 8 on windows 10

java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

Information:java: Errors occurred while compiling module 'flink-training-exercises'
Information:javac 1.8.0_171 was used to compile java sources
Information:2018/9/4 14:35 - Compilation completed with 15 errors and 0 warnings in 10 s 522 ms
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\basics\RideCleansingScalaTest.java
Error:(19, 72) java: package com.dataartisans.flinktraining.exercises.datastream_scala.basics not exists
Error:(27, 112) java: package com.dataartisans.flinktraining.solutions.datastream_scala.basics not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\windows\HourlyTipsScalaTest.java
Error:(19, 73) java: package com.dataartisans.flinktraining.exercises.datastream_scala.windows not exists
Error:(31, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.windows not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\process\ExpiringStateScalaTest.java
Error:(19, 73) java: package com.dataartisans.flinktraining.exercises.datastream_scala.process not exists
Error:(27, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.process not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\state\RidesAndFaresScalaTest.java
Error:(21, 71) java: package com.dataartisans.flinktraining.exercises.datastream_scala.state not exists
Error:(32, 111) java: package com.dataartisans.flinktraining.solutions.datastream_scala.state not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\windows\PopularPlacesScalaTest.java
Error:(19, 73) java: package com.dataartisans.flinktraining.exercises.datastream_scala.windows not exists
Error:(30, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.windows not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\process\LongRidesScalaTest.java
Error:(26, 112) java: package com.dataartisans.flinktraining.exercises.datastream_scala.process not exists
Error:(27, 111) java: package com.dataartisans.flinktraining.exercises.datastream_scala.cep not exists
Error:(30, 113) java: package com.dataartisans.flinktraining.solutions.datastream_scala.process not exists
Error:(35, 112) java: package com.dataartisans.flinktraining.solutions.datastream_scala.cep not exists
Error:(40, 125) java: package com.dataartisans.flinktraining.solutions.datastream_scala.process not exists
D:\java\flink-training-exercises\src\test\java\com\dataartisans\flinktraining\exercises\datastream_java\testing\TestSource.java
Information:java: 某些输入文件使用了未经检查或不安全的操作。
Information:java: 有关详细信息, 请使用 -Xlint:unchecked 重新编译。

Calculating average in Flink DataStream

I am using Flink DataStream API where there where racks are available & I want to calculate "average"of temperature group by rack IDs. My window duration is of 40 seconds & my window is sliding by 10 seconds...Following is my code where I am calculating sum of temperatures after 10 second for every rackID,but now I want to calculate average temperatures::

static Properties properties=new Properties();
    public static Properties getProperties()
    {
        properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
        properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
        //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
        //properties.setProperty("group.id", "akshay");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

 @SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception 
{
    StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties props=Program.getProperties();
    DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), Time.seconds(10)).sum("temperature");
    env.execute("Temperature Consumer");
}

How can I calculate average of temperatures for each rackId based on window duration ??

Job submission exception File not found but data file is there

I am using the web Ui to submit a first test job

com.dataartisans.flinktraining.exercises.datastream_java.basics.RideCleansingExercise

-input /Users/etsang/dev/src/github.com/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/examples/data/nycTaxiRides.gz

ls /Users/etsang/dev/src/github.com/apache/flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT/examples/data/nycTaxiRides.gz is able to find the file

	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.run(TaxiRideSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)```

Update Dataset in Flink using Flink Table API

I am using Flink Table API. I want to update the table in Flink through Pattern Detection..I am using 3 fields : routeno,source,distance,category. Now I want to update the category based on the value of distance for every routeno...For ex : if routeno=1 and distance<=200 then category='daily' ..How can I update this using Flink's Table API??

Checkstyle errors

mvn clean install fails with a checkstyle error about unsused imports:

[INFO] There are 2 checkstyle errors.
[ERROR] TravelTimePrediction.java[30:8] Unused import - org.apache.flink.api.common.typeinfo.TypeHint.
[ERROR] EventTimeJoinFunction.java[25:8] Unused import - java.util.Iterator.

fat-jar doesn't get depenedencies

Hi,
i try the following workflow:

  1. mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=0.10.1 \ -DgroupId=org.apache.flink.quickstart \ -DartifactId=flink-scala-project \ -Dversion=0.1 \ -Dpackage=org.apache.flink.quickstart \ -DinteractiveMode=false
  2. cd flink-scala-project
  3. mvn clean package

here is a build log: https://gist.github.com/zavalit/1e78478ebdda827f3454 and when i run

java -jar target/flink-scala-project-0.1.jar

i get

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/ExecutionEnvironment$ at org.apache.flink.quickstart.Job$.main(Job.scala:41) at org.apache.flink.quickstart.Job.main(Job.scala) Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.scala.ExecutionEnvironment$ at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 2 more

Build fails with javadoc error on JDK 1.7

mvn clean install fails on JDK 1.7 with: javadoc: error - invalid flag: -Xdoclint:none
Seems that you used an invalid javadoc option for JDK 1.7 in your pom.xml on line 200.

TravelTimePredictionExample wrong GeoUtils usage for direction

Hi,

i think i have found a small problem in the example.
This commit in my fork contains the fix.
The line

int direction = GeoUtils.getDirectionAngle(ride.endLon, ride.endLat, ride.startLon, ride.startLat);

should be

int direction = GeoUtils.getDirectionAngle(ride.startLon, ride.startLat, ride.endLon, ride.endLat);

because the GeoUtils API says

/**
	 * Returns the angle in degrees between the vector from the start to the destination
	 * and the x-axis on which the start is located.
	 *
	 * The angle describes in which direction the destination is located from the start, i.e.,
	 * 0° -> East, 90° -> South, 180° -> West, 270° -> North
	 *
	 * @param startLon longitude of start location
	 * @param startLat latitude of start location
	 * @param destLon longitude of destination
	 * @param destLat latitude of destination
	 * @return The direction from start to destination location
	 */
	public static int getDirectionAngle(
			float startLon, float startLat, float destLon, float destLat)

Or maybe I am wrong? Looking forward for feedback, thanks in advance.

Errors due to name mismatch in package names

In the following files, the package names have "datastream_scala" instead of "datastream_java" causing errors, changing which the errors are resolved.
Couldn't make a branch and push changes due to lack of permission to do so.

src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/basics/RideCleansingScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/ExpiringStateScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/LongRidesScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/state/RidesAndFaresScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/HourlyTipsScalaTest.java
src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlacesScalaTest.java

bug in CarEventSort.java

"queueState.update(queue)" is missing at the end of the "onTimer" function.

public void onTimer(long timestamp, OnTimerContext context, Collector<ConnectedCarEvent> out) throws Exception {
PriorityQueue<ConnectedCarEvent> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
ConnectedCarEvent head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
}
}

Dataset links are broken

No longer able to download the datasets for the training-exercises the links are broken:

wget http://training.ververica.com/trainingData/nycTaxiRides.gz URL transformed to HTTPS due to an HSTS policy --2020-06-29 15:19:37-- https://training.ververica.com/trainingData/nycTaxiRides.gz Resolving training.ververica.com (training.ververica.com)... 151.101.65.195, 151.101.1.195 Connecting to training.ververica.com (training.ververica.com)|151.101.65.195|:443... connected. HTTP request sent, awaiting response... 404 Not Found 2020-06-29 15:19:37 ERROR 404: Not Found.

where is the data set

when I do the example of CarEventSort.java , i failed to find the corresponding data set for this example, this example is very important, so how can i find them? many thx!

RidesAndFaresSolution.scala Questions

Hi, I have some confusions on connect operations in flink, wonder if I can ask here:

  1. the result intends to collect (ride, fare) for each orderId, why after collecting, only fare state is cleared? Since the result for that orderId has been joined and collected, shouldn't all states be cleared out?
  2. suppose there is one-to-many relationships, where ride -> fare1 and fare2. Then if I don't clear the state, will it emit (ride, fare1) and (ride, fare2)?

Thanks in advance!

Maven Clean package failure

Hi, getting the following error while running mvn clean package

Any idea what am I doing wrong ? Scala version 2.11

bahadir@server:~/code/learn/flink/flink-training-exercises$ mvn clean package
[INFO] Scanning for projects...
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for com.dataartisans:flink-training-exercises:jar:0.2
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 142, column 12
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING] 
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Exercises 0.2
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ flink-training-exercises ---
[INFO] Deleting /home/bahadir/code/learn/flink/flink-training-exercises/target
[INFO] 
[INFO] --- maven-checkstyle-plugin:2.12.1:check (validate) @ flink-training-exercises ---
[INFO] 
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flink-training-exercises ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/bahadir/code/learn/flink/flink-training-exercises/src/main/resources
[INFO] 
[INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ flink-training-exercises ---
[WARNING]  Expected all dependencies to require Scala version: 2.10.4
[WARNING]  com.twitter:chill_2.10:0.5.2 requires scala version: 2.10.4
[WARNING]  com.twitter:chill-avro_2.10:0.5.2 requires scala version: 2.10.4
[WARNING]  com.twitter:chill-bijection_2.10:0.5.2 requires scala version: 2.10.4
[WARNING]  com.twitter:bijection-core_2.10:0.7.2 requires scala version: 2.10.4
[WARNING]  com.twitter:bijection-avro_2.10:0.7.2 requires scala version: 2.10.4
[WARNING]  org.scala-lang:scala-reflect:2.10.4 requires scala version: 2.10.4
[WARNING]  org.apache.flink:flink-scala:0.10.1 requires scala version: 2.10.4
[WARNING]  org.apache.flink:flink-scala:0.10.1 requires scala version: 2.10.4
[WARNING]  org.scala-lang:scala-compiler:2.10.4 requires scala version: 2.10.4
[WARNING]  org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version: 2.10.4
[WARNING]  org.apache.flink:flink-runtime:0.10.1 requires scala version: 2.10.4
[WARNING]  com.typesafe.akka:akka-actor_2.10:2.3.7 requires scala version: 2.10.4
[WARNING]  com.typesafe.akka:akka-remote_2.10:2.3.7 requires scala version: 2.10.4
[WARNING]  com.typesafe.akka:akka-slf4j_2.10:2.3.7 requires scala version: 2.10.4
[WARNING]  org.clapper:grizzled-slf4j_2.10:1.0.2 requires scala version: 2.10.3
[WARNING] Multiple versions of scala libraries detected!
[INFO] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/java:-1: info: compiling
[INFO] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/scala:-1: info: compiling
[INFO] Compiling 25 source files to /home/bahadir/code/learn/flink/flink-training-exercises/target/classes at 1449867448989
[WARNING] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/scala/com/dataartisans/flinktraining/exercises/table_scala/memberotm/MemberOTMonth.scala:74: warning: Type org.apache.flink.api.table.Row is no POJO, has immutable fields: value fields, value arity.
[WARNING]     membersOTMonth.toDataSet[Row].print()
[WARNING]                             ^
[WARNING] one warning found
[INFO] prepare-compile in 0 s
[INFO] compile in 15 s
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ flink-training-exercises ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 15 source files to /home/bahadir/code/learn/flink/flink-training-exercises/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/TaxiRideSource.java:[204,68] no suitable constructor found for PriorityQueue(<anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>>)
    constructor java.util.PriorityQueue.PriorityQueue(java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
      (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
    constructor java.util.PriorityQueue.PriorityQueue(java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
      (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
    constructor java.util.PriorityQueue.PriorityQueue(java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
      (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
    constructor java.util.PriorityQueue.PriorityQueue(int,java.util.Comparator<? super org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
      (actual and formal argument lists differ in length)
    constructor java.util.PriorityQueue.PriorityQueue(int) is not applicable
      (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to int by method invocation conversion)
    constructor java.util.PriorityQueue.PriorityQueue() is not applicable
      (actual and formal argument lists differ in length)
[INFO] 1 error
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 23.164 s
[INFO] Finished at: 2015-12-11T21:57:46+01:00
[INFO] Final Memory: 41M/660M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-training-exercises: Compilation failure
[ERROR] /home/bahadir/code/learn/flink/flink-training-exercises/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/TaxiRideSource.java:[204,68] no suitable constructor found for PriorityQueue(<anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>>)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.SortedSet<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.PriorityQueue<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to java.util.Collection<? extends org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>> by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(int,java.util.Comparator<? super org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>) is not applicable
[ERROR] (actual and formal argument lists differ in length)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue(int) is not applicable
[ERROR] (actual argument <anonymous java.util.Comparator<org.apache.flink.api.java.tuple.Tuple2<java.lang.Long,java.lang.Object>>> cannot be converted to int by method invocation conversion)
[ERROR] constructor java.util.PriorityQueue.PriorityQueue() is not applicable
[ERROR] (actual and formal argument lists differ in length)
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

UnsupportedOperationException

Modifed : I realised that tha Java version was mapped to 0.2 version of exercises but the scala was mapped to 0.3

I changed it to 0.2 and now it is working as expected.

Is there any thing strange ?

While trying a got an exception below.

```

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.UnsupportedOperationException: Automatic-Timestamp sources cannot emit elements with a timestamp. See interface ManualTimestampSourceFunction if you want to manually assign timestamps to elements.
at org.apache.flink.streaming.api.operators.StreamSource$AutomaticWatermarkContext.collectWithTimestamp(StreamSource.java:226)
at com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.generateUnorderedStream(TaxiRideSource.java:278)
at com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource.run(TaxiRideSource.java:130)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.