Node.js Socket.IO race conditions


I'm caching data with Socket.IO and I need to update the cached values very quickly (20 simultaneous requests), followed by a get request at the end to check the result. However, my get returns a value before the updates finish: I expect a result of 20, but the get returns 10, 11, etc. I put a timeout in the get handler and it worked, but it is not very performant.

I've already tried using a mutex, a semaphore, and a lock, and none of them worked. Can anyone help?

Here's the server code:

const { Server } = require("socket.io");
const io = new Server({});

var cache = {};

io.on('connection', (socket) => {
    socket.on('clear', () => {
        cache = {};
    });


    socket.on('addToQueue', async (data, cb) => {
        if (data.type == `get`) {
            // function that await a time
            await new Promise(resolve => setTimeout(() => resolve(true), 20))
            cb({ value: cache[data.key] })
        }

        if (data.type == `transaction`) {
            //performs a calculation on each request and edits the cache
            var { balance, last_transactions } = cache[data.key];
            var { value, type, description } = data.value;

            if (type == 'd') {
                if (balance.limit < ((balance.total - value) * -1)) {
                    return cb({ value: { error: { type: type, error: `no-limit` }, ...cache[data.key] } })
                }
                balance.total -= value;
            }

            if (type == 'c') {
                balance.total += value;
            };

            let latest_transactions = last_transactions || [];
        
            latest_transactions.push({
                value: value,
                type: type,
                description: description,
                created_at: new Date().toISOString(),
            })
        
            latest_transactions.sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
            latest_transactions = latest_transactions.slice(0, 10);

            cache[data.key] = {
                balance,
                last_transactions: latest_transactions
            };

            cb({ value: cache[data.key] })
        }
    });
});

(async () => {
    const PORT = 6456;
    io.listen(PORT);
    console.log(`Server listening at *:${PORT}`);
})();
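
For context, the client side looks roughly like the sketch below (hypothetical code: it uses socket.io-client, the 'addToQueue' event and payload shape from the server above, a made-up key, and it assumes the cache entry for that key already exists on the server):

const { io } = require("socket.io-client");
const socket = io("http://localhost:6456");

// Wrap emit-with-acknowledgement in a promise
const emit = (payload) =>
    new Promise((resolve) => socket.emit("addToQueue", payload, resolve));

socket.on("connect", async () => {
    // Fire 20 credit transactions for the same (made-up) key without awaiting them
    const transactions = Array.from({ length: 20 }, () =>
        emit({
            type: "transaction",
            key: "client-1",
            value: { value: 1, type: "c", description: "test" },
        })
    );

    // This get races with the transactions above and may see a stale total
    const result = await emit({ type: "get", key: "client-1" });
    console.log(result); // often reflects only 10, 11, ... of the 20 updates

    await Promise.all(transactions);
    socket.disconnect();
});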

How can I avoid the race condition without adding an unnecessary timeout every time?

1 Answer

Answer from Lance:

To handle the issue of race conditions and ensure that your get request returns only after all updates are finished, you can implement a more robust synchronization mechanism. Given that you've tried mutexes, semaphores, and locks without success, it seems the problem might be in the way these tools were applied rather than their suitability for the task.

One effective approach is to use promises and a counter to track the completion of all transaction operations before allowing the get request to proceed. This method avoids arbitrary timeouts, which can be both inefficient and unreliable.

Here's a revised version of your code that implements a basic promise-based synchronization mechanism:

const { Server } = require("socket.io");
const io = new Server({});
var cache = {};

// Counter to keep track of ongoing transactions
let ongoingTransactions = 0;
// Array to store resolve functions of promises waiting for transactions to complete
let waitingForCompletion = [];

io.on('connection', (socket) => {
    socket.on('clear', () => {
        cache = {};
        // Reset the counter and clear waiting list without resolving, as it's a reset operation
        ongoingTransactions = 0;
        waitingForCompletion = [];
    });

    socket.on('addToQueue', async (data, cb) => {
        if (data.type === 'get') {
            if (ongoingTransactions > 0) {
                // Wait for all transactions to complete
                await new Promise(resolve => waitingForCompletion.push(resolve));
            }
            cb({ value: cache[data.key] });
        } else if (data.type === 'transaction') {
            // Increment the counter at the start of a transaction
            ongoingTransactions++;
            try {
                // Perform the transaction...
                // This is where you'd modify the cache as per your transaction logic
                // For demonstration, let's simulate it with a timeout
                await new Promise(resolve => setTimeout(resolve, 20)); // Simulate async work

                // Acknowledge once the cache has been updated
                cb({ value: cache[data.key] });
            } catch (error) {
                // Handle any errors that occur during the transaction
                console.error("Error during transaction:", error);
            } finally {
                // Decrement the counter whether the transaction succeeded or failed
                ongoingTransactions--;
                if (ongoingTransactions === 0) {
                    // No more ongoing transactions: resolve all waiting promises
                    waitingForCompletion.forEach(resolve => resolve());
                    waitingForCompletion = []; // Clear the waiting list
                }
            }
        }
    });
});

(async () => {
    const PORT = 6456;
    io.listen(PORT);
    console.log(`Server listening at *:${PORT}`);
})();

This solution uses a counter to track the number of ongoing transactions. When a get request is received, it checks if any transactions are ongoing. If there are, it waits by creating a new promise and adding its resolve function to an array. Once a transaction completes, it decrements the counter, and if no more transactions are ongoing, it resolves all stored promises, allowing the get requests to proceed and fetch the updated cache value.

This approach ensures that get requests will only proceed once all transactions have completed, without relying on arbitrary timeouts, thus improving both performance and reliability.
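
If you want to avoid repeating the counter bookkeeping in both branches, the same mechanism can be wrapped in a small helper. This is just a sketch of the idea above (the IdleGate name and API are made up, not a library):

// Minimal "idle gate": tracks in-flight transactions and lets readers wait until they drain.
class IdleGate {
    constructor() {
        this.pending = 0;   // number of in-flight transactions
        this.waiters = [];  // resolve functions of callers waiting for idle
    }

    enter() {
        this.pending++;
    }

    leave() {
        this.pending--;
        if (this.pending === 0) {
            this.waiters.forEach((resolve) => resolve());
            this.waiters = [];
        }
    }

    whenIdle() {
        return this.pending === 0
            ? Promise.resolve()
            : new Promise((resolve) => this.waiters.push(resolve));
    }
}

// Usage inside the handlers above:
// const gate = new IdleGate();
// transaction: gate.enter(); try { /* update cache */ } finally { gate.leave(); }
// get:         await gate.whenIdle(); cb({ value: cache[data.key] });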