
ml-system's Introduction

Hi, I’m @penhsuanwang

Researching streaming data techniques and related ML applications.
Designed an architecture based on the Kafka & Spark platforms. I’m interested in state-of-the-art ML models for incremental learning.

ml-system's People

Contributors

penhsuanwang

ml-system's Issues

Implement Functionality to List All Models in MLFlow Model Store for Web API Integration

Description

Our application currently lacks the capability to list all models stored in the MLFlow model store directly through a web API. This feature is essential for enhancing our web UI's ability to display a comprehensive list of all model names along with their associated tags. Implementing this functionality will allow users to easily access and manage models through the web interface, promoting better interaction and accessibility.

Proposed Change

Implement a new method in the MLFlowClientModelAgent class that utilizes MlflowClient from the MLflow tracking API to list all registered models. This method will support our new web API endpoint that provides a list of all models, which can then be used to populate the model management section of our web UI.

Implementation Details

MLFlowClientModelAgent Class:

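from mlflow.tracking import MlflowClient


class MLFlowClientModelAgent(MLFlowClient):  # MLFlowClient is the project's own base class
    @classmethod
    def list_all_models(cls) -> list:
        """
        List all models registered in the MLflow model store.
        This method will be used to provide data for a web API endpoint.
        :return: A list of dicts holding the name and tags of each registered model.
        """
        client = MlflowClient()
        model_details = []
        page_token = None
        # list_registered_models() was removed in MLflow 2.0; search_registered_models()
        # replaces it and returns results one page at a time, so follow the page token.
        while True:
            page = client.search_registered_models(page_token=page_token)
            model_details.extend({'name': model.name, 'tags': model.tags} for model in page)
            page_token = page.token
            if not page_token:
                break
        return model_details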

Benefits

  • Enhanced Visibility: Provides the web UI with the ability to display all available models along with their details.
  • Improved Usability: Simplifies user interactions by allowing them to view and select models directly from the web UI.
  • API Integration: Supports integration with web APIs, facilitating better data management and accessibility.

Add update methods for model, trainer, and data_processor

The enhancement allows users to manage the machine learning system more effectively by providing REST API endpoints to:

  • List existing models, trainers, and data processors.
  • Update the configurations of these components.

Components Involved

  1. Model Store (src/store/model_store.py)
  2. Data Processor Store (src/store/data_processor_store.py)
  3. Trainer Store (src/store/trainer_store.py)
  4. ML Training Serving Application (src/webapp/ml_training_serving_app.py)
  5. API Router (src/webapp/ml_training_serving_app_router.py)

Detailed Design

Store Classes
Model Store
  • Methods Added:
    • update_model(cls, model_id: str, new_model: object) -> bool
Data Processor Store
  • Methods Added:
    • update_data_processor(cls, data_processor_id: str, new_data_processor: object) -> bool
Trainer Store
  • Methods Added:
    • update_trainer(cls, trainer_id: str, new_trainer: object) -> bool
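The store-side update methods listed above could look roughly like the following. This is a minimal sketch that assumes ModelStore keeps its entries in a class-level dict keyed by ID; the actual store internals are not shown in this issue. `update_model` returns False instead of creating an entry for an unknown ID, so the API layer can tell "not found" apart from success. `list_models` is a hypothetical helper that could back the list endpoints.

```python
from typing import Dict, List


class ModelStore:
    """Illustrative store; the dict-based internals are an assumption."""

    _models: Dict[str, object] = {}

    @classmethod
    def add_model(cls, model_id: str, model: object) -> bool:
        # Refuse to silently overwrite an existing entry.
        if model_id in cls._models:
            return False
        cls._models[model_id] = model
        return True

    @classmethod
    def get_model(cls, model_id: str) -> object:
        # Returns None when the ID is unknown.
        return cls._models.get(model_id)

    @classmethod
    def update_model(cls, model_id: str, new_model: object) -> bool:
        # Only replace models that already exist; unknown IDs are not created.
        if model_id not in cls._models:
            return False
        cls._models[model_id] = new_model
        return True

    @classmethod
    def list_models(cls) -> List[str]:
        # Return IDs only, so the API response stays JSON-serializable.
        return sorted(cls._models)
```

TrainerStore and DataProcessorStore would follow the same shape with their own dicts.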
ML Training Serving Application
Class: MLTrainingServingApp

New Methods:

  • list_models(cls) -> list
  • list_trainers(cls) -> list
  • list_data_processors(cls) -> list
  • update_model(cls, model_id: str, model_params: dict) -> bool
  • update_trainer(cls, trainer_id: str, trainer_params: dict) -> bool
  • update_data_processor(cls, data_processor_id: str, data_processor_params: dict) -> bool
API Endpoints
File: src/webapp/ml_training_serving_app_router.py

New Endpoints:

  • List Models:

    @router.get("/ml_training_manager/list_models")
    def list_models(ml_trainer_app: MLTrainingServingApp = Depends(get_app)):
        return ml_trainer_app.list_models()
  • List Trainers:

    @router.get("/ml_training_manager/list_trainers")
    def list_trainers(ml_trainer_app: MLTrainingServingApp = Depends(get_app)):
        return ml_trainer_app.list_trainers()
  • List Data Processors:

    @router.get("/ml_training_manager/list_data_processors")
    def list_data_processors(ml_trainer_app: MLTrainingServingApp = Depends(get_app)):
        return ml_trainer_app.list_data_processors()
  • Update Model:

    @router.put("/ml_training_manager/update_model/{model_id}")
    def update_model(model_id: str, update_params: UpdateModelParams, ml_trainer_app: MLTrainingServingApp = Depends(get_app)):
        # .dict() is Pydantic v1 style; use .model_dump() on Pydantic v2
        return ml_trainer_app.update_model(model_id, update_params.dict())
  • Update Trainer:

    @router.put("/ml_training_manager/update_trainer/{trainer_id}")
    def update_trainer(trainer_id: str, update_params: UpdateTrainerParams, ml_trainer_app: MLTrainingServingApp = Depends(get_app)):
        # .dict() is Pydantic v1 style; use .model_dump() on Pydantic v2
        return ml_trainer_app.update_trainer(trainer_id, update_params.dict())
  • Update Data Processor:

    @router.put("/ml_training_manager/update_data_processor/{data_processor_id}")
    def update_data_processor(data_processor_id: str, update_params: UpdateDataProcessorParams, ml_trainer_app: MLTrainingServingApp = Depends(get_app)):
        # .dict() is Pydantic v1 style; use .model_dump() on Pydantic v2
        return ml_trainer_app.update_data_processor(data_processor_id, update_params.dict())

Data Flow

  1. List Operations:

    • The client sends a GET request to a list endpoint.
    • The server responds with the list of models, trainers, or data processors.
  2. Update Operations:

    • The client sends a PUT request to an update endpoint with the new configuration.
    • The server validates the request and updates the configuration if the component exists.

Sequence Diagrams

List Models, Trainers, and Data Processors
Client -> Server: GET /ml_training_manager/list_models
Server -> ModelStore: list_models()
ModelStore -> Server: Return list of models
Server -> Client: Return list of models

Client -> Server: GET /ml_training_manager/list_trainers
Server -> TrainerStore: list_trainers()
TrainerStore -> Server: Return list of trainers
Server -> Client: Return list of trainers

Client -> Server: GET /ml_training_manager/list_data_processors
Server -> DataProcessorStore: list_data_processors()
DataProcessorStore -> Server: Return list of data processors
Server -> Client: Return list of data processors
Update Models, Trainers, and Data Processors
Client -> Server: PUT /ml_training_manager/update_model/{model_id}
Server -> ModelStore: get_model(model_id)
ModelStore -> Server: Return model
Server -> Model: Update model parameters
Server -> ModelStore: update_model(model_id, updated_model)
ModelStore -> Server: Return update status
Server -> Client: Return update status

Client -> Server: PUT /ml_training_manager/update_trainer/{trainer_id}
Server -> TrainerStore: get_trainer(trainer_id)
TrainerStore -> Server: Return trainer
Server -> Trainer: Update trainer parameters
Server -> TrainerStore: update_trainer(trainer_id, updated_trainer)
TrainerStore -> Server: Return update status
Server -> Client: Return update status

Client -> Server: PUT /ml_training_manager/update_data_processor/{data_processor_id}
Server -> DataProcessorStore: get_data_processor(data_processor_id)
DataProcessorStore -> Server: Return data processor
Server -> DataProcessor: Update data processor parameters
Server -> DataProcessorStore: update_data_processor(data_processor_id, updated_data_processor)
DataProcessorStore -> Server: Return update status
Server -> Client: Return update status
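The update sequences above (get the component, update its parameters, write it back) can be sketched as a single helper. This is an illustration under stated assumptions: the store is modeled as a plain dict, `ConfigurableComponent` and `update_component` are hypothetical names, and "validates the request" is taken to mean rejecting parameter names the component does not already have.

```python
from typing import Any, Dict, Optional


class ConfigurableComponent:
    """Hypothetical stand-in for a model, trainer, or data processor."""

    def __init__(self, learning_rate: float = 1e-3, epochs: int = 10) -> None:
        self.learning_rate = learning_rate
        self.epochs = epochs


def update_component(store: Dict[str, Any], component_id: str, params: Dict[str, Any]) -> bool:
    """Follow the update sequence: get, update parameters, write back."""
    component: Optional[Any] = store.get(component_id)
    if component is None:
        # The component does not exist, so there is nothing to update.
        return False
    # Only existing instance attributes may be updated.
    unknown = [name for name in params if name not in vars(component)]
    if unknown:
        # Reject the whole request instead of applying it partially.
        raise ValueError(f"Unknown parameters for {component_id!r}: {unknown}")
    for name, value in params.items():
        setattr(component, name, value)
    store[component_id] = component  # mirrors update_model(model_id, updated_model)
    return True
```

Validating every key before calling `setattr` keeps a bad request from leaving a component half-updated.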

Add Support for Initializing Data Processor with Custom DataFrame

We need to enhance our ML training application by adding support for initializing the data processor with a custom DataFrame directly passed by the user. This approach maintains backward compatibility with the existing data fetcher functionality while providing additional flexibility for users to directly supply their training data.

Motivation

  1. User Flexibility: Allowing users to pass their own DataFrame directly simplifies the process for those who already have preprocessed data or prefer to handle data fetching and preparation themselves.
  2. Efficiency: Bypassing the data fetcher can save time, especially for users with large datasets who want to avoid the overhead of fetching data multiple times.
  3. Simplified Workflow: Users can directly integrate their data into the training pipeline without the need for additional fetcher configuration or dependencies.

Requirements

  1. Update init_data_processor Method:

    • Modify the init_data_processor method in MLTrainingServingApp to accept an optional DataFrame parameter.
    • If a DataFrame is provided, it will be used instead of fetching data from the data fetcher.
  2. Minimal Impact on Existing Functionality:

    • Ensure the existing data fetcher functionality remains intact.
    • Allow users to initialize the data processor with either a fetched DataFrame or a custom DataFrame.

Example Code Snippets

Before Refactor:

class MLTrainingServingApp:
    # Other methods...

    @classmethod
    def init_data_processor(cls, data_processor_type: str, **kwargs) -> bool:
        """
        Design for an exposed REST api to let client init the data preprocessor
        To initialize the data preprocessor
        Initialize parameters from kwargs provided by client via REST api request body.
        :param data_processor_type: The type of data processor to initialize
        :param kwargs: Additional parameters for the data processor
        :return: True if data processor is successfully initialized
        """

        if cls._data_fetcher is None:
            print("Data fetcher is not initialized")
            return False
        if not cls._raw_pandas_dataframe:
            try:
                cls._raw_pandas_dataframe = cls._data_fetcher.get_as_dataframe()
            except ValueError as ve:
                print("Failed to get data from data fetcher, try fetcher again")
                cls._raw_pandas_dataframe = cls._data_fetcher.fetch_from_source()
                cls._raw_pandas_dataframe = cls._data_fetcher.get_as_dataframe()

        try:
            cls._data_processor = DataProcessorFactory.create_data_processor(
                data_processor_type,
                input_data=cls._raw_pandas_dataframe,
                **kwargs
            )
            cls._data_processor_store.add_data_processor(
                data_processor_id="pytorch_lstm_aapl",
                data_processor=cls._data_processor
            )
        except Exception as e:
            print("Failed to init data processor")
            return False

        return True

After Refactor:

import pandas as pd
from typing import Optional


class MLTrainingServingApp:
    # Other methods...

    @classmethod
    def init_data_processor(cls, data_processor_type: str, dataframe: Optional[pd.DataFrame] = None, **kwargs) -> bool:
        """
        Exposed via the REST API so that clients can initialize the data processor.
        Processor parameters come from kwargs, which the client provides in the REST request body.
        If a DataFrame is provided, it is used instead of fetching data from the data fetcher.
        :param data_processor_type: The type of data processor to initialize
        :param dataframe: Optional pandas DataFrame containing the data to process
        :param kwargs: Additional parameters for the data processor
        :return: True if the data processor is successfully initialized
        """

        if dataframe is not None:
            # Use the provided DataFrame
            cls._raw_pandas_dataframe = dataframe
        else:
            # Check if the data fetcher is initialized
            if cls._data_fetcher is None:
                print("Data fetcher is not initialized")
                return False
            # Compare with None: the truth value of a DataFrame is ambiguous and raises ValueError
            if cls._raw_pandas_dataframe is None:
                try:
                    cls._raw_pandas_dataframe = cls._data_fetcher.get_as_dataframe()
                except ValueError:
                    print("Failed to get data from data fetcher, fetching from source again")
                    cls._data_fetcher.fetch_from_source()
                    cls._raw_pandas_dataframe = cls._data_fetcher.get_as_dataframe()

        try:
            cls._data_processor = DataProcessorFactory.create_data_processor(
                data_processor_type,
                input_data=cls._raw_pandas_dataframe,
                **kwargs
            )
            # Register the data processor to the data processor manager
            # (the ID is currently hardcoded)
            cls._data_processor_store.add_data_processor(
                data_processor_id="pytorch_lstm_aapl",
                data_processor=cls._data_processor
            )
        except Exception as e:
            print(f"Failed to init data processor: {e}")
            return False

        return True
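One way to keep both code paths easy to test is to pull the data-source decision into a small helper. The sketch below assumes the fetcher exposes only the two methods used above. `resolve_input_data` and `DataFetcher` are hypothetical names, and the helper raises instead of printing so the REST layer can decide how to report the failure.

```python
from typing import Any, Optional, Protocol


class DataFetcher(Protocol):
    """Assumed fetcher interface, based on the calls in init_data_processor."""

    def get_as_dataframe(self) -> Any: ...

    def fetch_from_source(self) -> Any: ...


def resolve_input_data(dataframe: Optional[Any], fetcher: Optional[DataFetcher]) -> Any:
    """Return the user's DataFrame if given, otherwise fall back to the fetcher."""
    if dataframe is not None:
        # Compare with None: a DataFrame's truth value is ambiguous and raises.
        return dataframe
    if fetcher is None:
        raise RuntimeError("Data fetcher is not initialized")
    try:
        return fetcher.get_as_dataframe()
    except ValueError:
        # Nothing fetched yet: fetch once from the source, then read the result.
        fetcher.fetch_from_source()
        return fetcher.get_as_dataframe()
```

`init_data_processor` could then set `cls._raw_pandas_dataframe = resolve_input_data(dataframe, cls._data_fetcher)` inside its existing `try` block.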

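Time series data processor needs a window size check

The provided window sizes can exceed the length of the input data: if the input has fewer than training_window_size + target_window_size rows, the sliding window produces no samples.
The processor needs to check the data size against the corresponding window sizes before applying the mask.

    def _sliding_window_mask(self, input_array: np.ndarray) -> tuple[list[np.ndarray], list[np.ndarray]]:
        """
        Apply a sliding-window mask to the provided data and return the training and target windows.
        """
        min_length = self._training_window_size + self._target_window_size
        if len(input_array) < min_length:
            raise ValueError(
                f"Input has {len(input_array)} rows, but at least {min_length} are required "
                f"(training window {self._training_window_size} + target window {self._target_window_size})"
            )

        x = []
        y = []

        for i in range(self._training_window_size, len(input_array) - self._target_window_size + 1):
            x.append(input_array[i - self._training_window_size:i])
            y.append(input_array[i:i + self._target_window_size, 0])

        return x, y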
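The loop runs over `range(training_window_size, n_rows - target_window_size + 1)`, so it yields n_rows - training_window_size - target_window_size + 1 windows. It is empty unless n_rows >= training_window_size + target_window_size. The sketch below is a standalone validator that could run before the loop. `count_windows` is a hypothetical helper, and the extra positivity check is an assumption about what counts as a valid window size.

```python
def count_windows(n_rows: int, training_window_size: int, target_window_size: int) -> int:
    """Number of (x, y) pairs the sliding window yields, after validating sizes."""
    if training_window_size <= 0 or target_window_size <= 0:
        raise ValueError("Window sizes must be positive integers")
    min_rows = training_window_size + target_window_size
    if n_rows < min_rows:
        raise ValueError(
            f"Need at least {min_rows} rows for training_window_size="
            f"{training_window_size} and target_window_size={target_window_size}, "
            f"got {n_rows}"
        )
    # Length of range(training_window_size, n_rows - target_window_size + 1).
    return n_rows - training_window_size - target_window_size + 1
```

For example, 10 rows with a training window of 3 and a target window of 2 give 6 windows, and 5 rows give exactly one.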

Design of MLFlowClientModelLoader could cause MRO conflict

The design of MLFlowClientModelLoader, which inherits from the base classes MLFlowClient and MLFlowClientModelAgent, could cause a method resolution order (MRO) conflict when it is integrated into mlflow_agent in the future.

class MLFlowAgent(
    MLFlowTracking,
    MLFlowConfiguration,
    MLFlowModelRegistry,
    MLFlowClientModelLoader,
    metaclass=SingletonMeta
):
    ...  # class body omitted in this excerpt

This needs to be fixed in the future.
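The snippet below is a runnable illustration of how Python's C3 linearization rejects an inconsistent base order. The classes are empty stand-ins that only mirror the inheritance shape described above. It assumes that MLFlowClientModelAgent itself derives from MLFlowClient, as the first issue's code suggests. `can_linearize` is a hypothetical diagnostic helper: point it at a proposed base list, such as MLFlowAgent's, before committing to that list.

```python
from typing import Tuple


# Hypothetical stand-ins with the same inheritance shape as the real classes.
class MLFlowClient:
    pass


class MLFlowClientModelAgent(MLFlowClient):
    pass


# Valid: the subclass is listed before its own base.
class MLFlowClientModelLoader(MLFlowClientModelAgent, MLFlowClient):
    pass


def can_linearize(*bases: type) -> bool:
    """Return True if Python can build a class from these bases in this order."""
    try:
        type("Probe", bases, {})
    except TypeError:
        # C3 linearization (or metaclass resolution) failed for this base order.
        return False
    return True


def mro_names(cls: type) -> Tuple[str, ...]:
    return tuple(c.__name__ for c in cls.__mro__)
```

Listing a base before one of its own subclasses fails because C3 must keep both the local order and each base's MRO. One common way to avoid the problem is composition: MLFlowAgent could hold a loader instance instead of inheriting from MLFlowClientModelLoader.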

Enhance MLTrainingServingApp for Multi-Component Management and Historical Tracking

We are updating the MLTrainingServingApp to include functionality for managing trainers and models using separate singleton stores. This enhancement adheres to the Single Responsibility Principle and ensures a cleaner, more modular codebase. Additionally, we are adding new API endpoints to fetch existing trainers and models.

Motivation

  1. Single Responsibility Principle: By separating the storage of trainers and models, we ensure that each class has a single responsibility, making the codebase easier to maintain and extend.
  2. Modularity: This change enhances the modularity of the system, making it simpler to manage and interact with different components.
  3. Scalability: Separate stores for trainers and models provide a scalable way to manage multiple instances, facilitating easier extension and integration of new features in the future.

Changes Overview

1. Introduced Separate Stores:

  • TrainerStore: Manages trainer instances.
  • ModelStore: Manages model instances.

2. Updated MLTrainingServingApp:

  • Added methods to initialize and fetch trainers and models from the respective stores.
  • Modified methods to interact with the new TrainerStore and ModelStore.

3. Added New API Endpoints:

  • Fetch existing trainers and models by their IDs.

Code Architecture

  1. Singleton Stores:

    • trainer_store.py:
      class TrainerStore:
          # Singleton implementation
          @classmethod
          def add_trainer(cls, trainer_id: str, trainer: object) -> bool:
              # Add trainer to store
              ...
          @classmethod
          def get_trainer(cls, trainer_id: str) -> object:
              # Get trainer from store
              ...
    • model_store.py:
      class ModelStore:
          # Singleton implementation
          @classmethod
          def add_model(cls, model_id: str, model: object) -> bool:
              # Add model to store
              ...
          @classmethod
          def get_model(cls, model_id: str) -> object:
              # Get model from store
              ...
  2. MLTrainingServingApp Enhancements:

    • Interactions with TrainerStore and ModelStore:
      class MLTrainingServingApp:
          @classmethod
          def init_model(cls, model_type: str, model_id: str, **kwargs) -> bool:
              # Initialize and store model
              ...
          @classmethod
          def init_trainer(cls, trainer_type: str, trainer_id: str, **kwargs) -> bool:
              # Initialize and store trainer
              ...
          @classmethod
          def get_model(cls, model_id: str):
              # Fetch model from store
              ...
          @classmethod
          def get_trainer(cls, trainer_id: str):
              # Fetch trainer from store
              ...
  3. New API Endpoints:

    • Fetch existing trainers and models:
      @router.get("/ml_training_manager/get_trainer/{trainer_id}")
      def get_trainer(trainer_id: str):
          # Fetch trainer by ID
          ...

      @router.get("/ml_training_manager/get_model/{model_id}")
      def get_model(model_id: str):
          # Fetch model by ID
          ...

Implementation Steps

  1. Create TrainerStore and ModelStore:

    • Implement singleton pattern.
    • Methods for adding, retrieving, and removing trainers and models.
  2. Update MLTrainingServingApp:

    • Add interactions with TrainerStore and ModelStore.
    • Ensure existing functionalities remain unchanged.
  3. Add New API Endpoints:

    • Implement endpoints to fetch trainers and models by ID.
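Step 1 (singleton stores with add, retrieve, and remove) could be sketched with a metaclass, since MLFlowAgent already uses `metaclass=SingletonMeta`. The project's own SingletonMeta is not shown here, so the metaclass below is an assumption. It is a common thread-safe variant that caches one instance per class. Unlike the `cls`-based stubs above, this variant uses ordinary instance methods on the single shared instance.

```python
import threading
from typing import Any, Dict, Optional


class SingletonMeta(type):
    """Thread-safe singleton metaclass (an assumption; the project's may differ)."""

    _instances: Dict[type, Any] = {}
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        # Create the instance only once per class, even under concurrent calls.
        with SingletonMeta._lock:
            if cls not in SingletonMeta._instances:
                SingletonMeta._instances[cls] = super().__call__(*args, **kwargs)
        return SingletonMeta._instances[cls]


class TrainerStore(metaclass=SingletonMeta):
    def __init__(self) -> None:
        # Runs once, because later TrainerStore() calls return the cached instance.
        self._trainers: Dict[str, Any] = {}

    def add_trainer(self, trainer_id: str, trainer: Any) -> bool:
        if trainer_id in self._trainers:
            return False
        self._trainers[trainer_id] = trainer
        return True

    def get_trainer(self, trainer_id: str) -> Optional[Any]:
        return self._trainers.get(trainer_id)

    def remove_trainer(self, trainer_id: str) -> bool:
        if trainer_id not in self._trainers:
            return False
        del self._trainers[trainer_id]
        return True
```

Every `TrainerStore()` call returns the same object, so a trainer added during one request is visible to later requests in the same process. A ModelStore would use the same metaclass.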

ml_training_serving_app sets up the MLFlow Client settings inside the function

The web application implementation, ml_training_serving_app, hardcodes the MLFlow Client settings in the function.

It has to be refactored to allow MLFlow Client settings to be passed from the client via an API.

    @classmethod
    def init_trainer(cls, trainer_type: str, **kwargs) -> bool:
        """
        Design for an exposed REST api to let client init the trainer
        To initialize the trainer
        Initialize parameters from kwargs provided by client via REST api request body.
        The trainer provides the function to add an mlflow agent to log the training process and register the model.
        Creates the mlflow agent and sets the tracking uri here.
        :param trainer_type:
        :param kwargs:
        :return:
        """

        criterion = None
        if kwargs["loss_function"] == "mse":
            criterion = torch.nn.MSELoss()

        optimizer = None
        if kwargs["optimizer"] == "adam":
            optimizer = torch.optim.Adam(cls._model.parameters(), lr=float(kwargs["learning_rate"]))

        mlflow_agent = MLFlowAgent()

        # Hardcoded MLflow client settings (the lines this issue targets):
        os.environ['MLFLOW_TRACKING_USERNAME'] = 'mlflow_pwang'
        os.environ['MLFLOW_TRACKING_PASSWORD'] = 'mlflow_pwang'
        mlflow_agent.set_tracking_uri("http://localhost:5011")
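A minimal sketch of moving these settings into the request. It assumes the client sends a tracking URI and optional credentials in the request body. `MLflowClientSettings` and its field names are hypothetical. MLFLOW_TRACKING_USERNAME and MLFLOW_TRACKING_PASSWORD are the environment variables MLflow reads for HTTP basic authentication, and they are the ones set above.

```python
import os
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class MLflowClientSettings:
    """Hypothetical request-body model for MLflow client settings."""

    tracking_uri: str
    username: Optional[str] = None
    password: Optional[str] = None

    @classmethod
    def from_request(cls, body: Mapping[str, Any]) -> "MLflowClientSettings":
        # The tracking URI is required; credentials stay optional.
        if not body.get("tracking_uri"):
            raise ValueError("tracking_uri is required")
        return cls(body["tracking_uri"], body.get("username"), body.get("password"))

    def export_credentials(self) -> None:
        # MLflow reads basic-auth credentials from these environment variables.
        if self.username is not None:
            os.environ["MLFLOW_TRACKING_USERNAME"] = self.username
        if self.password is not None:
            os.environ["MLFLOW_TRACKING_PASSWORD"] = self.password
```

In init_trainer, the three hardcoded lines could then become `settings = MLflowClientSettings.from_request(kwargs["mlflow"])`, `settings.export_credentials()`, and `mlflow_agent.set_tracking_uri(settings.tracking_uri)`, where the "mlflow" request-body key is likewise hypothetical. Environment variables are process-wide, so concurrent requests with different credentials would overwrite each other.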

Implement MLTrainingPipeline for Streamlined ML Workflows

Description

To enhance our FastAPI backend for machine learning, we aim to implement a centralized object, MLTrainingPipeline, to manage the complete ML training process. This pipeline will be responsible for data fetching, processing, model training, and evaluation.

Objectives

  • Develop the MLTrainingPipeline and MLTrainingPipelineBuilder classes.
  • Integrate these classes into the FastAPI backend.
  • Create an API endpoint to allow users to configure and initiate the ML training process.

Tasks

  • Implement the MLTrainingPipeline class with methods for data preparation, training, and evaluation.
  • Develop the MLTrainingPipelineBuilder class for flexible pipeline construction.
  • Create a FastAPI endpoint /create_training_pipeline to handle the construction and execution of the training pipeline.
  • Ensure thorough testing of the pipeline components and the endpoint.
  • Document the usage and configurations for the API endpoint.

Future Scope

  • Consider extending the pipeline's capabilities for advanced features like hyperparameter tuning.
  • Evaluate integration with MLFlow for experiment tracking.

Store the data processor and share it across different ML training jobs

Add a new module that stores data processors in a singleton pattern, because the model inference stage still needs the data processor to invert the output value scale.

The data processor is currently held directly by MLTrainingServingApp:

import src.webapp.data_io_serving_app


class MLTrainingServingApp:
    """
    A singleton class to serve the model training process
    provided the operation interface to client via REST api
    1. operating the data fetcher by data_io_serving_app, another exported REST api to user to init the data fetcher.
       provide the request body with information about fetch_data parameter
    """

    _data_io_serving_app = src.webapp.data_io_serving_app.get_app()

    # internal module tools for ml training job
    _data_fetcher = None
    _data_processor = None  # held only by the app, so it does not outlive the training job
    _trainer = None

As a result, the data processor is discarded once the training loop finishes.
We should build a data processor storage class in a singleton pattern so that the data processor can be stored and shared across different tasks.
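The sketch below shows why the fitted processor must outlive training. The scaling parameters learned during training are the only way to map scaled predictions back to real values. `MinMaxScaler` is a tiny stand-in written here for illustration (it is not scikit-learn's class). The `add_data_processor` keyword arguments mirror the call already used in `init_data_processor`, while `get_data_processor` is assumed.

```python
from typing import Dict, List


class MinMaxScaler:
    """Tiny stand-in for the scaling step inside a data processor."""

    def fit(self, values: List[float]) -> "MinMaxScaler":
        self.min_, self.max_ = min(values), max(values)
        if self.max_ == self.min_:
            raise ValueError("Cannot scale a constant series")
        return self

    def transform(self, values: List[float]) -> List[float]:
        span = self.max_ - self.min_
        return [(v - self.min_) / span for v in values]

    def inverse_transform(self, scaled: List[float]) -> List[float]:
        # Needs the min/max learned at training time.
        span = self.max_ - self.min_
        return [s * span + self.min_ for s in scaled]


class DataProcessorStore:
    """Process-wide registry so fitted processors outlive the training job."""

    _processors: Dict[str, MinMaxScaler] = {}

    @classmethod
    def add_data_processor(cls, data_processor_id: str, data_processor: MinMaxScaler) -> None:
        cls._processors[data_processor_id] = data_processor

    @classmethod
    def get_data_processor(cls, data_processor_id: str) -> MinMaxScaler:
        return cls._processors[data_processor_id]
```

A training job fits and registers the processor. An inference job then looks it up by ID and calls `inverse_transform` on the model output, with no need to refit on the original data.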

Implement API Endpoint for Triggering ML Training Pipeline

Issue Description:
We need to implement a new API endpoint to allow external systems to trigger our ML training pipeline by directly sending training data. The endpoint must be secure, validate the incoming data, and initiate the training process asynchronously.

Tasks:

  • Design and implement the TrainingDataRequest Pydantic model for data validation.
  • Develop the /ml_training/trigger endpoint in FastAPI.
  • Integrate the endpoint with background task handling using FastAPI's BackgroundTasks or Celery.
  • Set up API key authentication to secure the endpoint.
  • Implement logging for incoming requests and the training initiation process.
  • Define a monitoring strategy to oversee the training jobs initiated by the endpoint.
  • Create unit and integration tests for the new endpoint.
  • Document the endpoint's functionality and how to interact with it.

Acceptance Criteria:

  • The endpoint must correctly validate and accept training data in the expected format.
  • The training process must be initiated asynchronously without delaying the API response.
  • The endpoint must be secured with API key authentication.
  • Proper logging and monitoring must be in place.
  • All tests must pass, ensuring robustness and reliability.

Notes:

  • Ensure the endpoint conforms to our existing project structure and coding style.
  • Review current project dependencies to determine if additional libraries are needed for task queuing or security.
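The order of operations the endpoint needs is: authenticate, validate, hand off to the background, and return immediately. The framework-agnostic sketch below shows that order using only the standard library. In the actual endpoint, the TrainingDataRequest Pydantic model and FastAPI's BackgroundTasks (or Celery) would take over the validation and hand-off roles. The `{"data": [[...], ...]}` payload shape and all function names here are hypothetical.

```python
import hmac
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List

_executor = ThreadPoolExecutor(max_workers=2)


def check_api_key(provided: str, expected: str) -> bool:
    # Constant-time comparison avoids leaking key prefixes through timing.
    return hmac.compare_digest(provided.encode(), expected.encode())


def validate_training_data(body: Dict[str, Any]) -> List[List[float]]:
    """Hypothetical schema: {"data": [[number, ...], ...]} with equal-length rows."""
    rows = body.get("data")
    if not isinstance(rows, list) or not rows:
        raise ValueError("'data' must be a non-empty list of rows")
    width = len(rows[0]) if isinstance(rows[0], list) else 0
    for row in rows:
        if not isinstance(row, list) or width == 0 or len(row) != width:
            raise ValueError("rows must be non-empty lists of equal length")
        if not all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in row):
            raise ValueError("row values must be numeric")
    return [[float(v) for v in row] for row in rows]


def trigger_training(
    body: Dict[str, Any],
    api_key: str,
    expected_key: str,
    train_fn: Callable[[List[List[float]]], Any],
) -> Future:
    if not check_api_key(api_key, expected_key):
        raise PermissionError("invalid API key")
    rows = validate_training_data(body)
    # Submit and return at once; the HTTP response does not wait for training.
    return _executor.submit(train_fn, rows)
```

Returning the `Future` (or a job ID derived from it) gives the monitoring strategy something to poll.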

Add Model Performance Monitoring and Retraining Automation Features

This issue adds two new features to the MLflow-based machine learning system: Model Performance Monitoring and Model Retraining Automation. These features will let the system monitor model performance continuously and trigger automated retraining based on predefined criteria.

Features:

  1. Model Performance Monitoring:

    • Design:
      • Create a ModelPerformanceMonitor class that logs performance metrics at regular intervals.
    • Implementation:
    # src/model_ops_manager/mlflow_agent/monitoring.py
    
    # Import necessary modules and classes
    
    class ModelPerformanceMonitor:
        def __init__(self, model_name, model_version, interval=60):
            # Initialize with model name, version, and monitoring interval
            pass
    
        def start_monitoring(self):
            # Start the monitoring thread
            pass
    
        def monitor_performance(self):
            # Continuously fetch and log metrics at defined intervals
            pass
    
        def fetch_metrics(self):
            # Implement logic to fetch performance metrics
            pass
    
        def log_metrics(self, metrics):
            # Log the fetched metrics
            pass
  2. Model Retraining Automation:

    • Design:
      • Create a ModelRetrainer class that automates retraining based on predefined criteria.
    • Implementation:
    # src/model_ops_manager/mlflow_agent/retrainer.py
    
    # Import necessary modules and classes
    
    class ModelRetrainer:
        def __init__(self, retraining_script, criteria):
            # Initialize with retraining script and criteria
            pass
    
        def check_and_retrain(self, model_name, model_version):
            # Check if retraining criteria are met and retrain model if necessary
            pass
    
        def get_run_id(self, model_name, model_version):
            # Get the run ID for a specific model version
            pass
    
        def should_retrain(self, metrics):
            # Determine if retraining is needed based on metrics
            pass
    
        def retrain_model(self):
            # Execute the retraining script
            pass
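The `should_retrain` and `monitor_performance` stubs could be filled in along these lines. The sketch assumes that "predefined criteria" means one threshold per metric with a direction: an "upper" bound triggers retraining when the metric rises above it, and a "lower" bound triggers when it falls below. It also assumes metrics arrive as a `{name: value}` dict, which is the shape of `MlflowClient().get_run(run_id).data.metrics`. `PeriodicMonitor` is a hypothetical name. Metric fetching is left to the injected callable.

```python
import threading
from typing import Callable, Dict, Tuple

# Maps metric name -> (bound, threshold); bound is "upper" or "lower".
Criteria = Dict[str, Tuple[str, float]]


def should_retrain(metrics: Dict[str, float], criteria: Criteria) -> bool:
    """Return True if any monitored metric crosses its threshold."""
    for name, (bound, threshold) in criteria.items():
        if bound not in ("upper", "lower"):
            raise ValueError(f"Unknown bound {bound!r} for metric {name!r}")
        value = metrics.get(name)
        if value is None:
            continue  # metric not reported in this interval
        if bound == "upper" and value > threshold:
            return True  # e.g. an error metric such as RMSE grew too large
        if bound == "lower" and value < threshold:
            return True  # e.g. a score such as R^2 dropped too low
    return False


class PeriodicMonitor:
    """Calls fetch_metrics every `interval` seconds until stopped."""

    def __init__(
        self,
        fetch_metrics: Callable[[], Dict[str, float]],
        on_metrics: Callable[[Dict[str, float]], None],
        interval: float = 60.0,
    ) -> None:
        self._fetch = fetch_metrics
        self._on_metrics = on_metrics
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            self._on_metrics(self._fetch())
            # Event.wait doubles as a sleep that stop() can interrupt.
            self._stop.wait(self._interval)
```

Wiring the two together is a matter of passing an `on_metrics` callback that logs the metrics and calls `ModelRetrainer.retrain_model()` when `should_retrain` returns True.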

Tasks:

  • Implement ModelPerformanceMonitor class in monitoring.py.
  • Implement ModelRetrainer class in retrainer.py.
  • Test the performance monitoring functionality.
  • Test the retraining automation functionality.
  • Update documentation to reflect new features.
  • Review and merge the changes.
