How to send HTTP chunked response to emulate a video stream using Proxygen and Folly?


I am writing an HTTP video streaming server based on Facebook's Proxygen. No seeking is planned. Using proxygen::ResponseBuilder I am able to send chunks of a WebM-encoded video as the HTTP response, i.e. chunked transfer encoding is working. My problem is that Proxygen waits for proxygen::ResponseBuilder::sendWithEOM() before it sends anything, even the response headers. I would like it to send the data as soon as possible after each call to proxygen::ResponseBuilder::send().
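For reference, this is roughly what each chunk should look like on the wire once it is actually flushed. It is a minimal sketch in plain standard-library C++, not Proxygen code; encodeChunk and lastChunk are hypothetical helper names used only for illustration.

```cpp
#include <cstdio>
#include <string>

// Frame one body chunk for HTTP/1.1 chunked transfer encoding:
//   <payload size in hex>\r\n<payload>\r\n
// Do not pass an empty payload: a zero-size chunk marks the end of the body.
std::string encodeChunk(const std::string& data) {
    char sizeLine[32];
    std::snprintf(sizeLine, sizeof(sizeLine), "%zx\r\n", data.size());
    return std::string(sizeLine) + data + "\r\n";
}

// The zero-length chunk that terminates the body (no trailer fields).
std::string lastChunk() {
    return "0\r\n\r\n";
}
```

A client can start decoding as soon as the first framed chunk arrives, which is why it matters that each send() is flushed instead of being held back until sendWithEOM().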

I tried to run the ResponseBuilder calls from a lambda executed on the EventBase thread, using evb->runInLoop() and evb->runInEventBaseThread():

using namespace folly;
using namespace proxygen;

std::thread t([&](){
    EventBase* evb = EventBaseManager::get()->getExistingEventBase();    
    // send headers ...
    while ( chunks avail. ) {
        //...
        evb->runInLoop([&](){
            ResponseBuilder(downstream_)
                     .body(std::move(chunk))
                     .send();
        });
        //... 
    }
    // sendWithEOM ...
});
t.detach();

This code is called from the onRequest() method of my RequestHandler. I tried calling ResponseBuilder::send() without wrapping it in evb->runInLoop(), but Proxygen v0.25.0 with Folly v0.42.0 uses an assert to prohibit calls to ResponseBuilder::send() from another thread. I removed this assert from here: https://github.com/facebook/folly/blob/v0.42.0/folly/io/async/EventBase.cpp#L491.

Now the emulated streaming works, but it crashes when there are parallel requests. I guess it was not meant to be used like this, and that is what the assert is for. Does anyone know how to use the Proxygen infrastructure properly for my use case?


There are 2 answers

JohnM On BEST ANSWER

That's got the same issue. I got it working with something like this.

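// Look up the EventBase on the request (worker) thread; the new thread has none of its own.
folly::EventBase* eventBase = folly::EventBaseManager::get()->getExistingEventBase();
std::thread t([&, eventBase]() {
    while( chunks exist ) {
        auto chunk = getChunk();
        // runInEventBaseThread() may be called from any thread; move the chunk into the callback.
        eventBase->runInEventBaseThread([&, chunk = std::move(chunk)]() mutable {
            ResponseBuilder(downstream_).body(std::move(chunk)).send();
        });
    }
    // sendWithEOM ... (also via runInEventBaseThread(), after the last chunk)
});
t.detach();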
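To show the hand-off pattern in isolation, here is a minimal sketch that uses only the standard library. MiniLoop, post and streamChunks are made-up names standing in for folly::EventBase, runInEventBaseThread() and the request handler; this is not how Folly implements it, just the same idea: the producer thread never touches the connection state, it only queues work for the thread that owns it.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event loop: a single thread runs every task
// that other threads post to it, in FIFO order.
class MiniLoop {
public:
    MiniLoop() : thread_([this] { run(); }) {}
    ~MiniLoop() { stop(); }

    // Thread-safe: queue a task to be executed on the loop thread.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

    // Let the loop finish everything queued so far, then join it.
    void stop() {
        if (!thread_.joinable()) return;
        post([this] { done_ = true; });
        thread_.join();
    }

private:
    void run() {
        while (!done_) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return !tasks_.empty(); });
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();  // runs outside the lock, on the loop thread only
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool done_ = false;   // read and written on the loop thread only
    std::thread thread_;  // declared last so it starts after the other members
};

// A producer thread hands each chunk to the loop thread. Only the loop thread
// writes to `sent`, which plays the role of the downstream connection.
std::vector<std::string> streamChunks(std::vector<std::string> chunks) {
    std::vector<std::string> sent;
    MiniLoop loop;
    std::thread producer([&loop, &sent, chunks = std::move(chunks)]() mutable {
        for (auto& chunk : chunks) {
            loop.post([&sent, chunk = std::move(chunk)]() mutable {
                sent.push_back(std::move(chunk));
            });
        }
    });
    producer.join();
    loop.stop();
    return sent;
}
```

Because there is one producer and the queue is FIFO, the chunks reach the "connection" in the order they were produced, and the final end-of-message step can be posted the same way after the last chunk.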
Dpak Malla On
using namespace folly;
using namespace proxygen;

// Get the EventBase in the worker thread and pass it to the new thread.
// If you call this inside the new thread, you won't get the worker thread's EventBase.
EventBase* evb = EventBaseManager::get()->getExistingEventBase();
std::thread t([&, evb](){
    // send headers ...
    while ( chunks avail. ) {
        //...
        evb->runInLoop([&](){
            ResponseBuilder(downstream_)
                     .body(std::move(chunk))
                     .send();
        });
        //...
    }
    // sendWithEOM ...
});
t.detach();

So there is no need to comment out the assertion in the EventBase source.