I need something like AutoResetEvent, but with FIFO ordering


I am calling a REST API. When I use multiple threads, I reach a point where I get a response like HTTP 429 (Too Many Requests). Unfortunately, the limit is not calls per second but a rather complicated calculation that cannot be done on the client side. The calculated limit is also only a guaranteed throughput, and the actual limit can be higher depending on the overall system load. Bottom line: I can only react to that HTTP response.

What I need is a way to make calls in parallel as fast as possible while respecting the call order. If I get a 429 response, I need to hold all other calls, retry the failed call, and then resume the other calls.

I created a helper class to wrap this logic. The helper class can be called from multiple threads. This is only a quick proof of concept for testing, not the final implementation. The problem with this implementation is that there is no FIFO ordering: waiting tasks are released in random order.

internal static class AlchemyHelper
    {
        private static AutoResetEvent _alchemyEvent = new(true);

        public static async Task<TReturn> SendContractQuery<TFunctionMessage, TReturn>(IContractQueryHandler<TFunctionMessage> contractQuery, string contractAddress, TFunctionMessage? functionMessage = null) where TFunctionMessage: FunctionMessage, new()
        {
            try
            {
                return await AlchemyNonBlockingQueryCall<TFunctionMessage, TReturn>(contractQuery, contractAddress, functionMessage);
            }
            catch (RpcResponseException ex) when (ex.RpcError.Code == 429)
            {
                // Retry
                return await AlchemyBlockingQueryCall<TFunctionMessage, TReturn>(contractQuery, contractAddress, functionMessage);
            }
        }

        private static async Task<TReturn> AlchemyNonBlockingQueryCall<TFunctionMessage, TReturn>(IContractQueryHandler<TFunctionMessage> contractQuery, string contractAddress, TFunctionMessage? functionMessage) where TFunctionMessage : FunctionMessage, new()
        {
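            // Pass through the gate: wait until the event is signaled, then
            // immediately signal it again so the next caller can proceed.
            _alchemyEvent.WaitOne();
            _alchemyEvent.Set();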
            if (functionMessage == null)
            {
                return await contractQuery.QueryAsync<TReturn>(contractAddress);
            }
            else
            {
                return await contractQuery.QueryAsync<TReturn>(contractAddress, functionMessage);
            }
        }

        private static async Task<TReturn> AlchemyBlockingQueryCall<TFunctionMessage, TReturn>(IContractQueryHandler<TFunctionMessage> contractQuery, string contractAddress, TFunctionMessage? functionMessage) where TFunctionMessage : FunctionMessage, new()
        {
            Random r = new Random();
            _alchemyEvent.WaitOne();
            TimeSpan wait = TimeSpan.FromSeconds(1);
            TReturn ret;
            
            while (true)
            {
                try
                {
                    int ms = r.Next(0, 1000);
                    TimeSpan backoff = wait.Add(TimeSpan.FromMilliseconds(ms));
                    await Task.Delay(backoff); // Wait the backoff time
                    if (functionMessage == null)
                    {
                        ret = await contractQuery.QueryAsync<TReturn>(contractAddress);
                    }
                    else
                    {
                        ret = await contractQuery.QueryAsync<TReturn>(contractAddress, functionMessage);
                    }
                    _alchemyEvent.Set();
                    return ret;
                }
                catch (RpcResponseException ex) when (ex.RpcError.Code == 429)
                {
                    // Still rate-limited: increase the base wait by one second, up to 32 seconds
                    if(wait < TimeSpan.FromSeconds(32))
                    {
                        wait = wait.Add(TimeSpan.FromSeconds(1));
                    }
                    // Every other exception propagates normally
                }
            }
        }
    }

What I need is an option to keep the order of calls intact.
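
To make the requirement concrete, here is a minimal sketch of the kind of primitive I have in mind: an awaitable auto-reset event that queues its waiters and releases them oldest first. The class name `FifoAsyncAutoResetEvent` and its members are hypothetical (nothing like this ships with .NET under that name). It is built only on `Queue<T>` and `TaskCompletionSource<bool>`, and it has no cancellation or timeout support.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: an async auto-reset event with FIFO waiters.
public sealed class FifoAsyncAutoResetEvent
{
    private readonly object _sync = new();
    private readonly Queue<TaskCompletionSource<bool>> _waiters = new();
    private bool _signaled;

    public FifoAsyncAutoResetEvent(bool initialState) => _signaled = initialState;

    // Returns a completed task if the event is signaled (and resets it);
    // otherwise the caller is appended to the end of the waiter queue.
    public Task WaitAsync()
    {
        lock (_sync)
        {
            if (_signaled)
            {
                _signaled = false;
                return Task.CompletedTask;
            }

            // RunContinuationsAsynchronously keeps the awaiting code from
            // running inline on the thread that calls Set().
            var waiter = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    // Releases the oldest waiter if there is one; otherwise leaves the
    // event signaled so that the next WaitAsync() passes immediately.
    public void Set()
    {
        TaskCompletionSource<bool>? next = null;
        lock (_sync)
        {
            if (_waiters.Count > 0)
            {
                next = _waiters.Dequeue();
            }
            else
            {
                _signaled = true;
            }
        }

        // Complete the waiter outside the lock.
        next?.SetResult(true);
    }
}
```

With something like this, `_alchemyEvent.WaitOne()` in the helper above would become `await _alchemyEvent.WaitAsync()`, which would also avoid blocking a thread inside an async method. Each `Set()` completes at most one waiter, always the one that has waited longest. Note that this only orders the release of waiters: once several calls are in flight in parallel, the order in which they reach the server is still not guaranteed.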
