Code Monkey home page Code Monkey logo

Comments (6)

github-actions avatar github-actions commented on August 17, 2024

Thank you for your feedback. Tagging and routing to the team member best able to assist.

from azure-sdk-for-go.

devak8 avatar devak8 commented on August 17, 2024

Here is the dummy test code which i have implemented here

`func main() {

eventHubConnectionString := 
eventHubName :=

storageConnectionString :=
storageContainerName :=

// create a container client using a connection string and container name
checkClient, _ := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil)

// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

if err != nil {
	panic(err)
}

// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubConnectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)

if err != nil {
	panic(err)
}

defer consumerClient.Close(context.TODO())

// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

if err != nil {
	panic(err)
}

//  for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
	for {
		partitionClient := processor.NextPartitionClient(context.TODO())

		if partitionClient == nil {
			break
		}

		go func() {
			if err := processEvents(partitionClient); err != nil {
				panic(err)
			}
		}()
	}
}

// run all partition clients
go dispatchPartitionClients()

processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()

if err := processor.Run(processorCtx); err != nil {
	panic(err)
}

}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.Background(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()

	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
		return err
	}

	fmt.Printf("Processing %d event(s)\n", len(events))

	for _, event := range events {
		fmt.Printf("Event received with body %v\n", string(event.Body))
	}
	if len(events) != 0 {
		if err := partitionClient.UpdateCheckpoint(context.Background(), events[len(events)-1], nil); err != nil {
			return err
		}
	}
}

}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}`

please let me know if any partitioning issue or what

from azure-sdk-for-go.

richardpark-msft avatar richardpark-msft commented on August 17, 2024

like incoming messsage is coming 220 and my receiver is receiving 7.20 k data

The balance there seems quite off!

Let's start off with some questions, see if we can eliminate some causes:

  1. Are you seeing duplicate messages?
  2. Are you seeing any failures from partitionClient.UpdateCheckpoint? If that function fails restarts will end up consuming the same events again.
  3. Are you running multiple Processor instances? And if so, are all of those instances using the same consumer group?
  4. Are you using the same Azure Storage Blob container for each run of the Processor?

from azure-sdk-for-go.

shourabhpayal avatar shourabhpayal commented on August 17, 2024

Hi Team I am facing a similar issue where Incoming message count > Outgoing message count (Seems like these message just get dropped. I tried multiple runs.):
image
Answering @richardpark-msft questions above.

  1. No
  2. No
  3. Yes multiple instances using the same consumer group. I run 5 tasks which have consumers consuming messages from my eventhub's 32 partitions (Each consumer is assigned 6-7 partitions).
  4. Yes

from azure-sdk-for-go.

devak8 avatar devak8 commented on August 17, 2024

Issue got fixed, i am closing this issue .
Thanks for your support.

from azure-sdk-for-go.

richardpark-msft avatar richardpark-msft commented on August 17, 2024

Issue got fixed, i am closing this issue . Thanks for your support.

@devak8, I feel like I dropped the ball here but somehow it worked out. I'm curious - did you find something on your end that seemed like the culprit or did the problem just go away?

One thing I was curious about (when thinking about this more) is if you were having some bad interaction with the prefetch cache. I need to write about it some more because it can lead to some imbalanced behavior like this that's not necessarily obvious (but is working as designed).

from azure-sdk-for-go.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.