tomgrek / mlq Goto Github PK
View Code? Open in Web Editor NEWAsynchronous queue for machine learning jobs
License: MIT License
Asynchronous queue for machine learning jobs
License: MIT License
Whenever I get a message in the MLQ worker, I get the following exception:
ERROR:concurrent.futures:exception calling callback for <Future at 0x2ee0d160048 state=finished raised TypeError>
Traceback (most recent call last):
File "C:\Python37\lib\concurrent\futures\thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "MyProjectPath\venv\lib\site-packages\mlq\queue.py", line 225, in reaper
progress_key = self.job_status_stem + job_key
TypeError: can only concatenate str (not "bytes") to str
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Python37\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
callback(self)
File "C:\Python37\lib\asyncio\futures.py", line 362, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "C:\Python37\lib\asyncio\base_events.py", line 728, in call_soon_threadsafe
self._check_closed()
File "C:\Python37\lib\asyncio\base_events.py", line 475, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Am I doing something wrong?
//////////////////////////////////////////////////////////////////////////
The server's code is the following:
//////////////////////////////////////////////////////////////////////////
from flask import Flask, request, jsonify
import requests
from werkzeug.datastructures import ImmutableMultiDict
import csv
from mlq.queue import MLQ
import fastjsonschema
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
# Create MLQ: namespace, redis host, redis port, redis db
mlq = MLQ('ML_app', 'localhost', 6379, 0)
# URL of this server's /callback route. Nothing in the visible code reads it
# (the matching handler further down is commented out).
CALLBACK_URL = 'http://localhost:5000/callback'
# Compiled validator for the JSON body accepted by the /test endpoint:
# an object with a string 'text' and a boolean 'save_model' (default True).
test_schema = fastjsonschema.compile({
    'type': 'object',
    'properties': {
        'text': {'type': 'string'},
        'save_model': {'type': 'boolean', 'default': True}
    },
})
@app.route('/test', methods=['POST'])
def test():
    """Validate the posted JSON body and enqueue it as an MLQ job.

    Returns:
        A JSON response with a status message and the id of the queued job,
        or a 400 JSON response describing why the body failed validation.
    """
    try:
        # The compiled validator returns the data with schema defaults
        # applied, so queue its return value rather than the raw body.
        payload = test_schema(request.json)
    except fastjsonschema.JsonSchemaException as err:
        # Report bad client input as a client error instead of a 500.
        return jsonify({'msg': 'Invalid request body: {}'.format(err)}), 400
    job_id = mlq.post(payload)
    print(payload)
    return jsonify({'msg': 'Processing, check back soon.', 'job_id': job_id})
@app.route('/train', methods=['POST'])
def train():
    """Check that the posted form fields look like CSV and enqueue them.

    Returns:
        A JSON response with a status message and the id of the queued job,
        or a 400 JSON response when a field cannot be recognised as CSV.
    """
    data = dict(request.form)
    print(data)
    sniffer = csv.Sniffer()
    try:
        # NOTE(review): which form field carries the CSV text is not visible
        # here, so every submitted value is checked -- confirm with callers.
        for value in data.values():
            # dict() over the form may yield a single string or a list of
            # strings per field, depending on the Werkzeug version.
            samples = value if isinstance(value, (list, tuple)) else [value]
            for sample in samples:
                # sniff() takes a text sample (not a dict) and raises
                # csv.Error when no dialect can be determined.
                sniffer.sniff(sample)
    except csv.Error as err:
        # Explicit check instead of `assert`, which is stripped under -O.
        return jsonify({'msg': 'Invalid CSV data: {}'.format(err)}), 400
    job_id = mlq.post(data)
    return jsonify({'msg': 'Processing, check back soon.', 'job_id': job_id})
@app.route('/task/progress/<job_id>', methods=['GET'])
def get_progress(job_id):
    """Return, as JSON, whatever MLQ reports as the progress of ``job_id``."""
    progress = mlq.get_progress(job_id)
    return jsonify(progress)
@app.route('/task/result/<job_id>', methods=['GET'])
def get_result(job_id):
    """Return, as JSON, the job record MLQ holds for ``job_id``."""
    job = mlq.get_job(job_id)
    return jsonify(job)
# @app.route('/callback', methods=['GET'])
# def train_model():
# success = request.args.get('success', None)
# job_id = request.args.get('job_id', None)
# short_result = request.args.get('short_result', None)
# print("We received a callback! Job ID {} returned successful={} with short_result {}".format(
# job_id, success, short_result
# ))
# return 'ok'
@app.route('/', methods=['GET'])
def root():
    """Serve the plain-text greeting shown at the API root."""
    greeting = 'Welcome to NLP model training API'
    return greeting
# Start Flask's built-in server, listening on all interfaces on port 5000.
app.run(host='0.0.0.0', port=5000)
//////////////////////////////////////////////////////////////////////////
The code of the worker:
//////////////////////////////////////////////////////////////////////////
import asyncio
import time
from Trainer import NERTrainer
from mlq.queue import MLQ
import sys, traceback, json
# Queue client: namespace, redis host, redis port, redis db.
mlq = MLQ('ML_app', 'localhost', 6379, 0)
# Single trainer instance that listener_func uses for every message.
trainer = NERTrainer(mlq)
def listener_func(msg, *args):
    """Run the NER model on a queued message and return the outcome.

    Args:
        msg: Job payload; must contain 'text' and may contain 'save_model'.
        *args: Extra positional arguments supplied by the caller; unused.

    Returns:
        The value returned by ``trainer.test`` on success, or the formatted
        traceback as a list of lines if processing raised an exception.
    """
    text = msg['text']
    print("I got msg: " + text)
    # Fall back to True, the default the server-side schema declares for
    # 'save_model', instead of failing with KeyError when it is absent.
    do_save = msg.get('save_model', True)
    try:
        result = trainer.test(text)
        if do_save:
            time.sleep(10)
            print('Saving model')
        return result
    except Exception:
        # Best effort: hand the failure back as the job result rather than
        # letting the exception propagate out of the listener.
        return traceback.format_exc().splitlines()
async def main():
    """Announce start-up, then register the reaper and the message listener."""
    print("Running, waiting for messages.")
    reaper_settings = {
        'call_how_often': 60,
        'job_timeout': 300,
        'max_retries': 5,
    }
    mlq.create_reaper(**reaper_settings)
    mlq.create_listener(listener_func)


if __name__ == '__main__':
    # Only start the worker when this file is executed as a script.
    asyncio.run(main())
//////////////////////////////////////////////////////////////////////////
The code of the trainer:
//////////////////////////////////////////////////////////////////////////
import logging
from mlq.queue import MLQ
# Module-level logger named after this module.
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
class NERTrainer:
    """Thin wrapper around a named-entity-recognition predictor."""

    def __init__(self, mlq_instance, predictor=None):
        """Store the queue handle and select the predictor to use.

        Args:
            mlq_instance: MLQ queue object, kept on the instance.
            predictor: Optional ready-made predictor object. When omitted
                (or falsy), the fine-grained NER model archive is loaded
                through AllenNLP.
        """
        self.mlq_instance = mlq_instance
        if predictor:
            # Honour the injected predictor; it was previously ignored.
            self.predictor = predictor
        else:
            # Imported lazily so that callers who inject their own predictor
            # do not need AllenNLP to be importable.
            from allennlp.predictors.predictor import Predictor
            self.predictor = Predictor.from_path(
                "fine-grained-ner-model-elmo-2018.12.21.tar.gz")
        # The Redis key 'allennlp_model_trained' is no longer read here: its
        # raw value was either overwritten straight away or left in place of
        # a real predictor.

    def train(self, val):
        """Forward ``val`` to the predictor's ``train`` method."""
        self.predictor.train(val)

    def test(self, val):
        """Tag the sentence ``val`` and return a tag -> word mapping.

        Args:
            val: Sentence passed to the predictor as ``sentence``.

        Returns:
            A dict built from the predictor's 'tags' and 'words' lists with
            the 'O' entry removed. A tag that occurs several times maps to
            the last word carrying it.
        """
        result = self.predictor.predict(sentence=val)
        tagged = dict(zip(result['tags'], result['words']))
        # 'O' is presumably the "outside any entity" tag; pop() with a
        # default avoids a KeyError when no token received it.
        tagged.pop('O', None)
        return tagged
if __name__ == "__main__":
    # Manual run: build a queue client and construct the trainer with it.
    mlq = MLQ('ML_app', 'localhost', 6379, 0)
    t = NERTrainer(mlq)
There has been no release in the last 2 years. Do you have any recently updated repos, or are you planning to leave the project?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Something interesting about the web. A new door to the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Something interesting about visualization and data art.
Something interesting about games, to make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.