From: Miah Petersen on 25 Mar 2010 13:56

I needed an email queueing system for a project I am writing. The script
below runs every minute via a cronjob on one of my servers. I am using a
table in MySQL to keep track of the number of email bot threads to spawn
and their current state. They need to be threaded (or at least run
concurrently) since some email lists will be longer than others, and some
emails have a higher priority. The thought is that this way I can quickly
add more threads as needed, as well as have a simple web UI to check their
state and turn them on or off as necessary.

It works perfectly, most of the time. But sometimes, seemingly at random,
one of the threads does not finish and an email bot's state is left as
processing. The log file actually says that it completes, but the entry in
the database still says processing.

Any ideas?

======== Email Bots Table ===========
+----+--------+----------------------------------------------+----------------+---------------------+
| id | status | query_constraints                            | state          | updated_at          |
+----+--------+----------------------------------------------+----------------+---------------------+
|  1 | on     | select * from email_queue where priority = 0 | idle           | 2010-03-25 17:33:14 |
|  2 | on     | select * from email_queue WHERE priority > 0 | idle           | 2010-03-25 17:33:14 |
+----+--------+----------------------------------------------+----------------+---------------------+

======== Email bot Script that runs via cronjob ===========
require 'rubygems'
require 'net/smtp'
require 'timeout'
require 'mysql'
require 'activesupport'
require 'activerecord'

begin
  require 'openssl'
rescue LoadError
end

ActiveRecord::Base.establish_connection(
  :adapter  => 'mysql',
  :host     => 'localhost',
  :database => 'adevelopment',
  :username => 'username',
  :password => 'password'
)

def send_email(from, to, subject, message)
  msg = <<END_OF_MESSAGE
From: #{from}
To: #{to}
MIME-Version: 1.0
Content-type: text/html
Subject: #{subject}

#{message}
END_OF_MESSAGE

  Net::SMTP.start('server', 25, '<sending domain>', 'username', 'password', :login) do |smtp|
    smtp.send_message msg,
                      from.gsub(/[^<]+</, '').gsub(/>/, ''),
                      to.gsub(/[^<]+</, '').gsub(/>/, '')
  end
end

# gather email bots
mysql = ActiveRecord::Base.connection.execute("SELECT * FROM email_bots")
emailbots = {}
mysql.each_hash do |p|
  emailbots[p['id']] = p
end
mysql.free

threads = []
emailbots.each do |emailbot|
  if emailbot[1]['status'] == 'on' && emailbot[1]['state'] == 'idle'
    threads << Thread.new{
      ebot = emailbot[1]
      ActiveRecord::Base.connection.execute("UPDATE email_bots SET state='processing', updated_at=UTC_TIMESTAMP() WHERE id=#{ebot['id']}")
      sent = ""
      mysql = ActiveRecord::Base.connection.execute(ebot['query_constraints'])
      i = 1
      mysql.each_hash do |email|
        puts email['id']
        begin
          send_email(email['sender'], email['recipient'], email['subject'], email['message'])
          sent += email['id'] + ","
        rescue Exception => e
          ActiveRecord::Base.connection.execute(ActiveRecord::Base.send("sanitize_sql_array",["UPDATE email_queue SET error='%s' WHERE id=#{email['id']}", "#{e.message} #{e.backtrace.inspect}"]))
        end
        i = i + 1
        if i % 100 == 0
          if sent != ""
            ActiveRecord::Base.connection.execute("INSERT INTO email_sent (SELECT priority, user_id, #{ebot['id']}, sender, recipient, subject, message, error, UTC_TIMESTAMP FROM email_queue WHERE id in (#{sent.chop!}))")
            ActiveRecord::Base.connection.execute("DELETE FROM email_queue WHERE id in (#{sent})")
            sent = ""
          end
        end
      end
      mysql.free
      if sent != ""
        ActiveRecord::Base.connection.execute("INSERT INTO email_sent (SELECT priority, user_id, #{ebot['id']}, sender, recipient, subject, message, error, UTC_TIMESTAMP FROM email_queue WHERE id in (#{sent.chop!}))")
        ActiveRecord::Base.connection.execute("DELETE FROM email_queue WHERE id in (#{sent})")
      end
      ActiveRecord::Base.connection.execute("UPDATE email_bots SET state='idle' WHERE id=#{ebot['id']}")
    }
  else
    # This is a potential workaround which I am not too excited about.
    # mysql = ActiveRecord::Base.connection.execute(ebot['query_constraints'])
    # if mysql.num_rows == 0
    #   ActiveRecord::Base.connection.execute("UPDATE email_bots SET current_action='idle' WHERE id=#{ebot['id']}")
    # end
  end
end

threads.each { |t| t.join }

-- 
Posted via http://www.ruby-forum.com/.
From: Miah Petersen on 25 Mar 2010 14:06

Actually, looking at it more closely, the thread is terminating when I call
mysql.free after it has finished walking through all of the sends.
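One possible explanation, offered as a guess and not confirmed anywhere in this thread: in the script, `mysql` is first assigned at the top level, before any `Thread.new` block runs. Ruby blocks close over existing local variables, so every thread would then read and write that same `mysql` variable, and one thread could end up calling `free` on a result set that another thread assigned. The sketch below illustrates only that scoping rule. The method names `shared_handle_demo` and `private_handle_demo` are hypothetical, plain strings stand in for result sets, and the queues exist only to force a repeatable interleaving.

```ruby
# Case 1: `handle` exists before the threads start (like `mysql` in the
# script), so both blocks assign to the SAME variable.
def shared_handle_demo
  handle = nil
  a_assigned = Queue.new
  b_assigned = Queue.new

  a = Thread.new do
    handle = "rows for bot 1"
    a_assigned << true   # let thread b proceed
    b_assigned.pop       # wait until thread b has reassigned handle
    handle               # what thread a would now call `free` on
  end

  b = Thread.new do
    a_assigned.pop       # wait until thread a has assigned handle
    handle = "rows for bot 2"
    b_assigned << true
    handle
  end

  [a.value, b.value]
end

# Case 2: `rows` is not defined outside the blocks, so each thread gets
# its own private variable even though the name is the same.
def private_handle_demo
  a_assigned = Queue.new
  b_assigned = Queue.new

  a = Thread.new do
    rows = "rows for bot 1"
    a_assigned << true
    b_assigned.pop
    rows
  end

  b = Thread.new do
    a_assigned.pop
    rows = "rows for bot 2"
    b_assigned << true
    rows
  end

  [a.value, b.value]
end

p shared_handle_demo   # => ["rows for bot 2", "rows for bot 2"]
p private_handle_demo  # => ["rows for bot 1", "rows for bot 2"]
```

If this is what is happening, using a variable name inside the thread block that does not already exist outside it (for example `rows` instead of `mysql`) would keep each thread's result set separate. Note that all threads in the script also go through `ActiveRecord::Base.connection`, and whether that hands every thread the same underlying connection depends on the ActiveRecord version in use, so that may be worth checking as well.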