00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <algorithm>
00027 #include <cerrno>
00028 #include <cstring>
00029 #include <exception>
00030 #include <functional>
00031 #include <fcntl.h>
00032 #include <unistd.h>
00033
00034 #include <scalestack/event/libevent/handler_provider.h>
00035 #include <scalestack/event/libevent/thread.h>
00036
00037 using namespace std;
00038
00039 namespace scalestack
00040 {
00041 namespace event
00042 {
00043 namespace libevent
00044 {
00045
00046
00047
00048
00049
00050 extern "C" void thread_handle_notify(int file_descriptor, short events,
00051 void* context)
00052 {
00053 thread* thread = static_cast<libevent::thread*>(context);
00054 thread->_handle_notify(file_descriptor, events);
00055 }
00056
00057
00058
00059
00060
00061 thread::thread(kernel::module& module, size_t thread_index):
00062 threading::thread(module),
00063 _shutdown(),
00064 _handler_providers_index(),
00065 _thread_index(thread_index),
00066 _base(),
00067 _notify_pipe(),
00068 _notify_event(),
00069 _notify_timeout(),
00070 _now(),
00071 _handler_providers_mutex(module),
00072 _handler_providers()
00073 {
00074 if (gettimeofday(&_now, 0) == -1)
00075 _module.log_fatal(_("Failed gettimeofday: %s:%d"), strerror(errno), errno);
00076
00077 _base = event_base_new();
00078 if (_base == NULL)
00079 _module.log_fatal(_("Failed event_base_new: NULL"));
00080
00081 _module.log_debug(_("[Thread %u] Using libevent %s with %s"), _thread_index,
00082 event_get_version(), event_base_get_method(_base));
00083
00084 try
00085 {
00086 _add_notify_pipe();
00087 try
00088 {
00089 start();
00090 }
00091 catch (exception&)
00092 {
00093 _remove_notify_pipe();
00094 throw;
00095 }
00096 }
00097 catch (exception&)
00098 {
00099 event_base_free(_base);
00100 throw;
00101 }
00102
00103 _module.log_info(_("[Thread %u] Constructed"), _thread_index);
00104 }
00105
00106 thread::~thread()
00107 {
00108 stop();
00109 _remove_notify_pipe();
00110 event_base_free(_base);
00111 _module.log_info(_("[Thread %u] Destructed"), _thread_index);
00112 }
00113
00114 void thread::shutdown(void)
00115 {
00116 _module.log_info(_("[Thread %u] Shutting down"), _thread_index);
00117 _shutdown = true;
00118 _send_notify();
00119 }
00120
00121 void thread::check_handler_provider(handler_provider* handler_provider)
00122 {
00123 _handler_providers_mutex.lock();
00124
00125
00126 if (handler_provider->add_to_check_queue())
00127 _handler_providers[_handler_providers_index].push_back(handler_provider);
00128
00129 _handler_providers_mutex.unlock();
00130 _send_notify();
00131 }
00132
00133 struct event_base* thread::get_event_base(void)
00134 {
00135 return _base;
00136 }
00137
00138 const struct timeval& thread::get_now(void)
00139 {
00140 if (gettimeofday(&_now, 0) == -1)
00141 _module.log_error(_("Failed gettimeofday: %s:%d"), strerror(errno), errno);
00142 return _now;
00143 }
00144
00145
00146
00147
00148
00149 void thread::run(void)
00150 {
00151 _module.log_info(_("[Thread %u] Entering event Loop"), _thread_index);
00152
00153 if (event_base_loop(_base, 0) == -1)
00154 _module.log_fatal(_("[Thread %u] event_base_loop: -1"), _thread_index);
00155
00156 _module.log_info(_("[Thread %u] Leaving event loop"), _thread_index);
00157 }
00158
00159 void thread::_add_notify_pipe(void)
00160 {
00161 int return_code = pipe(_notify_pipe);
00162 if (return_code == -1)
00163 _module.log_fatal(_("Failed pipe: %s:%d"), strerror(errno), errno);
00164
00165 for (int index = 0; index < 2; index++)
00166 {
00167 return_code = fcntl(_notify_pipe[index], F_GETFL, 0);
00168 if (return_code == -1)
00169 {
00170 _remove_notify_pipe();
00171 _module.log_fatal(_("Failed fcntl: %s:%d"), strerror(errno), errno);
00172 }
00173
00174 return_code = fcntl(_notify_pipe[index], F_SETFL, return_code | O_NONBLOCK);
00175 if (return_code == -1)
00176 {
00177 _remove_notify_pipe();
00178 _module.log_fatal(_("Failed fcntl: %s:%d"), strerror(errno), errno);
00179 }
00180 }
00181
00182 event_set(&_notify_event, _notify_pipe[0], EV_READ | EV_PERSIST,
00183 thread_handle_notify, this);
00184
00185 if (event_base_set(_base, &_notify_event) == -1)
00186 {
00187 _remove_notify_pipe();
00188 _module.log_fatal(_("Failed event_base_set: -1"));
00189 }
00190
00191 _notify_timeout.tv_sec = 1;
00192 _notify_timeout.tv_usec = 0;
00193 if (event_add(&_notify_event, &_notify_timeout) == -1)
00194 {
00195 _remove_notify_pipe();
00196 _module.log_fatal(_("Failed event_add: -1"));
00197 }
00198 }
00199
00200 void thread::_remove_notify_pipe(void)
00201 {
00202 close(_notify_pipe[0]);
00203 close(_notify_pipe[1]);
00204 }
00205
00206 void thread::_send_notify(void)
00207 {
00208 if (write(_notify_pipe[1], "\0", 1) == -1 && errno != EINTR)
00209 _module.log_error(_("Failed pipe write: %s:%d"), strerror(errno), errno);
00210 }
00211
00212 void thread::_handle_notify(int file_descriptor, short events)
00213 {
00214 _module.log_debug(_("[Thread %u] Notified"), _thread_index);
00215
00216 if (events & EV_READ)
00217 {
00218 char buffer[1024];
00219 if (read(file_descriptor, buffer, sizeof(buffer)) == -1 && errno != EINTR)
00220 _module.log_error(_("Failed pipe read: %s:%d"), strerror(errno), errno);
00221 }
00222
00223 if (events & EV_TIMEOUT)
00224 {
00225
00226 if (event_add(&_notify_event, &_notify_timeout) == -1)
00227 {
00228
00229 _module.log_fatal(_("Failed event_add: -1"));
00230 }
00231 }
00232
00233 if (_shutdown)
00234 {
00235
00236 if (event_del(&_notify_event) == -1)
00237 {
00238
00239 _module.log_error(_("Failed event_del: -1"));
00240 }
00241 }
00242
00243
00244 uint8_t index = _handler_providers_index;
00245
00246 _handler_providers_mutex.lock();
00247 _handler_providers_index = static_cast<uint8_t>(1 - _handler_providers_index);
00248 _handler_providers_mutex.unlock();
00249
00250 for_each(_handler_providers[index].begin(), _handler_providers[index].end(),
00251 mem_fun(&handler_provider::check));
00252
00253 _handler_providers[index].clear();
00254 }
00255
00256 }
00257 }
00258 }