siddhi-io-file's People

Contributors

ajanthy, anugayan, charukak, chashikajw, chathurikaa, dilini-muthumala, dnwick, gokul, grainier, hmrajas, ksdperera, lasanthas, maheshika, methma, minudika, mohanvive, nisalaniroshana, niveathika, pcnfernando, ramindu90, rukshiw, sahandilshan, sajithshn, senthuran16, suhothayan, sujanan, sybernix, tharanidk, tishan89, wso2-jenkins-bot

siddhi-io-file's Issues

Periodic event replay after enabling state persistence

Description:
This issue occurred in a tailing scenario.
After enabling state persistence, all events in the file seemed to get replayed even though the file pointer is supposed to be remembered.

Affected Product Version:
wso2si-1.0.0-SNAPSHOT (built from latest source)

OS, DB, other environment details and versions:
Mac OS 10.13.1

Steps to reproduce:

  1. Get a fresh SI pack.
  2. Enable state persistence as below.
  # Periodic Persistence Configuration
state.persistence:
  enabled: true
  intervalInMin: 1
  revisionsToKeep: 2
  persistenceStore: org.wso2.carbon.streaming.integrator.core.persistence.FileSystemPersistenceStore
  config:
    location: siddhi-app-persistence
  3. Create a file called productions_json.txt with the following content and save it in a preferred location.
{"event":{"name":"Almond cookie","amount":100.0}}
{"event":{"name":"Baked alaska","amount":20.0}}
  4. Deploy the following Siddhi app.
    @App:name('CountProductions2')

    @App:description('Siddhi application to count the total number of orders.')

    @source(type='file', mode='LINE',
        file.uri='file:/Users/foo/resources/productions_json.txt',
        tailing='true',
        @map(type='json'))
    define stream SweetProductionStream (name string, amount double);

    @sink(type = 'log')
    define stream LogStream (totalProductions double);

    -- Following query counts the number of sweet productions.
    @info(name = 'query')
    from SweetProductionStream
    select sum(amount) as totalProductions
    insert into LogStream;

Note: Change file.uri to point to the file you created in step 3.

You will see the count (totalProductions) getting incremented every minute although no change is made to the file.

Can't give a directory for file source when using tailing mode

Description:
When tailing is disabled, we can specify dir.uri pointing to an empty folder and start the server. At runtime, a data file copied into that directory gets picked up and read. However, when tailing is enabled we cannot specify a dir.uri; we have to give a file.uri such as file.uri="foo/bar.txt". A specific file name must be given, and that file must already exist in the directory before the server starts.

Affected Product Version:
siddhi-io-file 1.1.1

Steps to reproduce:
Use the following Siddhi app:

@App:name("testAppTwo")
@App:description("Description of the plan")
-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor.

@source(type = 'file', dir.uri = 'file:/Users/niruhan/My_Tasks/task5_filePatch/source', move.after.failure = 'file:/Users/niruhan/My_Tasks/task5_filePatch/failure', mode = 'LINE', tailing = 'true', action.after.failure = 'MOVE', add.event.separator = 'false', @map(type = 'csv')) 
define stream FooStream (attr1 int);

define stream OutputStream (attr1 int);

@info(name = 'SiddhiApp2') 
from FooStream#log("READ_VALUES")
select *
insert into OutputStream;

Start the server, then copy a file with the following contents into the dir.uri specified:

1
2
3
4

Related Issues:
wso2/product-sp#1047

ClassCastException: org.quartz.simpl.RAMJobStore cannot be cast to org.quartz.spi.JobStore

Description:
I am using the latest build siddhi-io-file-2.0.17-SNAPSHOT.
Observed the following error log when trying to read a CSV file using the Siddhi File Source.

Error log

[2021-08-28 19:31:13,883] ERROR {org.wso2.carbon.connector.framework.server.polling.PollingTaskRunner} - Exception occurred while scheduling job org.quartz.SchedulerException: JobStore class 'org.quartz.simpl.RAMJobStore' could not be instantiated. [See nested exception: java.lang.ClassCastException: org.quartz.simpl.RAMJobStore cannot be cast to org.quartz.spi.JobStore]
	at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:869)
	at org.quartz.impl.StdSchedulerFactory.getScheduler(StdSchedulerFactory.java:1559)
	at org.wso2.carbon.connector.framework.server.polling.PollingTaskRunner.start(PollingTaskRunner.java:73)
	at org.wso2.carbon.connector.framework.server.polling.PollingServerConnector.start(PollingServerConnector.java:57)
	at org.wso2.transport.file.connector.server.FileServerConnector.start(FileServerConnector.java:64)
	at io.siddhi.extension.io.file.FileSource.lambda$deployServers$7(FileSource.java:911)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.quartz.simpl.RAMJobStore cannot be cast to org.quartz.spi.JobStore
	at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:867)
	... 8 more

Affected Product Version:
siddhi-io-file-2.0.17-SNAPSHOT.

OS, DB, other environment details and versions:
SI-tooling-1.1.0 with siddhi-io-file-2.0.17-SNAPSHOT.

Steps to reproduce:

  1. Create a CSV file with the below content on the local file system.
{"event":{"symbol":"foo"}
  2. Create the below Siddhi app in SI-tooling. Set the location of the CSV file you created in the previous step in the file.uri parameter. Then start the app.
@App:name('ManageProductionStats')

@source(type = 'file', mode = "LINE", file.uri = "file:/Users/user/data/product.csv", tailing = "true",
    @map(type = 'csv'))
define stream SweetProductionStream (name string, amount double);

@sink(type='log')
define stream TotalProductionStream (name string, amount double);

from SweetProductionStream 
select name, sum(amount) as amount 
group by name
insert into TotalProductionStream;

Expected behaviour:
The following log should get printed.

[2021-09-06 22:14:31,603]  INFO {io.siddhi.core.stream.output.sink.LogSink} - ManageProductionStats : TotalProductionStream : Event{timestamp=1630946671603, data=[foo], isExpired=false}

Actual behaviour:
The following error gets printed.

ClassCastException: org.quartz.simpl.RAMJobStore cannot be cast to org.quartz.spi.JobStore

Related Issues:
#142

Specify allowed combinations of parameters

Description:
  • Only one of dir.uri or file.uri is allowed.
  • move.after.process is needed only when action.after.process is 'move'.
  • move.after.failure is needed only when action.after.failure is 'move'.

These combinations can be specified using the parameter overload feature in Siddhi annotations (https://github.com/siddhi-io/siddhi/blob/master/modules/siddhi-annotations/src/main/java/io/siddhi/annotation/ParameterOverload.java).
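For illustration, the allowed combinations could be expressed as @ParameterOverload entries on the extension's @Extension annotation. The overload set below is a sketch of the constraints described above, not the final design:

@Extension(
    name = "file",
    namespace = "source",
    parameterOverloads = {
        // Either dir.uri or file.uri may be given, never both.
        @ParameterOverload(parameterNames = {"dir.uri"}),
        @ParameterOverload(parameterNames = {"file.uri"}),
        // move.after.process is valid only together with action.after.process = 'move'.
        @ParameterOverload(parameterNames = {"file.uri", "action.after.process", "move.after.process"}),
        // move.after.failure is valid only together with action.after.failure = 'move'.
        @ParameterOverload(parameterNames = {"file.uri", "action.after.failure", "move.after.failure"})
    }
)
public class FileSource { /* ... */ }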

Suggested Labels:
Type/Improvement

Related Issues:
wso2/streaming-integrator-tooling#46

Regex matching is not dynamic in file:copy function

Description:
Regex matching is not dynamic in the file:copy function. Please refer to the following Siddhi app; it does not get deployed successfully.

@App:name("FileCopy")

@App:description("Description of the plan")

define stream CopyFileStream(regex string);

@sink(type='log') 
define stream logStream (isSuccess bool);

-- You can send the regex   .*foo.txt$
from CopyFileStream#file:copy('/Users/dilini/source', '/Users/dilini/dest', regex, true)
select isSuccess
insert into logStream;

Affected Product Version:
SI 1.1.0

'include.by.regexp' and 'exclude.root.dir' parameters in file:move() are not dynamic

Description:

  • 'include.by.regexp' and 'exclude.root.dir' parameters in file:move() are not dynamic
  • When 'exclude.root.dir' parameter is set to true, the destination path does not get created correctly
  • When four parameters are given, the regex is always set to ''

Affected Product Version:
SI 1.1.0

OS, DB, other environment details and versions:
N/A

Steps to reproduce:
As given in the description.

Add support for SMB and Webdav protocols

Description:
This issue is created to track the following tasks:

  • Add support to read from/write to files opened via SMB
  • Add support to read from/write to files opened via Webdav

Improvement to read the file header

Description:
Currently, the only way to read just the header is by using a regex. This improvement enables the user to read the first line of the file and send it for further processing, without processing the whole file.
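A minimal sketch of the idea in plain Java (not the extension's actual API; the class and method names here are illustrative): read only the first line of the stream and hand it off, instead of consuming the whole file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

public class HeaderReader {

    // Return only the first line (the header) of the given source.
    public static String readHeader(Reader source) throws IOException {
        try (BufferedReader reader = new BufferedReader(source)) {
            return reader.readLine(); // null if the stream is empty
        }
    }

    public static void main(String[] args) throws IOException {
        String csv = "name,amount\nAlmond cookie,100.0\n";
        System.out.println(readHeader(new StringReader(csv))); // prints name,amount
    }
}
```

The header line could then be sent into the stream as its own event while the body lines are processed separately.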

Issue in metrics at source & sink levels

Description:
There are some issues in metrics at the source and sink levels:

  • The reading percentage (for files that are already completely read) resets when FileProcessor starts to read a new file.
  • The wrong file size is exposed from the file sink.
  • Some source-level metrics throw exceptions on Windows.

Reading a large file overflows the heap space: OutOfMemoryError

Description:
I'm trying to read a database dump file of more than 1 GB to build a database and use it in Elasticsearch after cleaning and transforming the data (ETL). Before running the Siddhi app, I increased the memory to 4 GB for both the editor and the worker, in case the worker is executed by the editor.

After checking the source code of the remote file transporter, it seems that the method VFSClientConnector.toByteArray keeps reading the file to the end, acquiring more buffer space without any limit and without waiting for the already-read data to be flushed first, which exhausts the heap.

Please advise if there is any other extension that reads large files without breaking the memory.

I'm currently using Logstash to work around this. I would like to use Siddhi in production, but unfortunately I can't at the moment because of this issue.

The exception thrown is:
Exception in thread "remote-file-1-thread-1" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.wso2.transport.file.connector.sender.VFSClientConnector.toByteArray(VFSClientConnector.java:221)
	at org.wso2.transport.file.connector.sender.VFSClientConnector.send(VFSClientConnector.java:172)
	at org.wso2.extension.siddhi.io.file.listeners.FileSystemListener.onMessage(FileSystemListener.java:173)
	at org.wso2.transport.remotefilesystem.server.RemoteFileSystemProcessor.run(RemoteFileSystemProcessor.java:89)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Code that throws the exception here:
at org.wso2.transport.file.connector.sender.VFSClientConnector.send(VFSClientConnector.java:172)

https://www.programcreek.com/java-api-examples/?code=wso2%2Fcarbon-transports%2Fcarbon-transports-master%2Ffile%2Forg.wso2.carbon.transport.remote-file-system%2Fsrc%2Fmain%2Fjava%2Forg%2Fwso2%2Fcarbon%2Ftransport%2Fremotefilesystem%2Fserver%2FRemoteFileSystemProcessor.java#

/**
 * Obtain a byte[] from an input stream.
 *
 * @param input The input stream that the data should be obtained from
 * @return byte[] The byte array of data obtained from the input stream
 * @throws IOException
 */
private static byte[] toByteArray(InputStream input) throws IOException {
    long count = 0L;
    byte[] buffer = new byte[4096];
    int n1;
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    // Accumulates the entire stream in memory; grows without bound for large files.
    for (; -1 != (n1 = input.read(buffer)); count += (long) n1) {
        output.write(buffer, 0, n1);
    }
    if (logger.isDebugEnabled()) {
        logger.debug(count + " bytes read");
    }
    byte[] bytes = output.toByteArray();
    closeQuietly(output);
    return bytes;
}
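One possible fix direction, sketched here with illustrative names (not the transport's actual API): stream the bytes through a single fixed-size buffer directly to the consumer, so memory use stays constant regardless of file size.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StreamCopy {

    // Copy in fixed-size chunks; only one 4 KB buffer is ever held in memory,
    // unlike toByteArray, which accumulates the whole file in a byte[].
    public static long copy(InputStream input, OutputStream output) throws IOException {
        byte[] buffer = new byte[4096];
        long count = 0L;
        int n;
        while ((n = input.read(buffer)) != -1) {
            output.write(buffer, 0, n);
            count += n;
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(data), sink);
        System.out.println(copied); // prints 10000
    }
}
```

Whether this is feasible depends on whether the downstream consumer can accept chunks instead of a single byte[], which is an assumption about the transport's design.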

move.after.process is failing

Description:
I want to move a file after being processed.
It fails and shows the following exception:
org.apache.commons.vfs2.FileSystemException: Could not rename "file:///home/wso2carbon/wso2sp-4.3.0/wso2/editor/deployment/input/input1.json" to "file:///home/wso2carbon/wso2sp-4.3.0/wso2/editor/deployment/processed/input1.json".

I have the following source definition in my rule:
@source(
type='file',
file.uri='file:///home/wso2carbon/wso2sp-4.3.0/wso2/editor/deployment/input/input1.json',
tailing='false',
action.after.process='move',
move.after.process='file:///home/wso2carbon/wso2sp-4.3.0/wso2/editor/deployment/processed/input1.json',
@Map(type='json'))

Suggested Labels:
file,move

Affected Product Version:
1.0.14

Steps to reproduce:
Deploy a rule with the source definition shown above.

Need support for TSV file reading

Description:
Currently, Siddhi does not support reading TSV files. Since some use cases need to read TSV files using Siddhi, it would be good to add this feature.

Suggested Labels:
streaming-integrator-1.1.0

Cannot use FTP or SFTP as dir.uri

Description:
The following error is observed when an FTP URL is specified as the dir.uri:

Error on 'SiddhiApp3'. Connection to the file directory is lost. Error while connecting at Source 'file' at 'SweetProductionStream'. Will retry in '15 sec'. io.siddhi.core.exception.ConnectionUnavailableException: Connection to the file directory is lost.
at io.siddhi.extension.io.file.FileSource.deployServers(FileSource.java:787)
at io.siddhi.extension.io.file.FileSource.connect(FileSource.java:572)
at io.siddhi.extension.io.file.FileSource.connect(FileSource.java:83)
at io.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:160)
at io.siddhi.core.stream.input.source.Source$1.run(Source.java:185)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.wso2.transport.remotefilesystem.exception.RemoteFileSystemConnectorException: Failed to initialize File server connector for Service: SiddhiApp3
at org.wso2.transport.remotefilesystem.server.connector.contractimpl.RemoteFileSystemServerConnectorImpl.<init>(RemoteFileSystemServerConnectorImpl.java:46)
at org.wso2.transport.remotefilesystem.impl.RemoteFileSystemConnectorFactoryImpl.createServerConnector(RemoteFileSystemConnectorFactoryImpl.java:40)
at io.siddhi.extension.io.file.FileSource.deployServers(FileSource.java:768)
... 11 more
Caused by: org.wso2.transport.remotefilesystem.exception.RemoteFileSystemConnectorException: [SiddhiApp3] File system server connector is used to listen to a folder. But the given path does not refer to a folder.
at org.wso2.transport.remotefilesystem.server.RemoteFileSystemConsumer.<init>(RemoteFileSystemConsumer.java:80)
at org.wso2.transport.remotefilesystem.server.connector.contractimpl.RemoteFileSystemServerConnectorImpl.<init>(RemoteFileSystemServerConnectorImpl.java:44)
... 13 more

Affected Product Version:
SI 1.1.0

OS, DB, other environment details and versions:

Steps to reproduce:
Use the following Siddhi app to reproduce:

@App:name("SiddhiApp3")

@App:description("Description of the plan")

@source(type='file', mode='line',
dir.uri='ftp://bob:12345@localhost:21/wrthgy',
file.system.options='PASSIVE_MODE:true',
action.after.process='keep',
tailing='false',
@map(type='csv'))
define stream SweetProductionStream (name string, amount long);

@sink(type='log')
define stream OutputStream (name string, amount long);

from SweetProductionStream
select *
insert into OutputStream;

Give file URI using HTTP

Description:
I was trying to achieve the $subject use case in a Siddhi app as follows.

@source(type='file', file.uri='https://raw.githubusercontent.com/BuddhiWathsala/nats-demo-csv/master/stops.csv', @map(type="csv"))
define stream InputRouteStream1(routeNo string);

But it gives the following error log and does not work as expected.

Exception in thread "Siddhi-1.LoadBusTimeTableSchedule-executor-thread-0" java.lang.NoClassDefFoundError: org/apache/commons/httpclient/util/URIUtil
	at org.apache.commons.vfs2.provider.URLFileName.getPathQueryEncoded(URLFileName.java:76)
	at org.apache.commons.vfs2.provider.URLFileName.getURIEncoded(URLFileName.java:133)
	at org.apache.commons.vfs2.provider.url.UrlFileObject.createURL(UrlFileObject.java:65)
	at org.apache.commons.vfs2.provider.url.UrlFileObject.doAttach(UrlFileObject.java:56)
	at org.apache.commons.vfs2.provider.AbstractFileObject.attach(AbstractFileObject.java:156)
	at org.apache.commons.vfs2.provider.AbstractFileObject.getContent(AbstractFileObject.java:1065)
	at org.wso2.transport.file.connector.server.FileConsumer.<init>(FileConsumer.java:96)
	at org.wso2.transport.file.connector.server.FileServerConnector.start(FileServerConnector.java:63)
	at io.siddhi.extension.io.file.FileSource.lambda$deployServers$6(FileSource.java:631)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.httpclient.util.URIUtil cannot be found by siddhi-io-file_2.0.5
	at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:448)
	at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:361)
	at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:353)
	at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:161)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 12 more

Any idea or suggestion on how to address this use case?

Suggested Labels:
Type improvement

Affected Product Version:
Siddhi tooling 5.1.2
Siddhi io file 2.0.5

NullPointer Exception when changing states (Run/ Stop) in Tooling

Description:
When using any file extension in the tooling, if we start and stop the Siddhi application several times really fast, it gives a NullPointerException.

Steps to reproduce:
Deploy the CSVDefaultMapping sample in the tooling.
Click the start and stop buttons at a fast rate.
