How to modify a function that runs serially so that it runs in parallel in Python


I'm trying to build a database of all the isomorphically distinct undirected graphs on up to 15 vertices.

To do that, I wrote code in Python that runs serially, checking every possible adjacency matrix (only up to half of the maximum number of edges in a graph with n vertices, since the rest can be obtained as complements). The code checks each graph against every graph with the same number of edges, and if it is different from all of them, it appends it to the list. These are the main parts of the code:

def get_all_different_graphs(n):
    # Init Database
    all_graphs = {'nodes': n, 'max_edges': int(((n - 1) * n) / 2), 'graphs_count': 0, 'graphs': {}}
    for i in range(all_graphs['max_edges'] + 1):
        all_graphs['graphs'][str(i)] = []

    table = sum_table(all_graphs['max_edges'])
    generator = number_generator(0, table[all_graphs['max_edges'] // 2][all_graphs['max_edges']],
                                 all_graphs['max_edges'])

    for number in generator:
        check_single_graph(number, all_graphs)

    print(all_graphs)

def check_single_graph(number, all_graphs):
    g1 = number_to_adjacency_matrix(number, all_graphs['nodes'])
    edges = g1.ecount()

    is_isomorphic = False
    for graph in all_graphs['graphs'][str(edges)]:
        is_isomorphic = check_isomorphism(g1, graph)
        if is_isomorphic:
            break

    if not is_isomorphic:
        append_graph_and_complement(g1, not 2 * edges == all_graphs['max_edges'], edges, all_graphs)

def append_graph_and_complement(graph, append_complement, edges, all_graphs):
    all_graphs['graphs'][str(edges)].append(graph)
    all_graphs['graphs_count'] += 1

    if append_complement:
        complement = Graph.complementer(graph, loops=False)
        all_graphs['graphs'][str(all_graphs['max_edges'] - edges)].append(complement)
        all_graphs['graphs_count'] += 1
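
For reference, number_to_adjacency_matrix, sum_table, number_generator and check_isomorphism are helpers that aren't shown. Roughly, each candidate graph is encoded as a number whose bits fill the upper triangle of the adjacency matrix; a stripped-down sketch of that encoding with python-igraph (the details of the real helper may differ):

from igraph import Graph

def number_to_adjacency_matrix(number, n):
    # Sketch: bit k of `number` decides whether the k-th upper-triangle
    # entry (i, j) is an edge. The real helper may use a different ordering.
    edges = []
    bit = 0
    for i in range(n):
        for j in range(i + 1, n):
            if (number >> bit) & 1:
                edges.append((i, j))
            bit += 1
    return Graph(n, edges)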

Given the exponential nature of the task, it's not possible to get even to 10 vertices in a feasible amount of time, which is why I want to switch to parallel processing and split the task into smaller batches. That way I can maximize CPU usage and maybe get there with a single computer. (The next step will be to modify the code for a distributed system.)
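
To put numbers on that, here is a quick back-of-the-envelope count, assuming the generator enumerates exactly the upper-triangle bit patterns with at most max_edges // 2 ones:

import math

def matrices_to_check(n):
    # All upper-triangle bit patterns with at most max_edges // 2 ones;
    # the remaining graphs are covered by taking complements.
    max_edges = n * (n - 1) // 2
    return sum(math.comb(max_edges, k) for k in range(max_edges // 2 + 1))

for n in (8, 10, 15):
    print(n, matrices_to_check(n))
# 8 -> ~1.5e8, 10 -> ~1.8e13, 15 -> ~2e31 candidates, before any isomorphism filtering

So even for 10 vertices there are on the order of 10^13 matrices to generate, which is why the serial version stalls long before that.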

My main issues are how to transition to parallel processing while maintaining thread safety and avoiding redundant computation. I don't want to accidentally add two isomorphic graphs, and I don't want a worker to reach the point where it has to check the same graph twice, because the isomorphism check is computationally expensive.

I tried to write my own code that creates as many processes as there are cores and then splits the task between them. I chose to create processes rather than threads because, from what I read, they are more efficient for CPU-bound tasks such as this.

The code breaks the set of adjacency matrices to check into batches, each handled by a different process, and checks them against the shared list just like the serial code. The main difference is the method of appending to the list: I built a compare_and_append function that uses a lock and keeps trying to append, and if it fails, instead of re-checking the whole list from the beginning, it continues checking the list from the last index it previously reached. The idea behind this design is to prevent multiple checks of the same graph and to prevent process starvation.

These are the main parts of that code:

def run_parallel_processes(num_processes):
    processes = []
    with multiprocessing.Manager() as manager:
        num_of_vertices = 8
        max_edges = int(((num_of_vertices - 1) * num_of_vertices) / 2)
        all_graphs = {'nodes': num_of_vertices, 'max_edges': max_edges,
                      'num_of_graphs': multiprocessing.Value('i', 0),
                      'graphs': {}}
        table = sum_table(max_edges)

        for i in range(max_edges + 1):
            list_instance = {'lock': multiprocessing.Lock(), 'graphs': manager.list(),
                             'amount': multiprocessing.Value('i', 0)}
            all_graphs['graphs'][str(i)] = list_instance

        for i in range(num_processes):
            start_index = get_nth_number(max_edges, max_edges // 2,
                                         math.ceil(
                                             table[max_edges // 2][max_edges] * i / num_processes),
                                         table, 0)
            count = math.ceil(table[max_edges // 2][max_edges] / num_processes)
            # generate_and_check_graphs reads max_edges from all_graphs,
            # so it is not passed as a separate argument
            process = multiprocessing.Process(target=generate_and_check_graphs,
                                              args=(start_index, count, all_graphs))
            processes.append(process)
            process.start()

        for process in processes:
            process.join()
        print(all_graphs)


def generate_and_check_graphs(start_index, count, all_graphs):
    generator = number_generator(start_index, count, all_graphs['max_edges'])
    n = all_graphs['nodes']
    for number in generator:
        start = 0
        continue_to_next_number = False

        g1 = number_to_adjacency_matrix(number, n)
        edges = g1.ecount()

        while not continue_to_next_number:
            expected_len = len(all_graphs['graphs'][str(edges)]['graphs'])

            for i in range(start, expected_len):
                graph = all_graphs['graphs'][str(edges)]['graphs'][i]
                if not continue_to_next_number:
                    continue_to_next_number = check_isomorphism(g1, graph)
                    start += 1

            if not continue_to_next_number:
                continue_to_next_number = compare_and_append(all_graphs, edges, expected_len, g1)

def compare_and_append(all_graphs, edges, expected_len, new_graph):
    with all_graphs['graphs'][str(edges)]['lock']:
        graphs = all_graphs['graphs'][str(edges)]
        if graphs['amount'].value == expected_len:
            graphs['graphs'].append(new_graph)

            # Update count
            graphs['amount'].value = graphs['amount'].value + 1
            all_graphs['num_of_graphs'].value = all_graphs['num_of_graphs'].value + 1

            if not 2 * edges == all_graphs['max_edges']:
                complement_graphs = all_graphs['graphs'][str(all_graphs['max_edges'] - edges)]
                # Append the complement (as in the serial version), not the graph itself
                complement_graphs['graphs'].append(Graph.complementer(new_graph, loops=False))

                # Update count
                complement_graphs['amount'].value = complement_graphs['amount'].value + 1
                all_graphs['num_of_graphs'].value = all_graphs['num_of_graphs'].value + 1
            return True
        else:
            return False

In practice, this code performs terribly. I believe that the way I implemented the compare_and_append function slows things down far more than it helps. I'm not sure whether it's the lock or whether the whole approach I chose is bad, but I'd love some help on how to do this in the most efficient manner. Any help would be appreciated!


1 Answer

safXcode

Parallelizing a computation like this can be challenging because you need to ensure thread safety while minimizing redundant work. Your approach using multiprocessing and locks is a good start, but there are several optimizations that can make it more efficient. Some suggestions:

  1. Avoid Global Locks: The global lock around the compare_and_append function can lead to contention and limit parallelism. Instead, you can use per-edge locks to reduce contention. Create a dictionary of locks, one for each edge count.
edge_locks = {str(i): multiprocessing.Lock() for i in range(max_edges + 1)}

Then, when calling compare_and_append, acquire the specific lock for the edge count:

with edge_locks[str(edges)]:
    # Your append logic here
  2. Batching: You can further optimize your processing by batching the work. Rather than having each process work on one graph at a time, have each process work on a batch of graphs. This reduces the overhead of acquiring locks for each graph and can improve throughput (see the sketch after this list).

  3. Use Pool for Parallelism: Instead of manually creating and managing processes, you can use the multiprocessing.Pool class, which abstracts away some of the complexity. It allows you to submit tasks to a pool of worker processes and manage them more easily.

  4. Avoid Global Data: While you need to share data between processes, minimize the amount of shared data and use shared data structures like multiprocessing.Manager().list() only when necessary. Consider using inter-process communication (IPC) for critical shared data.
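
As a sketch of points 2 and 4 combined, each worker can first deduplicate its own batch locally, without touching any shared state, and only then merge the survivors into the shared lists, taking each per-edge lock once per batch instead of once per graph. This reuses the per-edge buckets and the helpers (number_generator, number_to_adjacency_matrix, check_isomorphism) from your question, so treat it as an illustration rather than drop-in code:

def process_batch(start_index, count, all_graphs):
    # Local buckets: edge count -> graphs that are unique within this batch
    local = {e: [] for e in range(all_graphs['max_edges'] + 1)}

    for number in number_generator(start_index, count, all_graphs['max_edges']):
        g = number_to_adjacency_matrix(number, all_graphs['nodes'])
        edges = g.ecount()
        # Deduplicate inside the batch without any locking
        if not any(check_isomorphism(g, h) for h in local[edges]):
            local[edges].append(g)

    # Merge each bucket into the shared list, one lock acquisition per bucket
    for edges, candidates in local.items():
        if not candidates:
            continue
        bucket = all_graphs['graphs'][str(edges)]
        with bucket['lock']:
            shared = list(bucket['graphs'])  # snapshot the proxy list once
            for g in candidates:
                if not any(check_isomorphism(g, h) for h in shared):
                    bucket['graphs'].append(g)
                    shared.append(g)

The complement trick is omitted here for brevity. Note the trade-off: the isomorphism checks against the shared list still run while the lock is held, which serializes that part across processes for the same edge count; your compare_and_append idea of checking against a snapshot outside the lock and only re-checking the tail inside it can be combined with this per-batch merge.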

Here's an example of how you can modify your code to use these suggestions:

import math
import multiprocessing
from multiprocessing import Pool

from igraph import Graph  # for Graph.complementer, as in your serial version

# sum_table, number_generator, get_nth_number, number_to_adjacency_matrix and
# check_isomorphism are the helpers from your question.


def compare_and_append(all_graphs, edges, expected_len, new_graph):
    # Take the lock for this edge count only (suggestion 1), not a global lock
    with all_graphs['locks'][str(edges)]:
        graphs = all_graphs['graphs'][str(edges)]
        if graphs['amount'].value != expected_len:
            # Another process appended in the meantime; the caller re-checks
            # only the new tail of the list and retries
            return False

        graphs['graphs'].append(new_graph)
        graphs['amount'].value = graphs['amount'].value + 1
        all_graphs['num_of_graphs'].value = all_graphs['num_of_graphs'].value + 1

        if not 2 * edges == all_graphs['max_edges']:
            # Store the complement as well, as the serial version does
            complement_graphs = all_graphs['graphs'][str(all_graphs['max_edges'] - edges)]
            complement_graphs['graphs'].append(Graph.complementer(new_graph, loops=False))
            complement_graphs['amount'].value = complement_graphs['amount'].value + 1
            all_graphs['num_of_graphs'].value = all_graphs['num_of_graphs'].value + 1
        return True


def generate_and_check_graphs(start_index, count, all_graphs):
    # Each pool worker handles its own slice of the number range. No nested Pool
    # is created here: workers inside a Pool are daemonic and cannot spawn
    # child processes of their own.
    generator = number_generator(start_index, count, all_graphs['max_edges'])
    n = all_graphs['nodes']

    for number in generator:
        g1 = number_to_adjacency_matrix(number, n)
        edges = g1.ecount()

        start = 0
        done = False
        while not done:
            bucket = all_graphs['graphs'][str(edges)]
            expected_len = len(bucket['graphs'])

            # Only check the part of the shared list we have not seen yet
            for i in range(start, expected_len):
                if check_isomorphism(g1, bucket['graphs'][i]):
                    done = True
                    break
                start += 1

            if not done:
                # Succeeds only if nobody extended the list in the meantime
                done = compare_and_append(all_graphs, edges, expected_len, g1)


if __name__ == '__main__':
    # Adjust the number of processes to your system's capabilities
    num_processes = multiprocessing.cpu_count()
    num_of_vertices = 8
    max_edges = num_of_vertices * (num_of_vertices - 1) // 2
    table = sum_table(max_edges)

    with multiprocessing.Manager() as manager:
        # Everything shared comes from the Manager so it can be pickled and sent
        # to the pool workers (plain Lock/Value objects cannot be passed through
        # a Pool's task queue). One lock per edge count keeps contention low.
        all_graphs = {'nodes': num_of_vertices, 'max_edges': max_edges,
                      'num_of_graphs': manager.Value('i', 0),
                      'locks': {str(i): manager.Lock() for i in range(max_edges + 1)},
                      'graphs': {str(i): {'graphs': manager.list(),
                                          'amount': manager.Value('i', 0)}
                                 for i in range(max_edges + 1)}}

        # Use multiprocessing.Pool for parallelism
        with Pool(processes=num_processes) as pool:
            results = []
            for i in range(num_processes):
                start_index = get_nth_number(max_edges, max_edges // 2,
                                             math.ceil(table[max_edges // 2][max_edges] * i / num_processes),
                                             table, 0)
                count = math.ceil(table[max_edges // 2][max_edges] / num_processes)
                results.append(pool.apply_async(generate_and_check_graphs,
                                                (start_index, count, all_graphs)))

            pool.close()
            pool.join()

            # Re-raise any exception that happened inside a worker
            for r in results:
                r.get()

        print(all_graphs)