Handling DB connections in daemonized threads

I have a problem handling database connections in a daemon I've been working on. I first connect to my Postgres database with:

try:
  psycopg2.apilevel = '2.0'
  psycopg2.threadsafety = 3
  cnx = psycopg2.connect( "host='192.168.10.36' dbname='db' user='vas' password='vas'")
except Exception, e:
  print "Unable to connect to DB. Error [%s]" % ( e,)
  exit( )

After that, I select all rows in the DB with status = 0:

try:
  cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
  cursor.execute( "SELECT * FROM table WHERE status = 0")
  rows = cursor.fetchall( )
  cursor.close( )
except Exception, e:
  print "Error on sql query [%s]" % ( e,)

Then, if any rows were selected, the program forks and runs:

while 1:
  try:
    psycopg2.apilevel = '2.0'
    psycopg2.threadsafety = 3
    cnx = psycopg2.connect( "host='192.168.10.36' dbname='sms' user='vas' password='vas'")
  except Exception, e:
    print "Unable to connect to DB. Error [%s]" % ( e,)
    exit( )

  if rows:
    daemonize( )
    for i in rows:
      try:
        global q, l
        q = Queue.Queue( max_threads)
        for i in rows:
          cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
          t = threading.Thread( target=sender, args=(i, cursor))
          t.setDaemon( True)
          t.start( )

          for i in rows:
            q.put( i)
            q.join( )
      except Exception, e:
        print "The following error occurred [%s]" % ( e,)
        exit( )
  else:
    print "No rows were selected\n"
    time.sleep( 5)

My daemonize function looks like this:

def daemonize( ):
  try:
    pid = os.fork()
    if pid > 0:
      sys.exit(0)
  except OSError, e:
    print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
    sys.exit(1)

  os.chdir("/")
  os.umask(0)

  try:
    pid = os.fork()
    if pid > 0:
      sys.exit(0)
  except OSError, e:
    print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
    sys.exit(1)

The threads target the sender function:

def sender( row, db):
  while 1:
    item = q.get( )
    if send_to( row['to'], row['text']):
      db.execute( "UPDATE table SET status = 1 WHERE id = %d" % sms['id'])
    else:
      print "UPDATE table SET status = 2 WHERE id = %d" % sms['id']
      db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
    db.close( )
    q.task_done( )

The send_to function just opens a URL and returns True on success or False on failure.

Since yesterday I keep getting this error and can't find my way through it:

UPDATE outbox SET status = 2 WHERE id = 36
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sender.py", line 30, in sender
    db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
  File "/usr/lib/python2.6/dist-packages/psycopg2/extras.py", line 88, in execute
    return _cursor.execute(self, query, vars, async)
OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

There is 1 answer

Answer by Nick Craig-Wood:

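Database handles don't survive across fork(). You'll need to open a new database handle in each subprocess, i.e. call psycopg2.connect after you call daemonize(), not before. The forked child inherits the same socket to the server as the parent, so once the parent exits and its handle is torn down, the server will most likely drop the session that the child is still trying to use.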

I've not used Postgres, but I know this is definitely true for MySQL.
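
A minimal sketch of that ordering, written for Python 3 rather than the Python 2 of the question. The function name run_daemon and its three callable arguments are assumptions made for illustration, not part of the original code: daemonize is the double-fork function from the question, connect would be something like lambda: psycopg2.connect(dsn), and work is whatever does the SELECT and UPDATE statements.

```python
def run_daemon(daemonize, connect, work):
    """Fork first, then open the database connection in the surviving process.

    All three arguments are hypothetical callables supplied by the caller:
      daemonize(): performs the double fork, as in the question
      connect():   returns a new DB-API connection
      work(cnx):   runs the queries using that connection
    """
    daemonize()          # any parent process exits inside this call
    cnx = connect()      # opened after the fork, so no parent shares this handle
    try:
        work(cnx)
        cnx.commit()     # DB-API connections do not autocommit by default
    except Exception:
        cnx.rollback()   # discard partial changes before re-raising
        raise
    finally:
        cnx.close()      # always release the connection in this process
```

The point is only the order of the first two calls: nothing that holds a socket to the database should exist before daemonize() runs. If worker threads are started inside work, they can share cnx, but each thread should create and use its own cursor.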