conc's Introduction

conc: better structured concurrency for go

conc is your toolbelt for structured concurrency in go, making common tasks easier and safer.

go get github.com/sourcegraph/conc

At a glance

All pools are created with pool.New() or pool.NewWithResults[T](), then configured with methods:

  • p.WithMaxGoroutines() configures the maximum number of goroutines in the pool
  • p.WithErrors() configures the pool to run functions that return errors
  • p.WithContext(ctx) configures the pool to run functions that accept a context
  • p.WithFirstError() configures error pools to only keep the first returned error
  • p.WithCollectErrored() configures result pools to collect results even when the function errored
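
As a quick sketch of that configuration style (using only the constructors and options listed above; the squaring task is just a stand-in):

package main

import (
    "fmt"

    "github.com/sourcegraph/conc/pool"
)

func main() {
    // A result pool capped at 8 goroutines; Wait() returns the collected results.
    p := pool.NewWithResults[int]().WithMaxGoroutines(8)
    for i := 0; i < 100; i++ {
        i := i // capture the loop variable (needed before Go 1.22)
        p.Go(func() int { return i * i })
    }
    results := p.Wait()
    fmt.Println(len(results)) // 100
}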

Goals

The main goals of the package are:

  1. Make it harder to leak goroutines
  2. Handle panics gracefully
  3. Make concurrent code easier to read

Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's really easy to fire off a go statement and fail to properly wait for it to complete.

conc takes the opinionated stance that all concurrency should be scoped. That is, goroutines should have an owner and that owner should always ensure that its owned goroutines exit properly.

In conc, the owner of a goroutine is always a conc.WaitGroup. Goroutines are spawned in a WaitGroup with (*WaitGroup).Go(), and (*WaitGroup).Wait() should always be called before the WaitGroup goes out of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the caller. In that case, you could pass a WaitGroup into the spawning function.

func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { ... })
}

For some more discussion on why scoped concurrency is nice, check out this blog post.

Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling panics. A goroutine spawned without a panic handler will crash the whole process on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the panic once you catch it? Some options:

  1. Ignore it
  2. Log it
  3. Turn it into an error and return that to the goroutine spawner
  4. Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually something wrong and someone should fix it.

Just logging panics isn't great either because then there is no indication to the spawner that something bad happened, and it might just continue on as normal even though your program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have an owner that can actually receive the message that something went wrong. This is generally not true with a goroutine spawned with go, but in the conc package, all goroutines have an owner that must collect the spawned goroutine. In the conc package, any call to Wait() will panic if any of the spawned goroutines panicked. Additionally, it decorates the panic value with a stacktrace from the child goroutine so that you don't lose information about what caused the panic.

Doing this all correctly every time you spawn something with go is not trivial and it requires a lot of boilerplate that makes the important parts of the code more difficult to read, so conc does this for you.

stdlib:
type caughtPanicError struct {
    val   any
    stack []byte
}

func (e *caughtPanicError) Error() string {
    return fmt.Sprintf(
        "panic: %q\n%s",
        e.val,
        string(e.stack),
    )
}

func main() {
    done := make(chan error)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &caughtPanicError{
                    val:   v,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    err := <-done
    if err != nil {
        panic(err)
    }
}
conc:

func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}

Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't obfuscate what the code is actually doing is more difficult. The conc package attempts to make common operations easier by abstracting as much boilerplate complexity as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use pool.New(). Want to process an ordered stream of results concurrently, but still maintain order? Try stream.New(). What about a concurrent map over a slice? Take a peek at iter.Map().

Browse some examples below for some comparisons with doing these by hand.

Examples

Each of these examples forgoes propagating panics for simplicity. To see what kind of complexity that would add, check out the "Goal #2" header above.

Spawn a set of goroutines and wait for them to finish:

stdlib:
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // crashes on panic!
            doSomething()
        }()
    }
    wg.Wait()
}
conc:

func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}

Process each element of a stream in a static pool of goroutines:

stdlib:
func process(stream chan int) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range stream {
                handle(elem)
            }
        }()
    }
    wg.Wait()
}
conc:

func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)
    for elem := range stream {
        elem := elem
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}

Process each element of a slice in a static pool of goroutines:

stdlib:
func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}
conc:

func process(values []int) {
    iter.ForEach(values, handle)
}

Concurrently map a slice:

stdlib:
func concMap(
    input []int,
    f func(int) int,
) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }

                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}
conc:

func concMap(
    input []int,
    f func(*int) int,
) []int {
    return iter.Map(input, f)
}

Process an ordered stream concurrently:

stdlib:
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    tasks := make(chan func())
    taskResults := make(chan chan int)

    // Worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Ordered reader goroutines
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for result := range taskResults {
            item := <-result
            out <- item
        }
    }()

    // Feed the workers with tasks
    for elem := range in {
        resultCh := make(chan int, 1)
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }

    // We've exhausted input.
    // Wait for everything to finish
    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}
conc:

func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}

Status

This package is currently pre-1.0. There are likely to be minor breaking changes before a 1.0 release as we stabilize the APIs and tweak defaults. Please open an issue if you have questions, concerns, or requests that you'd like addressed before the 1.0 release. Currently, a 1.0 is targeted for March 2023.

conc's People

Contributors

bobheadxi, camdencheek, exfly, honkkki, kke, link512, lsmoura, mateusz834, miparnisari, moond4rk, sashamelentyev, shadow3x3x3, simonrichardson, xiaoconstantine, zdyxry, zhongdai

conc's Issues

Feature Request: Ordered ResultPool for function call order preservation

Hello,

First, I would like to express my gratitude for your hard work on the conc library. The structured concurrency it brings to Go makes complex tasks a lot more manageable.

I am writing to propose a new feature that would further enrich the functionality of the library. The idea is to create a new type of entity that combines the functionalities of ResultPool and Stream. The main goal of this entity would be to provide a concurrent task runner that not only collects task results but also maintains the order of the calls to the functions. In simpler terms, it would give you a slice ordered by the call order of functions.

Currently, the ResultPool is great for running tasks concurrently and collecting the results, but it doesn't necessarily maintain the order of the function calls. On the other hand, the Stream entity allows for processing an ordered stream of tasks in parallel but does not collect the results.

The proposed entity could be very useful in situations where you want to run tasks concurrently, collect their results, and also preserve the order of the tasks.

Please, let me know what you think about this proposal. I am also open to contributing towards the development of this feature if that would be acceptable.

Thank you for your time and consideration.

Solves #110

Use custom go pool

conc spawns a new goroutine each time pool.Go is called.
This can waste resources in some cases.
Is there a plan to support a custom goroutine pool instead of spawning a new goroutine every time?
For example:

type gPool interface {
    Do(func())
}

pool.NewWithCustomPool(gPool)

Panic: send on closed channel

This happens randomly when using conc Pool (pool.New()).

panic: send on closed channel

goroutine 17805 [running]:
config-rd/vendor/github.com/sourcegraph/conc/pool.(*Pool).Go(0xc0005c2000, 0xc0a551e558)
E:/go/src/config-rd/vendor/github.com/sourcegraph/conc/pool/pool.go:45 +0x113
main.doGGSN(0xc0e964b688?, 0x1)
E:/go/src/config-rd/parser.go:63 +0xfd8
main.main.func3.1({{0xc031bbafc0, 0x10}, {0xc031bbafd0, 0x4}, {0xc03c3b36e0, 0x1a}, 0x1, 0x0, {0x0, 0x0}, ...})
E:/go/src/config-rd/main.go:375 +0x1c5

pool: tasks can't launch tasks

A pattern in structured concurrency is to pass the lifetime object into the concurrent tasks so that they can launch additional tasks with the same lifetime. This way, the originator can wait on – and receive errors from – all work at a single point, but subtasks can run concurrently when they don't have dependencies between each other. The "Go statement considered harmful" article describes this pattern and its advantages in more detail.

However, conc/pool prevents this pattern. Once a call to Wait occurs, new tasks cannot be submitted to the pool, because the task submission channel becomes closed:

conc/pool/pool.go

Lines 72 to 75 in 4afefce

func (p *Pool) Wait() {
p.init()
close(p.tasks)

As a concrete example, I wanted to use a ContextPool to process SQS messages in batches roughly as follows:

  1. Decode the message body. If this fails, return immediately; the message must remain in the queue so that it eventually becomes a dead letter.
  2. Delete the decoded message from the queue so that it isn't retried anymore.
  3. Process the message according to business logic.

While steps 2 and 3 can each fail, there's no dependency between them, so they can in principle run concurrently. The approach I attempted was to p.Go a task to run step 1, then p.Go a subtask for step 2, and then finish step 3. This approach causes a send on closed channel panic.

Tested on both v0.3.0 and v0.3.1-0.20240108182409-4afefce20f9b (current main).
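
A minimal sketch of the failure mode described above (hypothetical repro; the sleep only makes the race with Wait() likely, it is not required):

package main

import (
    "context"
    "time"

    "github.com/sourcegraph/conc/pool"
)

func main() {
    p := pool.New().WithErrors().WithContext(context.Background())
    p.Go(func(ctx context.Context) error {
        time.Sleep(10 * time.Millisecond) // step 1 takes a moment
        // By now Wait() below has likely already closed the task channel,
        // so this nested submission panics: send on closed channel.
        p.Go(func(ctx context.Context) error { return nil })
        return nil
    })
    _ = p.Wait() // the panic is propagated here
}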

Missing `Errors() []error` when using Go 1.20

OK, my bad, now it's Unwrap() []error.
IMO it should be noted somewhere that the behavior of the library varies based on the version of Go used.
It can cause backward incompatibility for some.

Old text:
The errors.Join() feature in Go 1.20 is nice, but it's not possible to split the joined error back into a slice of errors.

With the Uber implementation, which is behind a build tag for Go 1.19, it is possible.

deadlock!

I tried Pool WithCancelOnError but got a deadlock. How can I fix it?
Thanks!


package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {

	ExampleContextPool_WithCancelOnError()
}

func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(10).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 90; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 10 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
	// Output:
	// I will cancel all other tasks!
}

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
github.com/sourcegraph/conc/pool.(*Pool).Go(0xc000116600, 0xc0000ac5e8)        
        D:/gopath/src/github.com/sourcegraph/conc/pool/pool.go:54 +0x95        
github.com/sourcegraph/conc/pool.(*ErrorPool).Go(0xc000116600, 0xc0000ac5d0)   
        D:/gopath/src/github.com/sourcegraph/conc/pool/error_pool.go:29 +0x7a  
github.com/sourcegraph/conc/pool.(*ContextPool).Go(0xc000116600, 0xc0000af250) 
        D:/gopath/src/github.com/sourcegraph/conc/pool/context_pool.go:25 +0x7a
main.ExampleContextPool_WithCancelOnError()
        /pool_t50.go:23 +0xc6 
main.main()
        /pool_t50.go:13 +0x17 

goroutine 21 [chan receive]:
main.ExampleContextPool_WithCancelOnError.func1({0xa8ab90?, 0xc000092440?})    
        /pool_t50.go:27 +0x69 
github.com/sourcegraph/conc/pool.(*ContextPool).Go.func1()
        D:/gopath/src/github.com/sourcegraph/conc/pool/context_pool.go:26 +0x2e
github.com/sourcegraph/conc/pool.(*ErrorPool).Go.func1()
        D:/gopath/src/github.com/sourcegraph/conc/pool/error_pool.go:30 +0x29  
github.com/sourcegraph/conc/pool.(*Pool).worker(0x0?)
        D:/gopath/src/github.com/sourcegraph/conc/pool/pool.go:154 +0x7a       
github.com/sourcegraph/conc/panics.(*Catcher).Try(0x0?, 0x0?)
        D:/gopath/src/github.com/sourcegraph/conc/panics/panics.go:23 +0x53    
github.com/sourcegraph/conc.(*WaitGroup).Go.func1()
        D:/gopath/src/github.com/sourcegraph/conc/waitgroup.go:32 +0x65
created by github.com/sourcegraph/conc.(*WaitGroup).Go
        D:/gopath/src/github.com/sourcegraph/conc/waitgroup.go:30 +0x85

goroutine leak

I wrote a benchmark for pool and, unless my benchmark is wrong or my understanding of how the library works is wrong, there is a goroutine leak:

func BenchmarkPool(b *testing.B) {
	b.Run("without_error", func(b *testing.B) {
		fmt.Println("before", runtime.NumGoroutine())
		for i := 0; i < b.N; i++ {
			p := pool.New().WithMaxGoroutines(10)
			for j := 0; j < 1000; j++ {
				p.Go(func() {
					r := rand.Intn(10)
					time.Sleep(time.Duration(r) * time.Microsecond)
				})
			}

			p.Wait()
		}
		fmt.Println("after", runtime.NumGoroutine())
	})
}

If you run this: go test -v ./pool -run=XXX -bench=BenchmarkPool -count 10000

most of the time, I get

before 3
after 3
before 3
after 3
before 3
after 3

but I also saw

before 3
after 3
before 3
after 4  // <------- ?
before 3
after 3

I tested this on a Macbook Pro with Intel i5, it might be harder to reproduce on faster CPUs.

Replace `github.com/sourcegraph/lib/errors` with stdlib

We currently depend on the Sourcegraph error library, which pulls in a lot of dependencies. We only use the multierror type from that library. The functionality of the multierror type will be included in the standard library in the Go 1.20 release. Once 1.20 is released, we should remove the dependency and replace it with the standard library implementation.
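
For reference, a sketch of the Go 1.20 stdlib behavior that replaces the multierror type: errors.Join returns an error whose Unwrap() []error method exposes the individual errors again:

package main

import (
    "errors"
    "fmt"
)

func main() {
    errA := errors.New("a failed")
    errB := errors.New("b failed")
    joined := errors.Join(errA, errB)

    // Split the joined error back into its parts via Unwrap() []error.
    if u, ok := joined.(interface{ Unwrap() []error }); ok {
        for _, e := range u.Unwrap() {
            fmt.Println(e)
        }
    }
}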

[Feature] Async returns

👋 I really like the abstractions, but I'm missing an async return, probably a channel, that would send results as they're processed. So far the pools return all results or none. Is this something that would fit this library?
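
A workaround sketch under the current API, using the stream package shown in the README: each task returns a callback that sends its result on a channel, so results arrive as they are ready (in submission order):

package main

import (
    "fmt"

    "github.com/sourcegraph/conc/stream"
)

func process(x int) int { return x * x } // stand-in for real work

func main() {
    inputs := []int{1, 2, 3, 4}
    results := make(chan int)

    go func() {
        defer close(results)
        s := stream.New()
        for _, x := range inputs {
            x := x
            s.Go(func() stream.Callback {
                r := process(x)
                // Callbacks run in submission order, delivering each
                // result as soon as it (and its predecessors) finish.
                return func() { results <- r }
            })
        }
        s.Wait()
    }()

    for r := range results {
        fmt.Println(r)
    }
}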

Implement goroutines state monitor?

Sometimes goroutine states should be printed to the console.
Similar functionality: hmdsefi/gowl

Usage proposal:

p := pool.New().WithMaxGoroutines(10).WithMonitor(monitor, time.Second)
p.Go(func(tp *threadParameter) (res interface{}, err error) {
    tp.setState("Doing work1...")
    err = work1()

    tp.setState("Doing work2...")
    res, err = work2()
    return
})

func monitor(ts []*threadState) error {
    _, err := fmt.Fprintf(os.Stderr, "Thread states: %v", ts)
    return err
}

Unexpected behaviour during panic and with `WithCancelOnError` set

The current behaviour when a panic occurs in a goroutine is that it is recovered and the "error" is propagated when Wait() returns. However, when WithCancelOnError is set, this does not trigger an immediate cancelation of the pool and the remaining goroutines. When I encountered this behaviour, there was no visibility that anything had gone wrong, and the application kept on executing.

I feel this is counter-intuitive and would prefer that the panic have the same behaviour as a function returning an error. The panic can be recovered, but should result in a cancelation of the pool context.

Clarify if `Stream.Go` blocks while the pool is full?

From what I can see, Stream will block when calling Go if the internal pool is fully occupied. I just wanted to make sure this is the case, and to ask for it to be documented. If we're calling it for thousands of items, for example, we need to know that there will be automatic back-pressure.

WithTimeout

Is it possible to add a wg.GoWithTimeout method, so that each goroutine can control its own timeout period, or a wg.WaitWithTimeout() to control the overall elapsed time?
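
Until something like that exists, a workaround sketch; waitTimeout is a hypothetical helper, the spawned goroutines keep running after a timeout, and a panic propagated by Wait() would crash the helper goroutine:

import (
    "time"

    "github.com/sourcegraph/conc"
)

// waitTimeout waits for wg.Wait() to return, up to the given duration.
// It reports whether the wait finished in time.
func waitTimeout(wg *conc.WaitGroup, d time.Duration) bool {
    done := make(chan struct{})
    go func() {
        defer close(done)
        wg.Wait()
    }()
    select {
    case <-done:
        return true
    case <-time.After(d):
        return false
    }
}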

Question: When will you release the first version?

The README indicates that

This package is currently pre-1.0. There are likely to be minor breaking changes before a 1.0 release as we stabilize the APIs and tweak defaults. Please open an issue if you have questions, concerns, or requests that you'd like addressed before the 1.0 release. Currently, a 1.0 is targeted for March 2023.

Currently, it is 1 May. We want to make sure this great library is production ready :)

pool init

I don't think the pool.init method is necessary; it needs to be called in both the Wait and Go methods, so how about initializing tasks when creating the pool?

Add a `WithKeyedResult` option to the result pool

I have been using the result pool to fan out some I/O operations, e.g. fetching data from a database. The code is something like:

p := pool.NewWithResults[[]record]().WithErrors().WithContext(ctx)

// queries is []string type
for _, query := range queries {
  p.Go(
    func(context.Context) ([]record, error) {
      return databaseCall(ctx, query)
    })
}

result, err := p.Wait()

In this example, record is a struct that maps to a database record, and each databaseCall() returns N records as a []record.

If any of the queries fails to run (e.g. a timeout), the result (typed [][]record) contains only the successfully returned []record values, so len(queries) != len(result).

In the case where I need to retry the failed queries, I don't know which ones failed. If we had an option to return a map[string][]record, we could check which batch(es) are failing.
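
Until such an option exists, a workaround sketch: collect into a map keyed by query and guarded by a mutex (databaseCall, record, queries, and ctx as in the snippet above), so failed queries can be identified and retried:

results := make(map[string][]record, len(queries))
var mu sync.Mutex

p := pool.New().WithErrors().WithContext(ctx)
for _, query := range queries {
    query := query
    p.Go(func(ctx context.Context) error {
        recs, err := databaseCall(ctx, query)
        if err != nil {
            return err // this query stays absent from results
        }
        mu.Lock()
        results[query] = recs
        mu.Unlock()
        return nil
    })
}
err := p.Wait()
// Any query missing from results failed and can be retried.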

Reset `ErrorPool` errors after calling `Wait()`

Right now, ErrorPool errors only accumulate. I think the desired behavior is to reset them (p.errs = nil) after each call to Wait() - otherwise, on the next call to Wait, you get all the previous errors too.

Question about limiting concurrency in pools

Hi @camdencheek, thanks for your work on this!

I have questions around the use of limitations on concurrency in the pool package. My understanding is that Go is built for having huge numbers of concurrent goroutines, since it has its own scheduler that can assign many lightweight goroutines to each OS thread (I found this article very enlightening). By having a default MaxGoroutines of runtime.GOMAXPROCS(0), the package is generally going to cap the concurrency such that the Go scheduler can't be maximally fast or clever. Also, anytime one of those goroutines gets blocked on a channel, you'll have one fewer live thread.

I know this is how a pool is supposed to work, but I bet a lot of people will be wanting to use a conc pool because they want a more sophisticated errgroup. Concurrency capping is useful in some cases, but not always, and I'm wondering if people might shoot themselves in the foot. I actually can't think of a case when runtime.GOMAXPROCS(0) would be a good default. In a concurrent API call situation, I'd want to cap the QPS, not the concurrency, and in a CPU-bound situation, there's no harm in allowing many more concurrent goroutines and allowing the Go scheduler to choose how to allocate goroutines to OS threads.

Sorry for the huge message and thanks again for your work!

[Feature] Separate data structures from pools

Currently the pools such as ResultPool and ResultErrorPool hold a private data structure for the return values. It would be nice to export them and make them part of the library. They are mostly simple, but especially the one for iter.Map would be interesting. Maybe it's out of scope for this library, but it would be nice.

[Question] - How could I use this package over built-in async functions

I'm trying to use this package in place of the following code, but I'm not sure how I should change it.

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	json "github.com/bytedance/sonic"
)

func main() {
	start := time.Now()
	file, err := os.Open(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	type entry map[string]any
	var logEntries []entry

	linesChunkLen := 64 * 1024
	linesChunkPoolAllocated := int64(0)
	linesPool := sync.Pool{New: func() interface{} {
		lines := make([]string, 0, linesChunkLen)
		atomic.AddInt64(&linesChunkPoolAllocated, 1)
		return lines
	}}
	lines := linesPool.Get().([]string)[:0]

	entriesPoolAllocated := int64(0)
	entriesPool := sync.Pool{New: func() interface{} {
		entries := make([]entry, 0, linesChunkLen)
		atomic.AddInt64(&entriesPoolAllocated, 1)
		return entries
	}}
	mutex := &sync.Mutex{}
	wg := sync.WaitGroup{}

	scanner.Scan()
	for {
		lines = append(lines, scanner.Text())
		willScan := scanner.Scan()
		if len(lines) == linesChunkLen || !willScan {
			linesToProcess := lines
			wg.Add(len(linesToProcess))
			go func() {
				entries := entriesPool.Get().([]entry)[:0]
				for _, text := range linesToProcess {
					var entry map[string]any
					json.Unmarshal([]byte(text), &entry)
					entries = append(entries, entry)
				}
				linesPool.Put(linesToProcess)
				mutex.Lock()
				for _, entry := range entries {
					logEntries = append(logEntries, entry)
				}
				mutex.Unlock()
				entriesPool.Put(entries)
				wg.Add(-len(entries))
			}()
			lines = linesPool.Get().([]string)[:0]
		}
		if !willScan {
			break
		}
	}
	wg.Wait()

	// report c2: names at index
	fmt.Printf("time: %v\n", time.Since(start))

}

Please help me understand how I should change the code.

Add a `Wait()`-like method that returns a caught panic

Re-panicking is not ideal for every situation. Sometimes, in the case of a panic, a user will want to handle the panic manually. It's possible to recover() the propagated panic, but that's extra work for the user and you lose the type information that the panic value is a RecoveredPanic.

We should add a method like WaitSafe() that returns a *RecoveredPanic if a panic was caught.

Open questions:

  • What to call it? WaitSafe, SafeWait, WaitAndRecover all come to mind
  • Do we add this just to WaitGroup, or also add it to each of the .*Pool types?

How to keep the order of results?

Hi everyone,

I'm using conc to handle concurrency.
For example, for each user_id, I have 2 tasks:

  • Fetch user data from the user service (returned data is called userData)
  • Fetch loyalty points from the reward service (returned data is called loyaltyPoints)

I want to do these 2 tasks concurrently, with the results saved into the results variable.
I expected userData = results[0] and loyaltyPoints = results[1] (the order in which they were added to the pool.NewWithResults[any]().WithContext(ctx))

But I realized that the order of results is not guaranteed; sometimes loyaltyPoints = results[0] and vice versa.

How can I keep the order of results?
Thanks
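
A workaround sketch: skip the result pool and write each result into its own variable, so ordering no longer depends on completion order. fetchUserData and fetchLoyaltyPoints are hypothetical stand-ins for the two service calls:

var userData, loyaltyPoints any

p := pool.New().WithErrors().WithContext(ctx)
p.Go(func(ctx context.Context) error {
    var err error
    userData, err = fetchUserData(ctx, userID) // always "slot 0"
    return err
})
p.Go(func(ctx context.Context) error {
    var err error
    loyaltyPoints, err = fetchLoyaltyPoints(ctx, userID) // always "slot 1"
    return err
})
if err := p.Wait(); err != nil {
    // handle the error
}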

context cancel propagation does not work if all tasks are completed, but you haven't called Wait().

repro:

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
p := pool.New().WithMaxGoroutines(20)
for ctx.Err() == nil {
	p.WithErrors().WithContext(ctx).Go(func(ctx context.Context) error { return nil })
	fmt.Println("pushed task")
}

ctrl+C is not exiting the program

sidenote:
I wrote this code thinking that the pool would release a worker when a task is done.
I misunderstood the use case of conc. I thought I could use it as a simple concurrency limiter/backpressure mechanism, and continuously push tasks to it as fast as it can handle them.

The fact that I have to call Wait makes it more of a batch processing helper than a concurrency helper.

I looked at Stream, and was surprised that it also required to call Wait().

Implement task groups and returning results

Sometimes we need to run a lot of tasks grouped into small sets (smaller than the goroutine count). The effective way is to run the groups concurrently, not one by one (they are small), but the results should be grouped. Similar logic is implemented in alito/pong.

One usage case is parsing search results:

s := stream.New().WithMaxGoroutines(10)
groups := make([]*conc.Group, 0, len(searchQueries))

for _, query := range searchQueries {
    g := s.group(query)
    groups = append(groups, g)

    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        g.Go(func() (res interface{}, err error) {
            res, err = getPageResults(query, pageNumber)
            return
        })
    }

    g.onFinish(printGroupResults)
}

func printGroupResults(g *conc.group, results []interface{}, err error) error {
    fmt.Printf("Results of search query %v: %v\n", g.GetId(), results)
    return nil
}


// or synchronous
for _, g := range groups {
    results := g.GetResults() // or g.Wait()
    _ = printGroupResults(g, results, nil)
}

Or how can I implement this logic in an easy way with the current functionality?
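
With the current functionality, one sketch is a result pool per group nested inside an outer pool (pool.NewWithResults as in the README; getPageResults as in the snippet above, simplified to return a single hypothetical pageResult value):

outer := pool.New().WithMaxGoroutines(10)
for _, query := range searchQueries {
    query := query
    outer.Go(func() {
        g := pool.NewWithResults[pageResult]()
        for pageNumber := 1; pageNumber < 5; pageNumber++ {
            pageNumber := pageNumber
            g.Go(func() pageResult {
                return getPageResults(query, pageNumber)
            })
        }
        // Results stay grouped per query; note that their order within
        // the group is not guaranteed (see the ordering issue above).
        fmt.Printf("Results of search query %v: %v\n", query, g.Wait())
    })
}
outer.Wait()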

shallow copy of the pool is a bug

func TestErrorPool(t *testing.T) {
	p := New().WithMaxGoroutines(1)
	p.Go(func() {})
	require.Equal(t, len(p.limiter), 1)
	// len(p.limiter) == 1
	ep := p.WithErrors()
	require.Equal(t, len(ep.pool.limiter), 0) // fails: it is 1 because the copy shares the limiter; shouldn't it equal 0?
}

As an example,

func main() {
	p := pool.New().WithMaxGoroutines(2)
	p.Go(func() {
		fmt.Println("g1")
		time.Sleep(time.Second)
	})
	p.Go(func() {
		fmt.Println("g2")
		time.Sleep(time.Second)
	})
	//p.Wait()
	ep := p.WithErrors()
	ep.Go(func() error {
		// Never runs
		fmt.Println("with err ")
		return nil
	})
	ep.Wait()
	// if p.Wait is called after ep.Wait
	p.Wait()
}

The problem is that both pools use the same limiter.

FanOut/Until Discussion

I like what you have built here. It's almost exactly what we need! We at mailgun have been using https://github.com/mailgun/holster/tree/master/syncutil for quite some time. However, there are two things missing from this library which I would like to open for discussion.

Until

conc.WaitGroup.Until() https://github.com/mailgun/holster/blob/master/syncutil/waitgroup.go#L56

Until() simplifies running and stopping many long-running goroutines. You can chain Until() multiple times and call Stop() to signal all of the looping goroutines to end.

func main() {
    // Golang std pattern
    {
        done := make(chan struct{})
        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            for {
                select {
                case <-time.After(time.Second):
                case <-done:
                    wg.Done()
                    return
                }
            }
        }()
        close(done)
        wg.Wait()
    }

    // Until
    {
        wg := syncutil.WaitGroup{}
        wg.Until(func(done chan struct{}) bool {
            select {
            case <-time.After(time.Second):
            case <-done:
                return false
            }
            return true
        })
        wg.Stop()
    }
}

In this example, we save state by not needing the obligatory done channel every time we start a new goroutine. It also saves a single indent, as we no longer need the for loop. Our typical use case is to have many goroutines running which all need to be shut down when the service ends. Avoiding the need for a done variable every time we use this pattern has been nice. Combined with the panic handling in conc, Until would be even more useful.

Fanout

It appears that conc.pool provides almost everything our current implementation of Fanout does, and more. However, we have a VERY common use case where we need to pass a local variable into the closure to avoid race conditions.

{
    // Limits the max number of go routines running concurrently to 10
    // Loop will block when capacity is reached.
    fan := syncutil.NewFanOut(10)
    for i := 0; i < 100; i++ {
        // Should be updated to support generics
        fan.Run(func(o interface{}) error {
            num := o.(int)
            _, _ = http.Get(fmt.Sprintf("https://example.com/integers/%d", num))
            return nil
        }, i)
    }

    // Should be updated to return an err that supports
    // the new `Unwrap() []error` for multi errors in golang 1.20
    errs := fan.Wait()
    if len(errs) != 0 {
        panic(errs)
    }
}

This isn't a request to add these features so much as a wish to start a conversation around them and other common use cases which might not be covered here.

Request: Implementing WaitAndRecover in each of the .*Pool

The goroutine pools in this project are very useful, but they currently lack a way to safely wait and recover from any panics like conc.WaitGroup. It would be great if a WaitAndRecover() method could be added to the pools that returns a *panics.Recovered if a panic occurs. Most actual business users prefer to handle panics manually rather than exiting directly.
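
Until then, a workaround sketch using the conc/panics package directly: recover panics inside each task with a panics.Catcher and inspect it after Wait() (tasks is a hypothetical []func()):

var pc panics.Catcher

p := pool.New().WithMaxGoroutines(8)
for _, task := range tasks {
    task := task
    p.Go(func() {
        pc.Try(task) // recovers a panic instead of propagating it
    })
}
p.Wait()

if r := pc.Recovered(); r != nil {
    // Handle the *panics.Recovered manually instead of re-panicking.
    log.Printf("a task panicked: %v\n%s", r.Value, r.Stack)
}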

Panic handling for pools

I find the idea of this library very interesting. I am looking to use ErrorPool with WithContext, but it does not handle panics automatically, unlike conc.WaitGroup. I was expecting all features (including pool) to honor Goal #2: Handle panics gracefully.

Is there a reason not to handle panics for the pools?

Do we want a channel-based parallel processor?

I've written https://github.com/sudhirj/cirque

It basically takes a processing function and gives you an input and an output channel; inputs sent to the input channel are processed in parallel, with the results sent in order on the output channel. Very similar to Stream, but with a channel API.

If we want to roll this into conc and base it off Stream, I can raise a PR.

How do I disable stacktrace output on panic?

How do I disable stacktrace output on panic? I wouldn't want anyone to see the details when using the software when it panics. Please tell me how this can be hidden. I apologize in advance for everything; I am a newbie.

Add `context` support to `Go` method?

conc/pool/pool.go

Lines 54 to 67 in 1d4991d

select {
case p.limiter <- struct{}{}:
    // If we are below our limit, spawn a new worker rather
    // than waiting for one to become available.
    p.handle.Go(p.worker)

    // We know there is at least one worker running, so wait
    // for it to become available. This ensures we never spawn
    // more workers than the number of tasks.
    p.tasks <- f
case p.tasks <- f:
    // A worker is available and has accepted the task.
    return
}

If WithMaxGoroutines is set, the caller may block until a worker is released, but in some scenarios, it is necessary for the caller to decide when to cancel or wait for a timeout.

func (p *Pool) GoContext(ctx context.Context, f func()) error
or
func (p *Pool) TryGo(ctx context.Context, f func()) error

select {
case <-ctx.Done(): // <------------
    return ctx.Err()
case p.limiter <- struct{}{}:
    // If we are below our limit, spawn a new worker rather
    // than waiting for one to become available.
    p.handle.Go(p.worker)

    // We know there is at least one worker running, so wait
    // for it to become available. This ensures we never spawn
    // more workers than the number of tasks.
    p.tasks <- f
case p.tasks <- f:
    // A worker is available and has accepted the task.
    return nil
}

unnecessary dependencies

conc is a very low-level library, yet importing it indirectly pulls in over half a dozen packages it doesn't use.

go: downloading github.com/cockroachdb/errors v1.9.0
go: downloading github.com/cockroachdb/redact v1.1.3
go: downloading github.com/getsentry/sentry-go v0.13.0
go: downloading github.com/gogo/protobuf v1.3.2
go: downloading github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f
go: downloading github.com/kr/pretty v0.3.0
go: downloading github.com/kr/text v0.2.0
go: downloading github.com/rogpeppe/go-internal v1.9.0

[Question] Why is ForEach slower than conc.WaitGroup?

I have a slice of pointers to a custom struct (type SS []*S).

The following code execution time is 5 seconds:

var wg conc.WaitGroup
for _, s := range ss {
	s := s
	wg.Go(func() {
		s.GetStatus()
	})
}
wg.Wait()

The following code execution time is 1 minute:

iter.ForEach(ss, func(val **S) { (*val).GetStatus() })

GetStatus is a function that performs a network TCP/UDP port check.

`iter` would benefit from configurable concurrency.

Map and ForEach currently use the GOMAXPROCS value to determine how many goroutines run in parallel, but there are lots of situations (like making one network request per item in the slice) that would benefit from making that concurrency configurable. Yes, Stream is available, but iter has more fitting constructs. I would suggest an additional set of methods to override it.

I can work on this if we're OK with the idea.
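
For what it's worth, a sketch of the shape this could take, assuming an iter.Iterator[T] type with a MaxGoroutines field (items and fetch are hypothetical):

// One network request per slice element, capped at 50 goroutines.
it := iter.Iterator[int]{MaxGoroutines: 50}
it.ForEach(items, func(item *int) {
    fetch(*item)
})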

Exit on the first error encountered

When we run batch tasks, sometimes an error in one subtask means that the whole task is wrong, so it is desirable to add the ability to exit on the first error encountered.
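
A sketch of how this looks achievable with the existing context pool options, assuming WithCancelOnError cancels in-flight tasks after the first failure and WithFirstError keeps only that error (subtasks is hypothetical). Note that running tasks must observe ctx for the cancelation to take effect, as the deadlock report above illustrates:

p := pool.New().
    WithContext(ctx).
    WithCancelOnError().
    WithFirstError()

for _, sub := range subtasks {
    sub := sub
    p.Go(func(ctx context.Context) error {
        return sub(ctx)
    })
}
err := p.Wait() // the first error encountered, if any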
