How can I quickly distribute files equally across multiple partitions / servers?

712 views Asked by At

I am writing a method to copy files equally across multiple partitions for processing. What I'm currently doing seems to be working fine, but I feel as though there may be better ways to do it.

I have the following questions about this process:

  1. Are there any methods better than the one I'm using (that you know about), to evenly distribute files across partitions by size?

  2. Would it benefit me to implement multithreading for the file copy since I am copying to multiple partitions across multiple servers, or will I still be limited by the output on the disk transferring these files?

My method for creating equal groupings of files is as follows:

    /// <summary>
    /// Distributes a list of files into groups based on their size.
    /// </summary>
    /// <param name="files">The list of files to distribute.</param>
    /// <param name="partitionCount">The number of partitions to distribute across.</param>
    /// <returns>A balanced array of file lists for each partition.</returns>
    public List<SourceFile>[] Distribute(List<SourceFile> files, int partitionCount)
    {
        long totalSize = files.Sum(sf => sf.Size);

        long groupGoal = totalSize / partitionCount;

        List<SourceFile> sourceFilesSorted = files.OrderByDescending(sf => sf.Size).ToList();
        List<SourceFile>[] groups = Enumerable.Range(0, partitionCount).Select(l => new List<SourceFile>()).ToArray();

        int nGroup = 0, attempt = 1;
        long acceptedGoal = groupGoal;
        while (sourceFilesSorted.Count > 0)
        {
            WriteLine("Attempt {0} at initial file grouping, tolerance {1}...", attempt++, acceptedGoal);
            bool anySuccess = false;
            foreach (SourceFile sf in sourceFilesSorted.ToList())
            {
                if (groups[nGroup].Count == 0)
                {
                    groups[nGroup].Add(sf);
                    sourceFilesSorted.Remove(sf);
                    anySuccess = true;
                    continue;
                }

                bool satisfied = false;
                while (!satisfied && nGroup < groups.Length)
                {
                    if (groups[nGroup].Sum(gf => gf.Size) + sf.Size <= acceptedGoal)
                    {
                        groups[nGroup].Add(sf);
                        sourceFilesSorted.Remove(sf);
                        anySuccess = true;
                        satisfied = true;
                    }
                    if (!satisfied)
                        nGroup++;
                }

                if (++nGroup >= groups.Length)
                {
                    nGroup = 0;
                }
            }

            if (sourceFilesSorted.Count > 0)
                groups = groups.OrderBy(g => g.Sum(gf => gf.Size)).ToArray();

            if (!anySuccess)
                acceptedGoal += groupGoal;
        }

        groups = groups.OrderByDescending(g => g.Sum(gf => gf.Size)).ToArray();

        attempt = 1;
        acceptedGoal = groupGoal;

        bool hasMove = true;

        while (hasMove)
        {
            WriteLine("Attempt {0} at moving larger group files into smaller groups...", attempt);

            WriteLine("There are {0} groups above tolerance: {1}", groups.Where(g => (g.Sum(gf => gf.Size) > acceptedGoal)).Count(), acceptedGoal);

            // Begin moving files in groups where acceptable.
            List<SourceFile>[] move = Enumerable.Range(0, groups.Length).Select(l => new List<SourceFile>()).ToArray();
            for (int i = 0; i < groups.Length && i < move.Length; i++)
            {
                // WriteLine("Group {0} sum: {1}", i + 1, groups[i].Sum(sf => sf.Size));

                if (groups[i].Sum(sf => sf.Size) <= acceptedGoal)
                    continue;

                foreach (SourceFile file in groups[i])
                {
                    if (groups.Where(g => (g.Sum(gf => gf.Size) + file.Size <= acceptedGoal)).Any())
                    {
                        move[i].Add(file);
                    }
                }
            }

            long moves = move.Sum(m => m.Count);
            hasMove = move.Any(m => m.Any());
            WriteLine("Found {0} moves, {1}", moves, hasMove ? "attempting to redistribute..." : "process complete.");
            for (int i = 0; i < groups.Length; i++)
            {
                for (int j = 0; j < move.Length; j++)
                {
                    foreach (SourceFile file in move[j].ToList())
                    {
                        if (groups[i].Sum(sf => sf.Size) + file.Size <= acceptedGoal)
                        {
                            groups[i].Add(file);
                            groups[j].Remove(file);
                            move[j].Remove(file);
                        }
                    }
                }
            }

            if (!hasMove && acceptedGoal == groupGoal)
            {
                var acceptedGroups = groups.Where(g => (g.Sum(gf => gf.Size) <= acceptedGoal));
                acceptedGoal = acceptedGroups.Sum(g => g.Sum(gf => gf.Size)) / acceptedGroups.Count();
                WriteLine("Lowering tolerance to {0} for {1} groups, continue distribution...", acceptedGoal, acceptedGroups.Count());
                hasMove = true;
            }
        }

        return groups;
    }

To begin, I specify the acceptedGoal, which is the targeted size I am looking to achieve on each server. This is simply the mean of all the file sizes which would create a perfect distribution.

After this, I sort a list of files by size, descending, and I start adding them into each group, skipping over each group when the addition will make the total size greater than acceptedGoal.

Once there are no successful additions within an iteration, the acceptedGoal is increased by the initial value, which is basically just increasing the tolerance on each round. Before each iteration begins, the groups are always sorted lowest to highest to make sure the new files are being added to the current smallest group to keep the overall variance as low as possible.


Update: I went more in depth and now iterate over the list a second time. This time, it calculates a new, lower tolerance which is the mean of groups that already were below the initial accepted tolerance.

The process will keep attempting to move files out of the groups that exceed the tolerance into the groups that are below it.

So far, I have gotten this down to a very low variance outside of the target goal, but I am still not sure if there is a better way to do it.


Update 2: So thanks to @Enigmativity I was able to refactor this all again into a very clean IEnumerable method:

    /// <summary>
    /// Distributes a list of files into groups based on their size.
    /// </summary>
    /// <param name="files">The list of files to distribute.</param>
    /// <param name="partitionCount">The number of partitions to distribute across.</param>
    /// <returns>A balanced array of file lists for each partition.</returns>
    public IEnumerable<List<SourceFile>> Distribute(List<SourceFile> files, int partitionCount)
    {
        // Calculate the max fileSize tolerance per partition (the "perfect" distribution size across each disk). 
        long tolerance = files.Sum(sf => sf.Size) / partitionCount;

        List<List<SourceFile>> groups = Enumerable.Range(0, partitionCount).Select(l => new List<SourceFile>()).ToList();

        // Process each file, large to small.
        foreach(var file in files.OrderByDescending(sf => sf.Size))
        {
            // Add file to the smallest current group.
            groups.OrderBy(g => g.Sum(f => f.Size)).First().Add(file);

            // If this group exceeds tolerance, return it now so we can begin processing immediately.
            List<List<SourceFile>> returnGroups = groups.Where(g => g.Sum(sf => sf.Size) > tolerance).ToList();
            foreach (var retGroup in returnGroups)
            {
                groups.Remove(retGroup);
                yield return retGroup;
            }
        }
        // Remember to return the rest of the groups, large to small.
        foreach(var retGroup in groups.OrderByDescending(g => g.Sum(sf => sf.Size)))
            yield return retGroup;
    }

Now I plan on iterating over this list and executing a copy on each partition as the lists are created.


I am still curious if multithreading will help process quicker since this is being copied on multiple partitions across servers. Will I/O still be restricted by the other copy processes since the disk only really needs to read the data and send it to the other partition over the network. From here, the other partition will utilize write speeds on its own disk (I think), which leads me to believe multithreading is a good idea. Does this sound correct or am I way off?

If anyone has any resources to look into, that would be greatly appreciated as well. I haven't found too much on this topic on the web that I really understand.

1

There are 1 answers

3
Enigmativity On BEST ANSWER

I think this does what you want:

public List<SourceFile>[] Distribute(List<SourceFile> files, int partitionCount)
{
    List<SourceFile> sourceFilesSorted =
        files
            .OrderByDescending(sf => sf.Size)
            .ToList();

    List<SourceFile>[] groups =
        Enumerable
            .Range(0, partitionCount)
            .Select(l => new List<SourceFile>())
            .ToArray();

    foreach (var f in files)
    {
        groups
            .Select(grp => new { grp, size = grp.Sum(x => x.Size) })
            .OrderBy(grp => grp.size)
            .First()
            .grp
            .Add(f);
    }

    return groups;
}

The loop is fairly inefficient, but the results for 10,000 files come back in less than a second so I hope that's fast enough. It wouldn't be too hard to create a running group size array if need be, but that would just complicate the code if the efficiency wasn't needed.