Question

I'm developing a Python daemon responsible for converting multimedia files to .mp4 format. The idea is to have the daemon running and, whenever the user requires, I add the desired video to a Queue and a thread eventually gets the video from the queue and calls Handbrake via Subprocess in order to do the conversion. For simplicity's sake, I'm using only one thread at the moment.

Here's my code. First, the daemon (server.py, adapted from Kris Johnson's post here):

#!/usr/bin/env python
# -*- coding: utf-8 -*- 

import os, sys
import os.path
import logging.config
import SocketServer
import optparse
import resource
import socket, tempfile
import time
from threadedqueue import QueueServer

version = '0.1'
SERVER_PORT=6666
SERVER_SOCKET='server_socket'

SERVER_TYPE=SocketServer.UnixStreamServer
ServerBase = SERVER_TYPE

if ServerBase == SocketServer.UnixStreamServer:
    server_address = os.path.join(tempfile.gettempdir(), SERVER_SOCKET)

SERVER_LOG=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'convertCentral.log')
logging.basicConfig(format='[%(asctime)s.%(msecs).03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=SERVER_LOG, level=logging.INFO)


class RequestHandler(SocketServer.StreamRequestHandler):

    """Request handler

    An instance of this class is created for each connection made
    by a client.  The Server class invokes the instance's
    setup(), handle(), and finish() methods.

    The template implementation here simply reads a single line from
    the client, breaks that up into whitespace-delimited words, and
    then uses the first word as the name of a "command."  If there is
    a method called "do_COMMAND", where COMMAND matches the
    commmand name, then that method is invoked.  Otherwise, an error
    message is returned to the client.

    """

    def handle(self):
        """Service a newly connected client.

        The socket can be accessed as 'self.connection'.  'self.rfile'
        can be used to read from the socket using a file interface,
        and 'self.wfile' can be used to write to the socket using a
        file interface.

        When this method returns, the connection will be closed.
        """

        # Read a single request from the input stream and process it.
        request = self.rfile.readline()
        if request:
            self.server.log('request %s: %s',
                            self.connection.getpeername(), request.rstrip())
            try:
                self.process_request(request)
            except Exception, e:
                self.server.log('exception: %s' % str(e))
                self.wfile.write('Error: %s\n' % str(e))
        else:
            self.server.log('error: unable to read request')
            self.wfile.write('Error: unable to read request')


    def process_request(self, request):
        """Process a request.

        This method is called by self.handle() for each request it
        reads from the input stream.

        This implementation simply breaks the request string into
        words, and searches for a method named 'do_COMMAND',
        where COMMAND is the first word.  If found, that method is
        invoked and remaining words are passed as arguments.
        Otherwise, an error is returned to the client.
        """

        words = request.split()
        if len(words) == 0:
            self.server.log('error: empty request')
            self.wfile.write('Error: empty request\n')
            return

        command = words[0]
        args = words[1:]

        methodname = 'do_' + command
        if not hasattr(self, methodname):
            self.server.log('error: invalid command')
            self.wfile.write('Error: "%s" is not a valid command\n' % command)
            return
        method = getattr(self, methodname)
        method(*args)


    def do_stop(self, *args):
        self.wfile.write('Stopping server\n')
        self.server.stop()


    """Process an 'echo' command"""
    def do_echo(self, *args):
        self.wfile.write(' '.join(args) + '\n')


    """Process a 'convert' command"""
    def do_convert(self, video):
        self.wfile.write('Converting %s\n' % (video))

        try:
            self.server.addVideo(video)
        except Exception as e:
            logging.info("ERROR: %s" % e)


class Server(ServerBase):

    def __init__(self, server_address):
        self.__daemonize()
        self.pool = QueueServer()

        if ServerBase == SocketServer.UnixStreamServer:

            # Delete the socket file if it already exists
            if os.access(server_address, 0):
                os.remove(server_address)

        ServerBase.__init__(self, server_address, RequestHandler)


    def addVideo(self, video):
        self.pool.add(video)

    def log(self, format, *args):
        try:
            message = format % args
            logging.info("%s" % message)
        except Exception, e:
            print str(e)


    def serve_until_stopped(self):
        self.log('started')

        self.__stopped = False

        while not self.__stopped:
            self.handle_request()

        self.log('stopped')


    def stop(self):
        self.__stopped = True


    def __daemonize(self):
        UMASK = 0
        WORKDIR = '/'
        MAXFD = 1024
        if hasattr(os, 'devnull'):
            REDIRECT_TO = os.devnull
        else:
            REDIRECT_TO = '/dev/null'

        try :
            if os.fork() != 0:
                os._exit(0)

            os.setsid()

            if os.fork() != 0:
                os._exit(0)

            os.chdir(WORKDIR)
            os.umask(UMASK)
        except OSError, e:
            self.log('exception: %s %s', e.strerror, e.errno)
            raise Exception, "%s [%d]" % (e.strerror, e.errno)
        except Exception, e:
            self.log('exception: %s', str(e))

        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if maxfd == resource.RLIM_INFINITY:
            maxfd = MAXFD
        for fd in range(0, maxfd):
            try:
                os.close(fd)
            except OSError:
                pass

        os.open(REDIRECT_TO, os.O_RDWR)
        os.dup2(0, 1)
        os.dup2(0, 2)


""" Run a server as a daemon """
def run_server(options, args):

    print("convertCentral running on %s" % server_address)
    svr = Server(server_address)
    svr.serve_until_stopped()
    svr.server_close()


"""Send request to the server and process response."""
def do_request(options, args):  
    if ServerBase == SocketServer.UnixStreamServer:
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Send request
    s.connect(server_address)
    s.sendall(' '.join(args) + '\n')

    # Print response
    sfile = s.makefile('rb')
    line = sfile.readline()
    while line:
        print line,
        line = sfile.readline()


#######################################################################
#######################################################################

if __name__ == '__main__':

    optparser = optparse.OptionParser(usage=usage,
                                      version=version)
    (options, args) = optparser.parse_args()

    if len(args) == 0:
        optparser.print_help()
        sys.exit(-1)

    if args[0] == 'start':
        run_server(options, args[1:])
    else:
        do_request(options, args)

Then, the queue (threadedqueue.py - sorry about the name, not feeling particularly creative):

#! /usr/bin/python
# -*- coding: utf-8 -*- 
import os, time, threading, psutil, resource, logging, subprocess as sp, sys
from Queue import Queue
from threading import Thread

SERVER_LOG=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'convertCentral.log')
logging.basicConfig(format='[%(asctime)s.%(msecs).03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=SERVER_LOG, level=logging.INFO)

class QueueServer(object):

    current_video_queue = Queue(maxsize=0)
    N_WORKER_THREADS = 1
    counter = 0


    def __init__(self):
        logging.info("[QueueServer] Initializing the video conversion queue")
        for i in range(self.N_WORKER_THREADS):
            logging.info("Firing thread")
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    ''' Converts the video using Handbrake via subprocess'''
    def convertVideo(self, video):
        logging.info("Now converting %s" % video)
        fileName, fileExtension = os.path.splitext(video)
        payload = "nice -n 15 HandBrakeCLI -i %s -e x264 -q 15 -o %s.mp4" % (video, fileName)

        pr = sp.Popen(payload, shell=True, stdout=open('/dev/null', 'w'), stderr=sp.STDOUT)
        logging.info('Started handbrake')
        pr.wait()
        logging.info("EXIT CODE: %s " % pr.returncode)

        self.counter = self.counter + 1
        logging.info("Conversion's done. %d" % self.counter)



    ''' A worker thread '''
    def worker(self):
        while True:
            logging.info("Getting one")
            item = self.current_video_queue.get()
            logging.info("Firing conversion: %s" % item)
            self.convertVideo(item)
            self.current_video_queue.task_done()
            logging.info("All done")



    ''' Adds a video to the video conversion queue '''
    def add(self, video):
        logging.info("* Adding %s  to the queue" % video)
        self.current_video_queue.put(video)
        logging.info("* Added %s  to the queue" % video)
        time.sleep(3)

Here's the deal: if I run the threadedqueue on its own, it works great. However, if I run it using server.py, the conversion never happens because Handbrake crashes.

Here are the logs:

Hal@ubuntu:~/Desktop/convertCentral$ python server.py start
convertCentral running on /tmp/server_socket

Hal@ubuntu:~/Desktop/convertCentral$ python server.py convert UNKNOWN_PARAMETER_VALUE.WMV 

[2014-04-17 18:05:44.793] request : convert UNKNOWN_PARAMETER_VALUE.WMV
Converting UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.793] * Adding UNKNOWN_PARAMETER_VALUE.WMV  to the queue
[2014-04-17 18:05:44.793] * Added UNKNOWN_PARAMETER_VALUE.WMV  to the queue
[2014-04-17 18:05:44.793] Firing conversion: UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.794] Now converting UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.796] Started handbrake
[2014-04-17 18:05:45.046] Exit code: 0 
[2014-04-17 18:05:45.046] Conversion's done. 1
[2014-04-17 18:05:45.046] All done
[2014-04-17 18:05:45.047] Getting one

I logged the subprocess's output to a file. Here's what I got:

[18:05:44] hb_init: starting libhb thread
HandBrake 0.9.9 (2013051800) - Linux x86_64 - http://handbrake.fr
4 CPUs detected
Opening UNKNOWN_PARAMETER_VALUE.WMV...
[18:05:44] hb_scan: path=UNKNOWN_PARAMETER_VALUE.WMV, title_index=1
libbluray/bdnav/index_parse.c:162: indx_parse(): error opening UNKNOWN_PARAMETER_VALUE.WMV/BDMV/index.bdmv
libbluray/bdnav/index_parse.c:162: indx_parse(): error opening UNKNOWN_PARAMETER_VALUE.WMV/BDMV/BACKUP/index.bdmv
libbluray/bluray.c:1725: nav_get_title_list(UNKNOWN_PARAMETER_VALUE.WMV) failed (0x7fd44c000900)
[18:05:44] bd: not a bd - trying as a stream/file instead
libdvdnav: Using dvdnav version 4.1.3
libdvdread: Encrypted DVD support unavailable.
libdvdread: Can't stat UNKNOWN_PARAMETER_VALUE.WMV
No such file or directory
libdvdnav: vm: failed to open/read the DVD
[18:05:44] dvd: not a dvd - trying as a stream/file instead
[18:05:44] hb_stream_open: open UNKNOWN_PARAMETER_VALUE.WMV failed
[18:05:44] scan: unrecognized file type
[18:05:44] libhb: scan thread found 0 valid title(s)
No title found.
HandBrake has exited.

So, we can attest that the script can indeed fire up Handbrake, but the logs indicate that the Handbrake won't recognize the file format and dies on the spot. Again, this doesn't happen if I run the threadedqueue.py script on its own.

I'm guessing that Handbrake is not loading its libraries somehow. Is this the reason the code won't work? How can I get Handbrake to work?

Was it helpful?

Solution

(darned browser lost my previous answer, hope this does not become a dup)

Both of these errors Can't stat UNKNOWN_PARAMETER_VALUE.WMV and open UNKNOWN_PARAMETER_VALUE.WMV failed suggest file-not-found errors, BUT it does look like the file name is getting passed through to the handbrake command line.

So in order to be sure that the right files are being used: In the python code convert whatever filenames the user provides to server.py into absolute path names (see os.path.*). Use those absolute path names everywhere (esp. in the handbrake command/Popen)

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top