civictechto / ttc_subway_times

A scraper to grab and publish TTC subway arrival times.
License: GNU General Public License v3.0
As another cost-saving measure, instead of having a dedicated EC2 instance running the data-scraping script, we could host this on AWS Lambda, which would presumably be cheaper.
The Data Dictionary contains a table mapping between station_char and station_id, but it doesn't contain the new Line 1 extension stations; we should pull those from... something.
Thanks to @perobertson's efforts to get continuous integration going, we now have automated linting. However, this has led to a lot of error messages with things failing. #42 silences many linting errors by adding them to the ignore variable in setup.cfg.
Start a PR and remove these lines one by one, committing with the linting error code each time. The CI will run and should explain which lines in the scraping code are raising each warning. Then determine whether that style issue is worth fixing.
Now that testing has become a little more formalized, it would help new contributors to have instructions on how to submit pull requests. These should cover the development tool set, so people can familiarize themselves with what testing gets run, and how, when submitting PRs.
This should go in a CONTRIBUTING.md file in the (to be created) .github/ folder.
We've noticed that the API seems to lock us out when we use the async method of sending requests, which appears to send too many requests too quickly.
The current serverless version of the data pipeline uses serial requests to the API instead, and that seems fine.
Not sure if there's a way to put a sleep timer on the async requests, which seems a liiiiittle counter-intuitive.
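One way to keep async requests while avoiding the lockout is to cap concurrency with a semaphore and add a small per-request sleep. A minimal sketch (the function names, concurrency limit, and delay are my assumptions, not the project's code):

```python
import asyncio

async def fetch_with_throttle(sem, delay, station_id, fetch):
    """Acquire the semaphore, wait briefly, then perform the request."""
    async with sem:
        await asyncio.sleep(delay)  # spaces requests out to avoid the lockout
        return await fetch(station_id)

async def poll_stations(station_ids, fetch, max_concurrent=5, delay=0.5):
    """Poll every station concurrently, but with at most max_concurrent
    requests in flight and a small delay per request."""
    sem = asyncio.Semaphore(max_concurrent)
    tasks = [fetch_with_throttle(sem, delay, sid, fetch) for sid in station_ids]
    return await asyncio.gather(*tasks)
```

`asyncio.gather` preserves input order, so results still line up with the station list even though requests overlap.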
From the API we get (see Data Dictionary) station_id and station_char. For comparing with scheduled performance, and for tasks like mapping station and line locations, it would be useful to have a mapping between GTFS stops and the data from the API.
GTFS can be downloaded from here. Tools for dealing with GTFS data can be found here. I created two SQL files to create the structure in SQL and process the data:
Looks like the GTFS has at least one row per station-direction, here's a sample.
lineid | stop_id | stop_code | stop_name |
---|---|---|---|
1 | 14457 | 13863 | BLOOR STATION - NORTHBOUND PLATFORM |
1 | 14414 | 13864 | BLOOR STATION - SOUTHBOUND PLATFORM |
1 | 14455 | 13808 | COLLEGE STATION - NORTHBOUND PLATFORM |
1 | 14416 | 13807 | COLLEGE STATION - SOUTHBOUND PLATFORM |
1 | 14461 | 13797 | DAVISVILLE STATION - NORTHBOUND PLATFORM |
1 | 14410 | 13798 | DAVISVILLE STATION - SOUTHBOUND PLATFORM |
1 | 15698 | 15664 | DOWNSVIEW PARK STATION - NORTHBOUND PLATFORM |
1 | 15699 | 15665 | DOWNSVIEW PARK STATION - SOUTHBOUND PLATFORM |
1 | 14454 | 13809 | DUNDAS STATION - NORTHBOUND PLATFORM |
1 | 14417 | 13810 | DUNDAS STATION - SOUTHBOUND PLATFORM |
1 | 14428 | 13828 | DUPONT STATION - NORTHBOUND PLATFORM |
1 | 14443 | 13827 | DUPONT STATION - SOUTHBOUND PLATFORM |
1 | 14462 | 13796 | EGLINTON STATION - NORTHBOUND PLATFORM |
1 | 14409 | 13795 | EGLINTON STATION - SOUTHBOUND PLATFORM |
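Since the stop_name convention above encodes both station and platform direction, the mapping could be derived programmatically. A minimal sketch (the helper name and regex are mine, based only on the sample rows above):

```python
import re

def split_platform_name(stop_name):
    """Split e.g. 'BLOOR STATION - NORTHBOUND PLATFORM' into
    (station, direction); returns (stop_name, None) when the name
    doesn't follow the platform convention."""
    m = re.match(r"^(.+?) STATION - (\w+) PLATFORM$", stop_name)
    if not m:
        return (stop_name, None)
    return (m.group(1), m.group(2))
```

Running this over stops.txt would give a station/direction table to join against the API's station_char values, with the non-matching names flagged for manual review.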
While the API is documented to some degree in a notebook under doc/, an entity-relationship diagram of the 3 tables would be helpful to newcomers, plus any other info you think would help someone jumping into this project!
Currently, consolidate copies all data from one date into a tar. It should delete the folder it pulls from.
It's possible to make a bucket publicly readable so that any person who downloads the data pays for the transfer costs (this sounds like S3's "Requester Pays" feature).
A) I don't entirely understand how to set this up.
B) It would be great if this could be folded into the serverless.yml configuration/deployment.
Crashes not confirmed, but missing runs of data suggest that the scraper is crashing.
More general exception handling, and refusing to exit on HTTP errors, might do it.
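A simple pattern for this is to wrap each polling cycle so an exception is logged and retried instead of propagating up and killing the process. A sketch under that assumption (the function names and retry/backoff values are hypothetical, not the scraper's actual code):

```python
import logging
import time

def resilient_poll(poll_once, retries=3, backoff=2.0):
    """Run one polling cycle, swallowing exceptions (HTTP errors
    included) so a single bad cycle can't kill the scraper process."""
    for attempt in range(1, retries + 1):
        try:
            return poll_once()
        except Exception:
            logging.exception("poll attempt %d failed", attempt)
            time.sleep(backoff * attempt)
    return None  # give up on this cycle, but keep the process alive
```

Returning None for a failed cycle means a run of data is lost, which is still better than losing all subsequent runs to a crash.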
Extracting 2019-10-29.tar.gz
98%|███████████████████████████████████▏| 40000/40983 [06:34<00:09, 100.08it/s]
Traceback (most recent call last):
File "fetch_s3.py", line 212, in <module>
fetch_s3()
File "/home/rad/.local/share/virtualenvs/ttc_subway_times-ZmuzQ-JX/lib/python3.5/site-packages/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/home/rad/.local/share/virtualenvs/ttc_subway_times-ZmuzQ-JX/lib/python3.5/site-packages/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/home/rad/.local/share/virtualenvs/ttc_subway_times-ZmuzQ-JX/lib/python3.5/site-packages/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/rad/.local/share/virtualenvs/ttc_subway_times-ZmuzQ-JX/lib/python3.5/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "fetch_s3.py", line 208, in fetch_s3
_fetch_s3(aws_access_key_id, aws_secret_access_key, output_dir, start_date, end_date, bucket)
File "fetch_s3.py", line 197, in _fetch_s3
fetch_and_transform(to_download, output_dir)
File "fetch_s3.py", line 80, in fetch_and_transform
jsons_to_csv(tmpdir, output_dir)
File "fetch_s3.py", line 117, in jsons_to_csv
pd.DataFrame.from_records(requests, columns=requests[0]._fields).to_csv(
IndexError: list index out of range
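The IndexError comes from indexing requests[0] when a batch parses to an empty list. A hypothetical guard illustrating the fix (this helper is mine, not the actual jsons_to_csv code):

```python
def records_to_csv_rows(requests):
    """Return header + rows for a batch of namedtuple records, or None
    when the batch is empty — the case that raised the IndexError above."""
    if not requests:
        return None
    header = list(requests[0]._fields)
    return [header] + [list(r) for r in requests]
```

In jsons_to_csv, an empty batch would then skip the to_csv call (perhaps with a warning) instead of crashing at 98% of a long extraction.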
It would be nice to have a list of subway and other delay events, with possible reasons. The only real-time feed I can find at the moment is the TTC Twitter account. So use the Twitter API?
https://dev.twitter.com/rest/reference/get/statuses/user_timeline
Having the scraper generate GTFS-RT data would have two benefits:
This requires generating GTFS-RT in real... time, and then also reprocessing the archive of data.
There are new stations! We should have their data.
Also, Line 1 stations were numbered 1-32; what are these new stations' IDs?
Having burned through my 12 months of AWS Free Tier, running this operation currently costs ~$30/month because of the size of the RDS. We need a way to store backups of the database data in compressed monthly files that others can access. Preferably these would be hosted in S3 buckets on AWS. Until then, prepare a command to archive a month of all three tables to CSV, compress them, and store them in SpiderOak.
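The monthly export could be driven by Postgres COPY statements built per table. A sketch of the statement builder (the date column name create_date is an assumption about the schema; the actual tables would be checked first):

```python
def monthly_archive_sql(table, month_start, month_end):
    """Build a Postgres COPY statement exporting one month of a table
    as CSV on stdout, for piping into gzip."""
    return (
        f"COPY (SELECT * FROM {table} "
        f"WHERE create_date >= '{month_start}' "
        f"AND create_date < '{month_end}') "
        "TO STDOUT WITH CSV HEADER"
    )
```

Each statement could then be run via psql and piped through gzip, one file per table per month.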
Because of #54 the script takes... longer. This also means it costs more.
Any way to reduce that compute time without getting blocked would be great!
I've added a page for news articles relevant to the project, since some journalists have done analysis of the delay OpenData. If you find more, feel free to share. It would also be good to find articles about delays that were epic enough to get written about, so we can see what those service failures look like in the data.
There is also a Resources page where I've linked to some transit resources for tech tools, local projects, and research.
This may just be my Pi struggling a little, but according to the 2-hour data sample (and bear in mind that create_date is truncated to the second), the completion time of each station request is 2.6 s on average, with a median of 2 s (truncated). There are 68 stations, so each run is taking longer than the run frequency (I think).
We could investigate using a different requests library to get non-blocking IO. example
Or bundle inserts, which should reduce the time lost creating connections to the database.
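Bundling inserts mostly comes down to batching rows and sending each batch in a single executemany() (or psycopg2 execute_values) call rather than one round-trip per row. A minimal batching sketch (the helper name and batch size are my choices):

```python
def chunked(rows, size=500):
    """Yield rows in fixed-size batches, so each batch can be inserted
    with one executemany() call instead of one call per row."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```

Combined with a single long-lived connection per run, this removes both the per-row round-trips and the repeated connection setup.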
The logfile got really biiig and seems to have frozen the EC2 😕
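One fix for unbounded log growth is a rotating file handler with a size cap. A sketch (the logger name, file path, and size limits are assumptions, not the scraper's current config):

```python
import logging
from logging.handlers import RotatingFileHandler

def make_logger(path="scraper.log", max_bytes=10_000_000, backups=3):
    """Cap the scraper's logfile at ~10 MB with a few rotated backups,
    so unbounded growth can't fill the disk again."""
    handler = RotatingFileHandler(path, maxBytes=max_bytes,
                                  backupCount=backups, delay=True)
    logger = logging.getLogger("ttc_scraper")
    logger.addHandler(handler)
    return logger
```

With backupCount=3 and a 10 MB cap, disk usage for logs is bounded at roughly 40 MB regardless of how long the instance runs.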
After #55, it would be useful to consolidate past dates. Consolidate should therefore take a date as a parameter.
We can also get Delay Data from OpenData, which is logged by operators. You can find the data here (this link is going to break really shortly).
trainids appear to be at most 3 digits, whereas subway vehicle numbers appear to be 4 digits starting with 5.
Do these two datasets correspond? Do the IDs match, and can we gain insight into delay durations from the real-time data?
Try a second or so later and see if any data is returned.
Comparing 2019-03-25 from the old Python scraper with the same date from the serverless one. I had to convert the folder of JSON into a CSV using the jsons_to_csv() function from fetch_s3.py.
WITH new_data AS (
    SELECT date_trunc('hour', create_date) AS request_ts, COUNT(id) AS new_ids
    FROM requests_serverless
    RIGHT OUTER JOIN ntas_data_serverless serverless ON serverless.requestid = requests_serverless.requestid
    WHERE create_date >= '2019-03-25'
      AND create_date < '2019-03-26'
    GROUP BY request_ts),
old_data AS (
    SELECT date_trunc('hour', requests.create_date) AS request_ts, COUNT(id) AS old_ids
    FROM requests
    RIGHT OUTER JOIN ntas_data USING (requestid)
    WHERE requests.create_date >= '2019-03-25'
      AND requests.create_date < '2019-03-26'
    GROUP BY request_ts)
SELECT request_ts, old_ids, new_ids
FROM new_data
RIGHT OUTER JOIN old_data USING (request_ts)
ORDER BY request_ts
'request_ts';'old_ids';'new_ids'
'2019-03-25 00:00:00';23,985;
'2019-03-25 01:00:00';21,499;
'2019-03-25 04:00:00';2;
'2019-03-25 05:00:00';10,433;6,212
'2019-03-25 06:00:00';24,546;13,126
'2019-03-25 07:00:00';25,393;13,575
'2019-03-25 08:00:00';25,444;13,509
'2019-03-25 09:00:00';24,943;13,395
'2019-03-25 10:00:00';25,279;13,582
'2019-03-25 11:00:00';25,342;13,489
'2019-03-25 12:00:00';24,901;13,393
'2019-03-25 13:00:00';25,218;13,415
'2019-03-25 14:00:00';24,899;13,355
'2019-03-25 15:00:00';25,107;13,355
'2019-03-25 16:00:00';25,061;13,488
'2019-03-25 17:00:00';25,301;13,551
'2019-03-25 18:00:00';25,309;13,520
'2019-03-25 19:00:00';24,948;13,534
'2019-03-25 20:00:00';25,401;13,605
'2019-03-25 21:00:00';24,932;13,438
'2019-03-25 22:00:00';24,393;13,051
'2019-03-25 23:00:00';23,879;12,854
It would be helpful in algorithm design if the scraper recorded, with each train observation for a given request and direction, its position: 1st, 2nd, or 3rd.
This would be stored in the ntas_data table.
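Since the API presumably returns the next trains in arrival order, the position could be assigned at scrape time by enumerating the observations. A sketch (the function name and dict representation are mine; "position" as a column name is an assumption):

```python
def rank_trains(observations):
    """Attach a 1-based position (1st/2nd/3rd arriving train) to each
    observation for one request+direction, in the order the API
    returned them."""
    return [dict(obs, position=i)
            for i, obs in enumerate(observations, start=1)]
```

The enriched records could then be inserted into ntas_data with the extra position column.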
Start by selecting all the samples for a train ordered by timestamp.
Merge all samples for a train in the same sample period to get a good estimate of the location.
Use those location estimates to generate arrival time estimates.
A lot of our data exploration and documentation is in Jupyter Notebooks, which is not an obvious format for new users.
I have data since March 19, 2019 from running the AWS scraper
To easily configure a logging level other than debug.
Noted by @samkodes: Station 68 appears to be missing from the dataset.
The scraper isn't necessarily the fastest on the Pi, and there is certainly a limit to the amount of data that can be stored on the SD card.
If I get a few hours I could set up my old desktop as a server. Otherwise, open to suggestions for cheap hosting options.
Periodically we get the errors below (the number in parentheses, e.g. (275), is the line number in ttc_api_scraper.py printing that log message).
2018-11-20 20:08:09,876 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:09,876 (276): <generator object ClientResponse.text at 0x7f851c4ecba0>
2018-11-20 20:08:09,886 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:09,886 (274): Malformed JSON for station 52 on line 2
2018-11-20 20:08:09,886 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:09,886 (276): <generator object ClientResponse.text at 0x7f851c4d7f10>
2018-11-20 20:08:09,967 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:09,968 (274): Malformed JSON for station 59 on line 2
2018-11-20 20:08:09,968 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:09,968 (276): <generator object ClientResponse.text at 0x7f851c4e72b0>
2018-11-20 20:08:10,034 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:10,034 (274): Malformed JSON for station 66 on line 4
2018-11-20 20:08:10,034 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:10,034 (276): <generator object ClientResponse.text at 0x7f851c4ec620>
2018-11-20 20:08:10,059 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:10,059 (274): Malformed JSON for station 65 on line 4
2018-11-20 20:08:10,059 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:10,059 (276): <generator object ClientResponse.text at 0x7f851c4ec360>
2018-11-20 20:08:10,075 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:10,076 (274): Malformed JSON for station 60 on line 2
2018-11-20 20:08:10,076 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:10,076 (276): <generator object ClientResponse.text at 0x7f851c4e7518>
2018-11-20 20:08:10,098 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:10,098 (274): Malformed JSON for station 58 on line 2
2018-11-20 20:08:10,098 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:10,098 (276): <generator object ClientResponse.text at 0x7f851c4dffc0>
2018-11-20 20:08:10,099 (775): Attempt to decode JSON with unexpected mimetype: text/html
2018-11-20 20:08:10,099 (274): Malformed JSON for station 7 on line 1
2018-11-20 20:08:10,100 (275): Expecting value: line 1 column 1 (char 0)
2018-11-20 20:08:10,100 (276): <generator object ClientResponse.text at 0x7f851c4ecf10>
But the scraper then sleeps for 2 s for that station, tries 3 more times, and typically succeeds.
I edited the db.cfg file as per: https://github.com/CivicTechTO/ttc_subway_times/blob/master/db.cfg
But I'm getting the following error:
__main__ - CRITICAL - FATAL: role "rad" does not exist'
Should I be using different credentials?
It would be helpful in algorithm design to be able to treat each cycle through the stations as a single "poll" of the entire system.
I propose assigning each poll a unique sequential number, similar to requestid.
This number could be stored in the requests table and be used to pull together all requests from a single poll.
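The proposal above amounts to a process-wide counter that is incremented once per cycle through the stations. A minimal sketch (the function name and a "pollid" column are assumptions, mirroring how requestid works):

```python
import itertools

# One process-wide counter; every station request issued during the
# same cycle through the stations records the same poll number.
_poll_counter = itertools.count(1)

def next_poll_id():
    """Return the id for the next full poll of the system."""
    return next(_poll_counter)
```

At the start of each cycle the scraper would call next_poll_id() once and stamp that value on every request row it writes during the cycle, so a single join on pollid pulls the whole system snapshot.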
Hi Raphael – I have a few brief thoughts on beginning to process the NTAS data that I thought I’d share in case they’re helpful. I’d be happy to start playing with implementing this processing once some data is collected (even a day’s worth).
There are two general approaches that I think could be fruitfully combined. The first approach tries to reconstruct the system’s prediction model by extracting predicted travel times between stations and looking for typical and exceptional patterns (if the system is really dumb, there will be no exceptional patterns and all we’ll get is constant travel times between stations; if the system is smart, we’ll get more information – see below). The second approach tracks variation in predicted times for each train as it moves through the system.
Both approaches assume a database that stores a single train prediction per record, with some pre-processing done to create a field called PAT (predicted arrival time) – just createDate + timeInt. So a record would have stationId, trainId, PAT, createDate, etc. I’m assuming a trainId refers to a “run” of the train, as Sammi deGuzman’s blog suggests. If the same trainId appears on multiple runs, some time-based filtering will have to happen below to make sure we’re picking up only a single run of a train.
Suppose we have two records with the same trainId and different stationId’s. Then subtracting PATs gives us a travel time estimate (TTE) between those stations (technically, it also includes load/unload times).
If the system is stupid, TTEs between any pair of stations will be constant. This means that there’s a very high degree of redundancy in the NTAS data and there’s no reason to save observations of the same train from multiple stations for future analysis (or alternatively, observations of the same train from multiple stations at different times can be combined very easily).
If the system is smart, TTEs could vary for a number of reasons:
- High passenger volume periods increase load/unload time
- High train volume means slower train speeds
- Traffic adjustment might mean slower train speeds
- Delays (for any reason) might demand traffic adjustment – meaning slower train speeds
Simply making a histogram of TTEs for any pair of stations should tell us whether the system is smart or not and what kinds of variations it might be picking up. If the system is smart, looking at unusual TTEs and seeing how they move around between stations might give us insight into how local delays propagate through the prediction model.
If building a table of TTEs, it’s probably a good idea to record the data the TTEs came from – i.e. the two original records that generate each TTE. The table should also contain a creationDate, though it’s not clear what that date should be if the records used to create the TTE have different times (they certainly will, since we’re doing low-frequency sampling). So record both creationDates?
Some filtering will be required when creating TTEs to use only records sampled close together in time (say, choose the closest times possible, and enforce some maximum time difference); this avoids junk estimates produced if traffic conditions change between the sampling of the two original records.
Suppose we have multiple records with the same trainId and stationId. Order them by creationDate and subtract the first PAT from all the others (alternatively, could calculate running differences); augment each record by putting this difference in a field called “localDelay”. This seems good enough to start identifying problems. Comparing local delays across stations will also help describe how they propagate through the prediction model.
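The localDelay construction described above can be sketched in a few lines (field names follow the text; the dict representation is an assumption):

```python
def local_delays(records):
    """For records sharing trainId and stationId: order by createDate
    and subtract the first PAT from each PAT, storing the difference
    as 'localDelay'. A growing localDelay means the predicted arrival
    is slipping later over successive samples."""
    ordered = sorted(records, key=lambda r: r["createDate"])
    base = ordered[0]["PAT"]
    return [dict(r, localDelay=r["PAT"] - base) for r in ordered]
```

Comparing these per-station delay series for the same train would show how a delay propagates through the prediction model, as suggested above.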
A PostgreSQL function that gets called on every scraper run, following the GTFS spec:
{trip_id, arrival_time, departure_time, stop_id, stop_sequence}
Note: The times would actually be timestamps.
Which brings up:
The pycares dependency doesn't seem to work in Python 3.6. We should include details for setting up a virtualenv in the README.
The API appears to now have a train direction field. We should add it to the scraper and the database.
[ERROR] ClientError: An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied
Traceback (most recent call last):
File "/var/task/src/ttc_api_scraper/consolidate.py", line 110, in handler
consolidate()
File "/var/task/src/ttc_api_scraper/consolidate.py", line 52, in consolidate
download_dir(client, s3_bucket, "{consoli_date}/".format(consoli_date=consoli_date), scrape_path)
File "/var/task/src/ttc_api_scraper/consolidate.py", line 93, in download_dir
for result in paginator.paginate(Bucket=bucket, Prefix=path):
File "/var/runtime/botocore/paginate.py", line 255, in __iter__
response = self._make_request(current_kwargs)
File "/var/runtime/botocore/paginate.py", line 332, in _make_request
return self._method(**current_kwargs)
File "/var/runtime/botocore/client.py", line 320, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/var/runtime/botocore/client.py", line 623, in _make_api_call
raise error_class(parsed_response, operation_name)
I think it might be something with the IAM user I configured, I got this from.... somewhere.
{
"Statement": [
{
"Action": [
"apigateway:*",
"cloudformation:CancelUpdateStack",
"cloudformation:ContinueUpdateRollback",
"cloudformation:CreateChangeSet",
"cloudformation:CreateStack",
"cloudformation:CreateUploadBucket",
"cloudformation:DeleteStack",
"cloudformation:Describe*",
"cloudformation:EstimateTemplateCost",
"cloudformation:ExecuteChangeSet",
"cloudformation:Get*",
"cloudformation:List*",
"cloudformation:PreviewStackUpdate",
"cloudformation:UpdateStack",
"cloudformation:UpdateTerminationProtection",
"cloudformation:ValidateTemplate",
"dynamodb:CreateTable",
"dynamodb:DeleteTable",
"dynamodb:DescribeTable",
"ec2:AttachInternetGateway",
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CreateInternetGateway",
"ec2:CreateNetworkAcl",
"ec2:CreateNetworkAclEntry",
"ec2:CreateRouteTable",
"ec2:CreateSecurityGroup",
"ec2:CreateSubnet",
"ec2:CreateTags",
"ec2:CreateVpc",
"ec2:DeleteInternetGateway",
"ec2:DeleteNetworkAcl",
"ec2:DeleteNetworkAclEntry",
"ec2:DeleteRouteTable",
"ec2:DeleteSecurityGroup",
"ec2:DeleteSubnet",
"ec2:DeleteVpc",
"ec2:Describe*",
"ec2:DetachInternetGateway",
"ec2:ModifyVpcAttribute",
"events:DeleteRule",
"events:DescribeRule",
"events:ListRuleNamesByTarget",
"events:ListRules",
"events:ListTargetsByRule",
"events:PutRule",
"events:PutTargets",
"events:RemoveTargets",
"iam:CreateRole",
"iam:DeleteRole",
"iam:DeleteRolePolicy",
"iam:GetRole",
"iam:PassRole",
"iam:PutRolePolicy",
"iot:CreateTopicRule",
"iot:DeleteTopicRule",
"iot:DisableTopicRule",
"iot:EnableTopicRule",
"iot:ReplaceTopicRule",
"kinesis:CreateStream",
"kinesis:DeleteStream",
"kinesis:DescribeStream",
"lambda:*",
"logs:CreateLogGroup",
"logs:DeleteLogGroup",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:FilterLogEvents",
"logs:GetLogEvents",
"s3:CreateBucket",
"s3:DeleteBucket",
"s3:DeleteBucketPolicy",
"s3:DeleteObject",
"s3:DeleteObjectVersion",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:ListAllMyBuckets",
"s3:ListBucket",
"s3:PutBucketNotification",
"s3:PutBucketPolicy",
"s3:PutBucketTagging",
"s3:PutBucketWebsite",
"s3:PutEncryptionConfiguration",
"s3:PutObject",
"sns:CreateTopic",
"sns:DeleteTopic",
"sns:GetSubscriptionAttributes",
"sns:GetTopicAttributes",
"sns:ListSubscriptions",
"sns:ListSubscriptionsByTopic",
"sns:ListTopics",
"sns:SetSubscriptionAttributes",
"sns:SetTopicAttributes",
"sns:Subscribe",
"sns:Unsubscribe",
"states:CreateStateMachine",
"states:DeleteStateMachine"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
}
Create a user survey.
Proposed questions (early draft):
How often do you use the subway?
Commute +, Commute, Weekly, Monthly, Less frequently
Which lines do you use?
Yonge-University, Bloor, Sheppard, Scarborough
How frequently do you experience delays?
more than 1/5, 1/10, 1/50, 1/100
How much of a problem are delays?
None, Minor, Moderate, Major, Severe
Would you like to know what the history of delays is so that you can improve your trip planning?
Not at all, Somewhat, Very much, Absolutely
Are the current methods of announcing delays adequate?
Not at all, Mostly, Absolutely
Running python3 fetch_s3.py --bucket ttc.scrape --start_date 2019-04-01 --end_date 2019-05-01 --output_dir some_local_dir from the README throws a ClientError.
Full error message:
botocore.exceptions.ClientError: An error occurred (ExpiredToken) when calling the ListObjectsV2 operation: The provided token has expired.
I attempted different date ranges and networks and ran into the same issue. I ended up getting data from the links in the channel instead, so it's all good.