Sliding Window Median Two Heaps Method in Python, TLE Error


I'm having trouble solving the Sliding Window Median problem with the two-heaps method: I keep getting a Time Limit Exceeded error on the test case where k = 50000 and the input is an array of 100000 ints. Is there a way to reduce the time complexity of my code?

Problem:

You are given an integer array nums and an integer k. There is a sliding window of size k which is moving from the very left of the array to the very right. You can only see the k numbers in the window. Each time the sliding window moves right by one position.

Return the median array for each window in the original array.

My Solution:

import heapq

class Solution(object):

 def __init__(self):
     self.small = [] #max heap of small numbers
     self.large = [] #min heap of large numbers
     
 def balance(self):
     #if small bigger than large by more than 1, pop and push
     if len(self.small) > len(self.large) + 1:
         val = heapq.heappop(self.small) *-1
         heapq.heappush(self.large,val)
     if len(self.large) > len(self.small) + 1:
         val = heapq.heappop(self.large)
         heapq.heappush(self.small,val * -1)

 def addNum(self, num):
     heapq.heappush(self.small, -1 * num)

     #if max of small is greater than min of large, pop from small and push to large
     if (self.small and self.large and (-1 * self.small[0]) > self.large[0]):
         val = heapq.heappop(self.small) * -1
         heapq.heappush(self.large,val)
     
     self.balance()
     
 def findMedian(self):
     if len(self.small) > len(self.large):
         return self.small[0] * -1
     if len(self.large) > len(self.small):
         return self.large[0]
     
     
     result = float( ((self.small[0]*-1) + self.large[0])/2.0)
     return result
 
 def popNum(self,num):
     if num > (self.small[0] * -1):
         self.large.remove(num)
         heapq.heapify(self.large)
     else:
         self.small.remove(num * -1)
         heapq.heapify(self.small)
     
     self.balance()

 def medianSlidingWindow(self, nums, k):
     """
     :type nums: List[int]
     :type k: int
     :rtype: List[float]
     """
     
     start = 0
     end = len(nums) - k + 1
     result = []

     for i in range(start,start + k):
         self.addNum(nums[i])


     while start < end:
         
         median = self.findMedian()
         result.append(median)
         self.popNum(nums[start])
         if start < end - 1:
             self.addNum(nums[start + k])
         start += 1

     return result

trincot On

The main efficiency problem is in popNum, since it does not benefit from the heap's efficient operations. It searches through the list with remove to find the entry and deletes it from the list, which breaks the heap property, so you then need to call heapify. Both steps are linear in the heap size, which gives the removal operation O(k) time complexity, and that starts to hurt as k grows.

There are at least two solutions for this:

  • Use a heap implementation that can remove elements with O(log n) complexity (where n is the size of the heap). You can achieve this by maintaining a hash map (dict) that maps each value to its index in the heap list. Once you have the index, you can replace that value with the last entry in the heap (shortening the heap) and apply the usual sift procedure to restore the heap property in O(log n). But all this implies you need to control the heap's sifting operations, so you would need to rewrite the heapq functions.

  • Only mark the deleted elements, but leave them in the heap. When peeking/popping the root of the heap, identify whether it has the deleted mark, and if so, pop it, ignore it and repeat.

    To uniquely identify elements we can join them with the index where they occur in the input list, because there may be duplicate values. This way we have a unique tuple for each entry in the input. To mark them as deleted is easy in this particular problem: if the index is outside the current window, it should be considered deleted. That way we only have to move the window to implicitly mark an element as deleted.
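The "mark as deleted by index" idea can be seen in isolation on a simpler problem. The following is only a minimal sketch, assuming a single heap and a sliding-window minimum instead of a median; the function name sliding_window_min is made up for this illustration and does not come from the question:

```python
import heapq

def sliding_window_min(nums, k):
    """Sliding-window minimum with lazy deletion (illustrative helper)."""
    heap = []    # entries are (value, index) tuples, so duplicates stay distinct
    result = []
    for i, val in enumerate(nums):
        heapq.heappush(heap, (val, i))
        low = i - k + 1      # smallest index that is still inside the window
        if low < 0:
            continue         # the first window is not complete yet
        # Entries with index < low are implicitly deleted. They are only
        # discarded once they surface at the root of the heap.
        while heap[0][1] < low:
            heapq.heappop(heap)
        result.append(heap[0][0])
    return result

print(sliding_window_min([4, 2, 12, 3, 1, 7], 3))  # [2, 2, 1, 1]
```

Every entry is pushed once and popped at most once, so the total work is O(n log n) no matter how large k is. The price is that the heap may temporarily hold entries that are no longer in the window.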

I went for the second option.
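For comparison, the first option might look roughly like the sketch below. It is an assumption-laden illustration rather than a drop-in replacement for heapq: the class name IndexedMinHeap and its methods are invented here, and it assumes the items are unique and hashable, such as (value, index) tuples.

```python
class IndexedMinHeap(object):
    """Min-heap with O(log n) removal of an arbitrary item (hypothetical helper).

    Items must be unique, hashable and comparable, e.g. (value, index) tuples.
    """
    def __init__(self):
        self.heap = []
        self.pos = {}  # item -> its current index in self.heap

    def _swap(self, i, j):
        h = self.heap
        h[i], h[j] = h[j], h[i]
        self.pos[h[i]] = i
        self.pos[h[j]] = j

    def _sift_up(self, i):
        while i > 0:
            parent = (i - 1) // 2
            if self.heap[i] >= self.heap[parent]:
                break
            self._swap(i, parent)
            i = parent

    def _sift_down(self, i):
        n = len(self.heap)
        while True:
            child = 2 * i + 1
            if child >= n:
                break
            # pick the smaller of the two children
            if child + 1 < n and self.heap[child + 1] < self.heap[child]:
                child += 1
            if self.heap[i] <= self.heap[child]:
                break
            self._swap(i, child)
            i = child

    def push(self, item):
        self.heap.append(item)
        self.pos[item] = len(self.heap) - 1
        self._sift_up(len(self.heap) - 1)

    def peek(self):
        return self.heap[0]

    def remove(self, item):
        i = self.pos.pop(item)   # O(1) lookup instead of a linear search
        last = self.heap.pop()
        if i < len(self.heap):   # the removed item was not the last entry
            self.heap[i] = last
            self.pos[last] = i
            # the moved entry may need to travel in either direction
            self._sift_up(i)
            self._sift_down(i)

h = IndexedMinHeap()
for item in [(5, 0), (3, 1), (8, 2), (3, 3)]:
    h.push(item)
h.remove((3, 1))
print(h.peek())  # (3, 3)
```

The extra dict must be updated on every swap, which is why the sifting logic cannot be delegated to heapq here.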

To deal with the (value, index) tuples, and the deleted elements still in the heap, it probably makes sense to make a heap class -- which still can rely on heapq.

We also have to take care not to mistake the size of a heap for the real number of non-deleted items in that heap. For that I would suggest maintaining a balance counter that tracks the difference between the numbers of non-deleted items in the two heaps; it should be updated whenever we insert or delete (i.e. mark as deleted) an entry in one of the two heaps.

Here is an implementation of the above idea. As I noticed you used LeetCode's template for Python 2 and not Python 3, the below has a super call with arguments, but those arguments can be omitted when running Python 3:

import heapq

class MinWindowHeap(object):
    def __init__(self, conv=lambda x: x):
        self.heap = []
        self.conv = conv  # Conversion function (to negate values for MaxHeap)
        self.lowindex = 0  # lower index limit to identify deleted items
        
    def peek(self):  # Returns a (value, index) or None if the heap is empty
        while self.heap:
            item = self.conv(self.heap[0])
            if item[1] >= self.lowindex:
                return item
            # a deleted element: skip it
            heapq.heappop(self.heap)
    
    def push(self, item):
        heapq.heappush(self.heap, self.conv(item))
    
    def pop(self):
        item = self.peek()  # Remove deleted elements       
        heapq.heappop(self.heap)
        return item


def negate(item):  # Helper function for implementing MaxHeap
    return -item[0], item[1]


class MaxWindowHeap(MinWindowHeap):
    def __init__(self):
        # In Python 3 the arguments for super() can be omitted:
        super(MaxWindowHeap, self).__init__(negate)


class Solution(object):
    def rebalance(self, add):
        self.balance += add
        if abs(self.balance) < 2:
            return
        if self.balance > 1:
            self.small.push(self.large.pop())
        elif self.balance < -1:
            self.large.push(self.small.pop())
        self.balance = 0

    def insert(self, item):
        pivot = self.large.peek() or self.small.peek()
        islarge = not pivot or item > pivot
        heap = self.large if islarge else self.small
        heap.push(item)
        self.rebalance(1 if islarge else -1)
   
    def remove(self, item):
        pivot = self.large.peek()
        islarge = pivot and item >= pivot
        # We only adapt the window in order to mark the item as deleted
        self.large.lowindex = self.small.lowindex = item[1] + 1
        self.rebalance(-1 if islarge else 1)

    def getMedian(self):
        if self.balance == 0:
            return (self.large.peek()[0] + self.small.peek()[0]) * 0.5
        return self.large.peek()[0] if self.balance > 0 else self.small.peek()[0]

    def medianSlidingWindow(self, nums, k):
        self.small = MaxWindowHeap()
        self.large = MinWindowHeap()
        self.balance = 0
        # create list of (value, index) pairs
        items = [(val, i) for i, val in enumerate(nums)]
        # Populate by first window
        for item in items[:k]:
            self.insert(item)
        result = [self.getMedian()]
        # Slide window...
        for olditem, item in zip(items, items[k:]):
            self.remove(olditem)
            self.insert(item)
            result.append(self.getMedian())

        return result
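
To sanity-check an implementation like the one above on small inputs, it can help to compare its output with a straightforward reference. The following is a minimal sketch of such a checker, not part of the solution itself; the function name median_sliding_window_bruteforce is made up here. It keeps the window as a sorted list with the standard bisect module, so each slide still costs O(k) because of list shifting:

```python
import bisect

def median_sliding_window_bruteforce(nums, k):
    """Reference implementation for testing (hypothetical helper)."""
    window = sorted(nums[:k])
    result = []
    for i in range(k, len(nums) + 1):
        if k % 2:
            result.append(float(window[k // 2]))
        else:
            result.append((window[k // 2 - 1] + window[k // 2]) * 0.5)
        if i == len(nums):
            break
        # Slide the window: drop nums[i - k], then add nums[i]
        del window[bisect.bisect_left(window, nums[i - k])]
        bisect.insort(window, nums[i])
    return result

print(median_sliding_window_bruteforce([1, 3, -1, -3, 5, 3, 6, 7], 3))
# [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
```

Note that this reference always returns floats, whereas getMedian above returns an int for odd k, so compare the values numerically rather than by type.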