Handling Concurrency, Overflow, and Periodic Draining in a Rust HashMap Collection


I am working on a Rust project that involves a data structure for managing key-value pairs concurrently across multiple threads. This structure, ConcurrentStorage, is a fixed-size collection of 256 Mutex<HashMap<String, i32>> buckets: 255 regular storage buckets plus 1 dedicated bucket that serves a dual purpose: it absorbs overflow inserts but can also store regular key-value pairs like any other bucket.

Key requirements and challenges include:

  1. Concurrent Inserts: The structure must support concurrent inserts from multiple threads. If a key already exists, its value is incremented by the supplied amount; otherwise the key-value pair is inserted. Once the total number of unique keys reaches a predefined limit (2000), new keys must still be insertable without restriction by routing them to the overflow bucket as needed (the kind of interleaving I am worried about around this limit check is sketched after this list).

  2. Overflow and Regular Key Management: The overflow bucket, while primarily there to absorb inserts beyond the unique-key limit, does not hold overflow data exclusively. It must also handle regular inserts, storing unique keys and aggregating values just like any other bucket.

  3. Periodic Draining: A periodic operation must drain data from all buckets, including the overflow bucket, every (say) 30 seconds. As entries are drained, the unique key count must be decremented so it continues to reflect the number of keys actually remaining.
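
To make the first and third points concrete, here is a minimal, standalone sketch of the interleaving I am worried about between the limit check and the counter update. The constant name just mirrors my code, and the bad interleaving is timing-dependent, so it will not show up on every run:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const UNIQUE_KEYS_LIMIT: usize = 2000;

fn main() {
    // Start one below the limit so there is exactly one "slot" left.
    let count = Arc::new(AtomicUsize::new(UNIQUE_KEYS_LIMIT - 1));
    let mut handles = Vec::new();

    for id in 0..2 {
        let count = Arc::clone(&count);
        handles.push(thread::spawn(move || {
            // Step 1: both threads can pass this check before either one
            // reaches step 2, because the check and the update are separate.
            if count.load(Ordering::SeqCst) < UNIQUE_KEYS_LIMIT {
                // Step 2: both then count a "new" key, so the total can
                // end up above the limit.
                count.fetch_add(1, Ordering::SeqCst);
                println!("thread {id}: admitted as a regular insert");
            } else {
                println!("thread {id}: would have gone to the overflow bucket");
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
    // Can print 2001 when both threads pass the check before either increments.
    println!("final unique key count: {}", count.load(Ordering::SeqCst));
}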

Simplified code for demonstration:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=5ae3ac4e89d524e80fa5440f6af7cefa

use std::collections::HashMap;
use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}};
use std::thread;
use std::hash::Hasher;
use std::time::{Duration, SystemTime};


const BUCKET_COUNT: usize = 256; 
const OVERFLOW_BUCKET_INDEX: usize = BUCKET_COUNT - 1; // Overflow values added to fixed bucket 255
const UNIQUE_KEYS_LIMIT: usize = 2000; // Maximum unique key-value pairs allowed

struct ConcurrentStorage {
    buckets: Vec<Mutex<HashMap<String, i32>>>,
    unique_keys_count: AtomicUsize,
}

impl ConcurrentStorage {
    fn new() -> Self {
        let buckets = (0..BUCKET_COUNT).map(|_| Mutex::new(HashMap::new())).collect();
        Self {
            buckets,
            unique_keys_count: AtomicUsize::new(0),
        }
    }

    fn insert(&self, key: String, value: i32) {
        //println!("Unique count: {}" ,self.unique_keys_count.load(Ordering::SeqCst));
        // Check if the unique keys limit has been reached
        if self.unique_keys_count.load(Ordering::SeqCst) >= UNIQUE_KEYS_LIMIT {
            println!("Overflow added");
            // Insert into the overflow bucket without incrementing unique_keys_count
            let mut overflow_bucket = self.buckets[OVERFLOW_BUCKET_INDEX].lock().unwrap();
            *overflow_bucket.entry("Overflow_key".to_string()).or_insert(0) += value;
        } else {
            // Normal insert operation
            let bucket_index = self.calculate_bucket_index(&key);
            let mut bucket = self.buckets[bucket_index].lock().unwrap();
            
            // Keys that happen to hash into the overflow bucket are never counted
            if !bucket.contains_key(&key) && bucket_index != OVERFLOW_BUCKET_INDEX {
                self.unique_keys_count.fetch_add(1, Ordering::SeqCst);
            }
            
            // New keys start at `value`; existing keys are incremented by `value`
            *bucket.entry(key).or_insert(0) += value;
        }
    }

    fn read_and_drain(&self) {
        for i in 0..BUCKET_COUNT {
            let mut bucket = self.buckets[i].lock().unwrap();
            // Drain each key-value pair and decrement the unique_keys_count for non-overflow buckets
            for (key, value) in bucket.drain() {
                println!("Drained: {} => {}", key, value);
                if i != OVERFLOW_BUCKET_INDEX {
                    self.unique_keys_count.fetch_sub(1, Ordering::SeqCst);
                }
            }
        }
    }

    fn calculate_bucket_index(&self, key: &str) -> usize {
        // Hash the key with the standard library's default hasher and map it to a bucket index
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        std::hash::Hash::hash(key, &mut hasher);
        let hash = hasher.finish();
        hash as usize % BUCKET_COUNT
    }
}

fn main() {
    let storage = Arc::new(ConcurrentStorage::new());

    // Spawn threads for continuous insert operations
    for _ in 0..10 { // Example: create 10 threads
        let storage_clone = Arc::clone(&storage);
        thread::spawn(move || {
            loop {
                let now = SystemTime::now();
                let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Time went backwards");
                let in_nanoseconds = since_the_epoch.as_nanos(); // Get current time in nanoseconds
                
                let key_index = (in_nanoseconds % 5000) as usize; // Generate a "random" key index
                let key = format!("key{}", key_index);
                let value = ((in_nanoseconds % 10) + 1) as i32; // Generate a "random" value between 1 and 10

                //println!("Insert key: {}", key);

                storage_clone.insert(key, value);
                thread::sleep(Duration::from_millis(100));
            }
        });
    }

    // Thread for periodic read_and_drain every 30 seconds
    let storage_clone_for_drain = Arc::clone(&storage);
    thread::spawn(move || {
        loop {
            thread::sleep(Duration::from_secs(30));
            storage_clone_for_drain.read_and_drain();
        }
    });

    loop {
        thread::sleep(Duration::from_secs(60)); // Keep the main thread alive
        println!("Main thread still alive...");
    }
}

The above code has issues in concurrent scenarios. In particular, I need to understand:

  • How can I ensure the integrity and accuracy of the unique keys count when concurrent inserts and periodic draining can introduce race conditions or counting inaccuracies? (A rough sketch of one direction I have been considering is shown below.)
  • Are there any recommended patterns or Rust-specific features that could optimize the performance and reliability of this concurrent, overflow-capable hash map collection?
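
One direction I have been considering, shown only as a sketch that assumes the same struct fields, constants, and calculate_bucket_index as in the code above, is to rewrite insert so that the "is this a new key" decision and the counter update both happen while the destination bucket's mutex is held, so a drain of that bucket cannot run between them:

impl ConcurrentStorage {
    // Sketch only: relies on the same fields, constants, and
    // calculate_bucket_index as the full example above.
    fn insert(&self, key: String, value: i32) {
        let bucket_index = self.calculate_bucket_index(&key);
        let mut bucket = self.buckets[bucket_index].lock().unwrap();

        // Decide whether this key would add to the counted set while the
        // bucket lock is held, so a drain of this bucket cannot run between
        // this decision and the counter update for the same key.
        let is_new_counted =
            !bucket.contains_key(&key) && bucket_index != OVERFLOW_BUCKET_INDEX;

        if is_new_counted && self.unique_keys_count.load(Ordering::SeqCst) >= UNIQUE_KEYS_LIMIT {
            // Over the limit: release this bucket first, then aggregate into
            // the overflow bucket (avoids holding two bucket locks at once).
            drop(bucket);
            let mut overflow = self.buckets[OVERFLOW_BUCKET_INDEX].lock().unwrap();
            *overflow.entry("Overflow_key".to_string()).or_insert(0) += value;
            return;
        }

        if is_new_counted {
            self.unique_keys_count.fetch_add(1, Ordering::SeqCst);
        }
        *bucket.entry(key).or_insert(0) += value;
    }
}

The drain side would keep decrementing under the same bucket lock, as read_and_drain already does, so insert and drain of a given key would always agree on whether it was counted. What I am not sure about is whether this is sufficient: the limit check against the atomic is still only approximate across buckets without some global synchronization, which is part of what I am asking.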

Note - I don't want to use external Rust crates that provide concurrent read/write hash maps; for various reasons I want to keep the design I have.
