Azure Cloud Service - EventProcessor IEventProcessor.ProcessEventsAsync not getting hit


I have a worker role that appears to start successfully and register both of my IEventProcessor implementations (SimpleEventProcessorA and SimpleEventProcessorB), but neither of them picks up any events: IEventProcessor.ProcessEventsAsync is never called.

Each EventProcessor class has its own event hub set up for it.

My logs show that the constructor and OpenAsync are called for each EventProcessor class, exactly four times per class, as shown below. After that there is no further activity. I assume four is the number of partitions in each event hub.

SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorB - Constructor
SimpleEventProcessorB - OpenAsync
SimpleEventProcessorA - Constructor
SimpleEventProcessorA - OpenAsync

No InitialOffsetProvider is set on the EventProcessorOptions in the worker role's RunAsync method (it is commented out), so I would expect all events already in the hubs to come flooding in.

Also, in the Azure portal I see events coming through when I kick them off.

Worker Role code that registers the EventProcessor:

public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

    private EventProcessorHost eventProcessorHostA;
    private EventProcessorHost eventProcessorHostB;

    public override void Run()
    {
        Trace.TraceInformation("ReportWorkerRole is running");

        try
        {
            this.RunAsync(this.cancellationTokenSource.Token).Wait();
        }
        finally
        {
            this.runCompleteEvent.Set();
        }
    }

    public override bool OnStart()
    {
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;

        // For information on handling configuration changes
        // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

        bool result = base.OnStart();

        Trace.TraceInformation("ReportWorkerRole has been started");

        //EventHub Processing
        try
        {
            string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA");
            string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB");
            string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString");

            string storageAccountName = CloudConfigurationManager.GetSetting("AzureStorageAccount");
            string storageAccountKey = CloudConfigurationManager.GetSetting("AzureStorageAccountKey");
            string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString");

            string eventProcessorHostNameA = Guid.NewGuid().ToString();
            eventProcessorHostA = new EventProcessorHost(eventProcessorHostNameA, eventHubNameA, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);

            string eventProcessorHostNameB = Guid.NewGuid().ToString();
            eventProcessorHostB = new EventProcessorHost(eventProcessorHostNameB, eventHubNameB, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);

        }
        catch (Exception ex)
        {
            //Logging omitted
        }

        return result;
    }

    public override void OnStop()
    {
        Trace.TraceInformation("ReportWorkerRole is stopping");

        this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait();
        this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait();

        this.cancellationTokenSource.Cancel();
        this.runCompleteEvent.WaitOne();

        base.OnStop();

        Trace.TraceInformation("ReportWorkerRole has stopped");
    }

    private async Task RunAsync(CancellationToken cancellationToken)
    {

        var options = new EventProcessorOptions()
        {
            MaxBatchSize = 100,
            PrefetchCount = 10,
            ReceiveTimeOut = TimeSpan.FromSeconds(20),
            //InitialOffsetProvider = (partitionId) => DateTime.Now
        };

        options.ExceptionReceived += (sender, e) =>
        {
            //Logging omitted
        };

        //Tried both using await and wait
        eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options).Wait();
        eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options).Wait();
        //await eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options);
        //await eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options);

        // TODO: Replace the following with your own logic.
        while (!cancellationToken.IsCancellationRequested)
        {
            Trace.TraceInformation("Working");
            await Task.Delay(1000);
        }
    }
}

Event Processor A (same configuration as B):

class SimpleEventProcessorA : IEventProcessor
{

    Stopwatch checkpointStopWatch;

    //Non-relevant variables omitted

    public SimpleEventProcessorA()
    {
        try
        {
            //Initializing variables using CloudConfigurationManager

            //Logging omitted
        }
        catch (Exception ex)
        {
            //Logging omitted
        }
    }

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        //Logging omitted

        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        try
        {
            //Logging omitted

            Console.WriteLine("Initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();

            return Task.FromResult<object>(null);
        }
        catch (Exception ex)
        {
            //Logging omitted 

            return Task.FromResult<object>(null);
        }
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        //Logging omitted

        foreach (EventData eventData in messages)
        {
            try
            {
                //Logging omitted 

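                // Decode the event body. Assumes a UTF-8 text payload; needs "using System.Text;".
                string data = Encoding.UTF8.GetString(eventData.GetBytes());
                Console.WriteLine("Message received.  Partition: '{0}', Data: '{1}'",
                    context.Lease.PartitionId, data);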
            }
            catch (Exception ex)
            {
                //Logging omitted

                throw;
            }
        }

        //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            this.checkpointStopWatch.Restart();
        }
    }

}

Would appreciate any help, thanks!

UPDATE

It turns out the worker role code was fine. The problem was the connection string I was using when sending events to the event hub.

This was my event hub connection string:

Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]

The entityPath was set incorrectly. It still pointed at an older event hub name, so the events I sent never reached the hubs the processors were listening on. It should have been the value set for eventHubNameA or eventHubNameB.

Answer by RizJa (best answer):

Posting this as an answer so that others can benefit from it. The fix is already described in the "UPDATE" section of the question, but to reiterate:

The entityPath was set incorrectly. It was using an older event hub name I had set. It should have been the value set for eventHubNameA or eventHubNameB.

Instead of:

Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]

it should have been:

Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[eventHubNameA];SharedAccessKey=[mykey]
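
A mismatch like this is easy to miss, so it may be worth checking for it at startup. The following is only a minimal sketch, not part of the Azure SDK: ConnectionStringCheck, Parse and FindEntityPathMismatch are hypothetical names, and the case-insensitive comparison of hub names is an assumption. It uses plain string parsing, so it runs without any Service Bus packages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not an Azure SDK type): compares the EntityPath in a
// connection string with the event hub name the code expects to use.
public static class ConnectionStringCheck
{
    // Splits "Key=Value;Key=Value" into a dictionary with case-insensitive keys.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (string segment in connectionString.Split(
            new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only: key values are base64 and may end in '='.
            int idx = segment.IndexOf('=');
            if (idx <= 0) continue;
            parts[segment.Substring(0, idx).Trim()] = segment.Substring(idx + 1).Trim();
        }
        return parts;
    }

    // Returns null when there is nothing to report, otherwise a description
    // of the mismatch. Hub names are compared ignoring case (an assumption).
    public static string FindEntityPathMismatch(string connectionString, string expectedHubName)
    {
        Dictionary<string, string> parts = Parse(connectionString);
        string entityPath;
        if (!parts.TryGetValue("EntityPath", out entityPath))
        {
            return null; // No EntityPath present, so there is nothing to compare.
        }
        if (string.Equals(entityPath, expectedHubName, StringComparison.OrdinalIgnoreCase))
        {
            return null;
        }
        return string.Format(
            "EntityPath '{0}' does not match expected event hub '{1}'.",
            entityPath, expectedHubName);
    }
}
```

For example, calling ConnectionStringCheck.FindEntityPathMismatch(eventHubConnectionString, eventHubNameA) in both the sender and the worker role's OnStart, and logging any non-null result, would have surfaced the stale entityPath before any events were sent.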