GoMsgProcessor

A Golang library for parallel processing of messages into structured documents


GoMsgProcessor is a generic library that reads messages recursively and in parallel, requiring only a builder to transform them into final documents. You can register multiple builders, each associated with a message type, which lets you handle messages from different sources. Namespaces also let it work with different targets. In addition, a deduplication function can be injected to clean up the slice of documents after processing.
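
The flow described above (pick a builder per message type, build in parallel, group by namespace, optionally deduplicate) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not gomsgprocessor's implementation: the `Message`, `Document`, `Builder`, `process` and `dedupKeepFirst` names are hypothetical, and applying deduplication per namespace group is just one plausible choice made for the sketch.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// Hypothetical, simplified types for illustration; they borrow the README's
// vocabulary but are not gomsgprocessor's real API.
type Message struct {
	Type, Namespace, Payload string
}

type Document string

// Builder turns one Message into zero or more Documents.
type Builder func(Message) ([]Document, error)

// process builds documents for all messages concurrently, choosing each
// builder by message type, then groups the results by namespace and applies
// the optional dedup function to each group.
func process(builders map[string]Builder, dedup func([]Document) []Document, messages []Message) (map[string][]Document, error) {
	// Fail fast, before any goroutine starts, if a type has no builder.
	for _, msg := range messages {
		if _, ok := builders[msg.Type]; !ok {
			return nil, fmt.Errorf("no builder for message type %q", msg.Type)
		}
	}

	results := make([][]Document, len(messages))
	errs := make([]error, len(messages))
	var wg sync.WaitGroup
	for i, msg := range messages {
		wg.Add(1)
		go func(i int, msg Message) {
			defer wg.Done()
			// Each goroutine writes only its own index, so no mutex is needed.
			results[i], errs[i] = builders[msg.Type](msg)
		}(i, msg)
	}
	wg.Wait()

	byNamespace := make(map[string][]Document)
	for i, msg := range messages {
		if errs[i] != nil {
			return nil, errs[i]
		}
		if len(results[i]) == 0 {
			continue // a builder returning no documents means "skip this message"
		}
		byNamespace[msg.Namespace] = append(byNamespace[msg.Namespace], results[i]...)
	}
	if dedup != nil {
		for ns, docs := range byNamespace {
			byNamespace[ns] = dedup(docs)
		}
	}
	return byNamespace, nil
}

// dedupKeepFirst drops repeated documents, keeping first-seen order.
func dedupKeepFirst(docs []Document) []Document {
	seen := make(map[Document]struct{}, len(docs))
	out := make([]Document, 0, len(docs))
	for _, d := range docs {
		if _, dup := seen[d]; !dup {
			seen[d] = struct{}{}
			out = append(out, d)
		}
	}
	return out
}

func main() {
	// "csv" messages become one document per comma-separated field.
	builders := map[string]Builder{"csv": func(m Message) ([]Document, error) {
		if m.Payload == "" {
			return nil, nil // nothing to build
		}
		var docs []Document
		for _, field := range strings.Split(m.Payload, ",") {
			docs = append(docs, Document(field))
		}
		return docs, nil
	}}
	messages := []Message{
		{"csv", "ns1", "a,b"}, {"csv", "ns1", ""}, {"csv", "ns1", "b,c"}, {"csv", "ns2", "a"},
	}
	grouped, err := process(builders, dedupKeepFirst, messages)
	if err != nil {
		panic(err)
	}
	namespaces := make([]string, 0, len(grouped))
	for ns := range grouped {
		namespaces = append(namespaces, ns)
	}
	sort.Strings(namespaces) // map order is random; sort for stable output
	for _, ns := range namespaces {
		fmt.Println(ns, grouped[ns]) // ns1 [a b c], then ns2 [a]
	}
}
```

Collecting results by index, rather than through a shared channel, keeps the output in input order without extra synchronization.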

Stack Version
  • Golang v1.18
  • golangci-lint v1.46.2

Prerequisites
  • Any version of Go installed; 1.18 or later is preferred.

Installing
  • go get -u github.com/arquivei/gomsgprocessor
  • go mod tidy
    go mod vendor

Usage

    • Import the package

      import (
          "github.com/arquivei/gomsgprocessor"
      )
    • Define an incoming message struct

      type ExampleMessage struct {
          ID            int      `json:"id"`
          Name          string   `json:"name"`
          Age           int      `json:"age"`
          City          string   `json:"city"`
          State         string   `json:"state"`
          ChildrenNames []string `json:"childrenNames"`
          Namespace     string   `json:"namespace"`
      }
    • Implement the Message interface, which is the input to ParallelProcessor's MakeDocuments.

      func (e *ExampleMessage) GetNamespace() gomsgprocessor.Namespace {
          // Namespace is a logical separator used to group messages while processing them.
          return gomsgprocessor.Namespace(e.Namespace)
      }
      
      func (e *ExampleMessage) GetType() gomsgprocessor.MessageType {
          // MessageType is used to decide which DocumentBuilder to use for each Message.
          return gomsgprocessor.MessageType("typeExample")
      }
      
      func (e *ExampleMessage) UpdateLogWithData(ctx context.Context) {
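          // Optional: enrich the zerolog logger stored in ctx with the message fields
          // (requires the context, github.com/rs/zerolog and github.com/rs/zerolog/log imports).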
          log.Ctx(ctx).UpdateContext(func(zc zerolog.Context) zerolog.Context {
              return zc.
                  Int("msg_id", e.ID).
                  Str("msg_name", e.Name).
                  Strs("msg_children_names", e.ChildrenNames).
                  Int("msg_age", e.Age).
                  Str("msg_city", e.City).
                  Str("msg_state", e.State).
                  Str("msg_type", string(e.GetType())).
                  Str("msg_namespace", e.Namespace)
          })
      }
    • Define an outgoing document struct, which is the result of a DocumentBuilder's Build.

      type ExampleDocument struct {
          ID              string
          CreatedAt       time.Time
          ParentName      string
          ParentBirthYear int
          ChildName       string
          CityAndState    string
          Namespace       string
      }
    • Implement the DocumentBuilder interface, which transforms a Message into a slice of Documents.

      type ExampleBuilder struct{}
      
      // Build transforms a Message into []Document.
      func (b *ExampleBuilder) Build(_ context.Context, msg gomsgprocessor.Message) ([]gomsgprocessor.Document, error) {
          exampleMsg, ok := msg.(*ExampleMessage)
          if !ok {
              return nil, errors.New("failed to cast message")
          }
      
          // Parallel Processor will ignore this message
          if len(exampleMsg.ChildrenNames) == 0 {
              return nil, nil
          }
      
          documents := make([]gomsgprocessor.Document, 0, len(exampleMsg.ChildrenNames))
      
          for _, childName := range exampleMsg.ChildrenNames {
              documents = append(documents, ExampleDocument{
                  ID:              strconv.Itoa(exampleMsg.ID) + "_" + childName,
                  CreatedAt:       time.Now(),
                  ParentName:      exampleMsg.Name,
                  CityAndState:    exampleMsg.City + " - " + exampleMsg.State,
                  ChildName:       childName,
                  ParentBirthYear: time.Now().Year() - exampleMsg.Age,
                  Namespace:       exampleMsg.Namespace,
              })
          }
      
          return documents, nil
      }
    • Define an (optional) function to deduplicate the slice of documents.

      func ExampleDeduplicateDocuments(documents []gomsgprocessor.Document) ([]gomsgprocessor.Document, error) {
          examplesDocuments := make([]ExampleDocument, 0, len(documents))
          for _, document := range documents {
              exampleDocument, ok := document.(ExampleDocument)
              if !ok {
                  return nil, errors.New("failed to cast document")
              }
              examplesDocuments = append(examplesDocuments, exampleDocument)
          }
      
          // Index documents by ID; a later document with the same ID replaces an earlier one.
          documentsByID := make(map[string]ExampleDocument, len(examplesDocuments))
          for _, exampleDocument := range examplesDocuments {
              documentsByID[exampleDocument.ID] = exampleDocument
          }
      
          // Go map iteration order is unspecified, so the output order may vary between runs.
          deduplicatedDocuments := make([]gomsgprocessor.Document, 0, len(documentsByID))
          for _, documentByID := range documentsByID {
              deduplicatedDocuments = append(deduplicatedDocuments, documentByID)
          }
          return deduplicatedDocuments, nil
      }
    • Finally, wire everything together and run it:

      func main() {
      
      	// NewParallelProcessor returns a new ParallelProcessor with a map of
      	// DocumentBuilder for each MessageType.
      	//
      	// A list of Option is also available for this method. See option.go for more
      	// information.
      	parallelProcessor := gomsgprocessor.NewParallelProcessor(
      		map[gomsgprocessor.MessageType]gomsgprocessor.DocumentBuilder{
      			"typeExample": &ExampleBuilder{},
      		},
      		gomsgprocessor.WithDeduplicateDocumentsOption(ExampleDeduplicateDocuments),
      	)
      
      	messages := []gomsgprocessor.Message{
      		&ExampleMessage{
      			ID:            1,
      			Name:          "John",
      			Age:           30,
      			City:          "New York",
      			State:         "NY",
      			ChildrenNames: []string{"John", "Jane", "Mary"},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            2,
      			Name:          "Poul",
      			Age:           25,
      			City:          "New Jersey",
      			State:         "NJ",
      			ChildrenNames: []string{},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            3,
      			Name:          "Chris",
      			Age:           35,
      			City:          "Washington",
      			State:         "DC",
      			ChildrenNames: []string{"Bob"},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            3,
      			Name:          "Chris",
      			Age:           35,
      			City:          "Washington",
      			State:         "DC",
      			ChildrenNames: []string{"Bob"},
      			Namespace:     "namespace2",
      		},
      		&ExampleMessage{
      			ID:            1,
      			Name:          "John",
      			Age:           30,
      			City:          "New York",
      			State:         "NY",
      			ChildrenNames: []string{"John", "Jane", "Mary"},
      			Namespace:     "namespace1",
      		},
      	}
      
      	// MakeDocuments creates in parallel a slice of Document for given []Message
      	// using the map of DocumentBuilder (see NewParallelProcessor).
      	//
      	// This method returns a []Document and a (foundationkit/errors).Error.
      	// If not nil, this error has an associated (foundationkit/errors).Code,
      	// either ErrCodeBuildDocuments or ErrCodeDeduplicateDocuments.
      	documents, err := parallelProcessor.MakeDocuments(context.Background(), messages)
      	if err != nil {
      		panic(err)
      	}
      
      	examplesDocuments := make([]ExampleDocument, 0, len(documents))
      	for _, document := range documents {
      		exampleDocument, ok := document.(ExampleDocument)
      		if !ok {
      			panic("failed to cast document")
      		}
      		examplesDocuments = append(examplesDocuments, exampleDocument)
      	}
      
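      	// JSONMarshal returns (string, error); check the error instead of printing it.
      	output, err := JSONMarshal(examplesDocuments)
      	if err != nil {
      		panic(err)
      	}
      	fmt.Println(output)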
      }
      
      // JSONMarshal encodes t as indented JSON without escaping HTML characters.
      func JSONMarshal(t interface{}) (string, error) {
      	buffer := &bytes.Buffer{}
      	encoder := json.NewEncoder(buffer)
      	encoder.SetEscapeHTML(false)
      	encoder.SetIndent("", "  ")
      	err := encoder.Encode(t)
      	return buffer.String(), err
      }
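
The incoming message struct declares json tags (for example `childrenNames`), which suggests messages arrive as JSON from some upstream source; the README does not say which. A minimal sketch, assuming JSON input, of decoding a payload with `encoding/json` before handing it to MakeDocuments. The struct is copied without its gomsgprocessor methods so the snippet runs on its own, and `decodeMessage` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExampleMessage is a copy of the README's struct, without the
// gomsgprocessor methods, so this snippet only needs the standard library.
type ExampleMessage struct {
	ID            int      `json:"id"`
	Name          string   `json:"name"`
	Age           int      `json:"age"`
	City          string   `json:"city"`
	State         string   `json:"state"`
	ChildrenNames []string `json:"childrenNames"`
	Namespace     string   `json:"namespace"`
}

// decodeMessage parses one JSON payload into an *ExampleMessage. It returns a
// pointer because the README's Message methods use pointer receivers.
func decodeMessage(raw []byte) (*ExampleMessage, error) {
	var msg ExampleMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, fmt.Errorf("decode message: %w", err)
	}
	return &msg, nil
}

func main() {
	raw := []byte(`{"id":3,"name":"Chris","age":35,"city":"Washington","state":"DC","childrenNames":["Bob"],"namespace":"namespace1"}`)
	msg, err := decodeMessage(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Name, msg.ChildrenNames, msg.Namespace) // Chris [Bob] namespace1
}
```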
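
Because ExampleMessage implements its methods with pointer receivers, only `*ExampleMessage` satisfies the Message interface. This is why the example builds messages with `&ExampleMessage{...}` and why ExampleBuilder asserts `msg.(*ExampleMessage)`. The sketch below shows this with the standard library only: its `Message`, `Namespace` and `MessageType` are hypothetical local stand-ins assumed from the three methods the README implements, not imports from gomsgprocessor. With the real package, the equivalent compile-time check would presumably be `var _ gomsgprocessor.Message = (*ExampleMessage)(nil)`.

```go
package main

import (
	"context"
	"fmt"
)

type Namespace string
type MessageType string

// Message is a hypothetical local stand-in, assumed from the three methods
// the README implements; it is not imported from gomsgprocessor.
type Message interface {
	GetNamespace() Namespace
	GetType() MessageType
	UpdateLogWithData(ctx context.Context)
}

type ExampleMessage struct{ Namespace string }

func (e *ExampleMessage) GetNamespace() Namespace           { return Namespace(e.Namespace) }
func (e *ExampleMessage) GetType() MessageType              { return "typeExample" }
func (e *ExampleMessage) UpdateLogWithData(context.Context) {}

// Compile-time check that *ExampleMessage implements Message. Using a value,
// ExampleMessage{}, here would not compile: pointer-receiver methods are not
// in the value type's method set.
var _ Message = (*ExampleMessage)(nil)

// castMessage mirrors the type assertion done in ExampleBuilder.Build.
func castMessage(msg Message) (*ExampleMessage, bool) {
	m, ok := msg.(*ExampleMessage)
	return m, ok
}

func main() {
	var msg Message = &ExampleMessage{Namespace: "namespace1"}
	m, ok := castMessage(msg)
	fmt.Println(ok, m.GetNamespace(), msg.GetType()) // true namespace1 typeExample
}
```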
Changelog

  • GoMsgProcessor 0.1.0 (May 20, 2022)

    • [New] Decoupled this package from Arquivei's API projects.
    • [New] Set up a GitHub workflow with golangci-lint.
    • [New] Usage example.
    • [New] Documents: Code of Conduct, Contributing, License and Readme.

Please read CONTRIBUTING.md for details on our code of conduct and the process for submitting pull requests.

We use Semantic Versioning for versioning. For the versions available, see the tags on this repository.

This project is licensed under the BSD 3-Clause License; see the LICENSE.md file for details.

Contact us by email: [email protected]

Contributors

renovate[bot], victormn, rilder-almeida, marcosbmf, leoeareis, rjfonseca
