00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <cerrno>
00027 #include <cstring>
00028 #include <exception>
00029 #include <fcntl.h>
00030 #include <unistd.h>
00031
00032 #include <scalestack/event/libevent/handler_provider.h>
00033 #include <scalestack/event/libevent/thread.h>
00034 #include <scalestack/kernel/module.h>
00035
00036 using namespace std;
00037
00038 namespace scalestack
00039 {
00040 namespace event
00041 {
00042 namespace libevent
00043 {
00044
00045
00046
00047
00048
00049 extern "C" void handler_provider_callback(int file_descriptor,
00050 short events,
00051 void* context)
00052 {
00053 handler_provider* handler_provider =
00054 static_cast<libevent::handler_provider*>(context);
00055 handler_provider->_callback(file_descriptor, events);
00056 }
00057
00058
00059
00060
00061
00062 handler_provider::handler_provider(kernel::module& module,
00063 handler* handler,
00064 thread& thread):
00065 event::handler_provider(module, handler),
00066 _in_check_queue(),
00067 _need_start(true),
00068 _need_run(),
00069 _need_stop(),
00070 _is_processing(),
00071 _is_set(),
00072 _is_added(),
00073 _watch_read(),
00074 _set_watch_read(),
00075 _watch_write(),
00076 _set_watch_write(),
00077 _file_descriptor(-1),
00078 _set_file_descriptor(-1),
00079 _thread(thread),
00080 _expire_time(),
00081 _set_expire_time(),
00082 _event(),
00083 _timer()
00084 {
00085 }
00086
00087 handler_provider::~handler_provider()
00088 {
00089 if (_file_descriptor != -1)
00090 _handler->close_file_descriptor(_file_descriptor);
00091 }
00092
00093 void handler_provider::start(void)
00094 {
00095 _module.log_info(_("[%X] Start"), _handler);
00096 _thread.check_handler_provider(this);
00097 }
00098
00099 void handler_provider::stop(void)
00100 {
00101 _module.log_info(_("[%X] Stop"), _handler);
00102 _need_stop = true;
00103 if (!_in_check_queue)
00104 _thread.check_handler_provider(this);
00105 }
00106
00107 void handler_provider::set_timer(uint64_t milliseconds)
00108 {
00109 _module.log_debug(_("[%X] Set timer %llu"), _handler, milliseconds);
00110 if (milliseconds == 0)
00111 _expire_time = 0;
00112 else
00113 {
00114 const struct timeval& tv = _thread.get_now();
00115 _expire_time = (tv.tv_sec * 1000) + (tv.tv_usec / 1000) + milliseconds;
00116 }
00117
00118 _notify_thread();
00119 }
00120
00121 void handler_provider::set_file_descriptor(int file_descriptor)
00122 {
00123 _module.log_info(_("[%X] Set file descriptor %d"), _handler, file_descriptor);
00124
00125 if (_file_descriptor != -1 && _file_descriptor != _set_file_descriptor)
00126 _handler->close_file_descriptor(_file_descriptor);
00127
00128 _file_descriptor = file_descriptor;
00129
00130 if (_file_descriptor != -1)
00131 {
00132 int return_code = fcntl(_file_descriptor, F_GETFL, 0);
00133 if (return_code == -1)
00134 {
00135 _module.log_error(_("[%X] Failed fcntl: %s:%d"), _handler,
00136 strerror(errno), errno);
00137 _handler->shutdown();
00138 return;
00139 }
00140
00141 return_code = fcntl(_file_descriptor, F_SETFL, return_code | O_NONBLOCK);
00142 if (return_code == -1)
00143 {
00144 _module.log_error(_("[%X] Failed fcntl: %s:%d"), _handler,
00145 strerror(errno), errno);
00146 _handler->shutdown();
00147 return;
00148 }
00149 }
00150
00151 _notify_thread();
00152 }
00153
00154 void handler_provider::watch_read(void)
00155 {
00156 _module.log_debug(_("[%X] Watch read"), _handler);
00157 _watch_read = true;
00158 _notify_thread();
00159 }
00160
00161 void handler_provider::watch_write(void)
00162 {
00163 _module.log_debug(_("[%X] Watch write"), _handler);
00164 _watch_write = true;
00165 _notify_thread();
00166 }
00167
00168 void handler_provider::run_now(void)
00169 {
00170 _module.log_debug(_("[%X] Run"), _handler);
00171 _need_run = true;
00172 if (!_in_check_queue)
00173 _thread.check_handler_provider(this);
00174 }
00175
00176 bool handler_provider::add_to_check_queue(void)
00177 {
00178 if (_in_check_queue)
00179 return false;
00180
00181 _module.log_info(_("[%X] Adding to check queue"), _handler);
00182 _in_check_queue = true;
00183 return true;
00184 }
00185
00186 void handler_provider::check(void)
00187 {
00188 _module.log_info(_("[%X] Checking"), _handler);
00189
00190 if (_need_stop)
00191 {
00192 _module.log_info(_("[%X] Stopping"), _handler);
00193 if (_is_added)
00194 {
00195 if (event_del(&_event) == -1)
00196 _module.log_error(_("[%X] Failed event_del: -1"), _handler);
00197
00198 if (_file_descriptor != -1 && _set_file_descriptor != _file_descriptor)
00199 _handler->close_file_descriptor(_file_descriptor);
00200
00201 if (_set_file_descriptor != -1)
00202 _handler->close_file_descriptor(_set_file_descriptor);
00203
00204 _file_descriptor = -1;
00205 _set_file_descriptor = -1;
00206 }
00207
00208 _stop();
00209
00210
00211
00212
00213
00214
00215 return;
00216 }
00217
00218 _in_check_queue = false;
00219 _is_processing = true;
00220
00221 if (_need_start)
00222 {
00223 _module.log_info(_("[%X] Starting"), _handler);
00224 _need_start = false;
00225 try
00226 {
00227 _handler->started();
00228 }
00229 catch (exception& e)
00230 {
00231 _module.log_error(_("[%X] Exception caught during start: %s"), _handler,
00232 e.what());
00233 _handler->shutdown();
00234 }
00235 }
00236
00237 if (_need_run && !_need_stop)
00238 {
00239 _module.log_debug(_("[%X] Running"), _handler);
00240 _need_run = false;
00241 try
00242 {
00243 _handler->run();
00244 }
00245 catch (exception& e)
00246 {
00247 _module.log_error(_("[%X] Exception caught during run: %s"), _handler,
00248 e.what());
00249 _handler->shutdown();
00250 }
00251 }
00252
00253 _is_processing = false;
00254 _update_event();
00255 }
00256
00257
00258
00259
00260
00261 void handler_provider::_notify_thread(void)
00262 {
00263
00264
00265
00266
00267
00268 if (_need_start || _is_processing)
00269 return;
00270
00271
00272 if (_event_has_changed() && !_in_check_queue)
00273 _thread.check_handler_provider(this);
00274 }
00275
00276 bool handler_provider::_event_has_changed(void)
00277 {
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291 if (_set_expire_time > _expire_time ||
00292 (_expire_time > 0 && _set_expire_time == 0) ||
00293 _set_file_descriptor != _file_descriptor ||
00294 _set_watch_read != _watch_read ||
00295 _set_watch_write != _watch_write)
00296 {
00297 return true;
00298 }
00299
00300 return false;
00301 }
00302
00303 void handler_provider::_update_event(void)
00304 {
00305 _module.log_debug(_("[%X] Updating"), _handler);
00306
00307 if (!_event_has_changed())
00308 return;
00309
00310 if (_is_added)
00311 {
00312 if (event_del(&_event) == -1)
00313 {
00314 _module.log_error(_("[%X] Failed event_del: -1"), _handler);
00315 _handler->shutdown();
00316 return;
00317 }
00318
00319 if (_set_file_descriptor != -1 && _set_file_descriptor != _file_descriptor)
00320 {
00321 _handler->close_file_descriptor(_set_file_descriptor);
00322 _set_file_descriptor = -1;
00323 }
00324
00325 _is_added = false;
00326 }
00327
00328
00329
00330
00331
00332 if (!_is_set ||
00333 _set_file_descriptor != _file_descriptor ||
00334 _set_watch_read != _watch_read ||
00335 _set_watch_write != _watch_write)
00336 {
00337 short events = EV_PERSIST;
00338
00339 if (_file_descriptor != -1)
00340 {
00341 if (_watch_read)
00342 events |= EV_READ;
00343 if (_watch_write)
00344 events |= EV_WRITE;
00345
00346 _module.log_debug("[%X] Watching%s%s", _handler,
00347 _watch_read ? " Read" : "",
00348 _watch_write ? " Write" : "");
00349 }
00350
00351 event_set(&_event, _file_descriptor, events, handler_provider_callback,
00352 this);
00353
00354 if (event_base_set(_thread.get_event_base(), &_event) == -1)
00355 {
00356 _is_set = false;
00357 _module.log_error(_("[%X] Failed event_base_set: -1"), _handler);
00358 _handler->shutdown();
00359 return;
00360 }
00361
00362 _is_set = true;
00363 _set_file_descriptor = _file_descriptor;
00364 _set_watch_read = _watch_read;
00365 _set_watch_write = _watch_write;
00366 }
00367
00368 int return_code;
00369 if (_expire_time > 0)
00370 {
00371 if (_set_expire_time == 0)
00372 {
00373 _set_expire_time = _expire_time;
00374 const struct timeval& tv = _thread.get_now();
00375 uint64_t now = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00376 if (now < _expire_time)
00377 {
00378 _timer.tv_sec = (_expire_time - now) / 1000;
00379 _timer.tv_usec = ((_expire_time - now) % 1000) * 1000;
00380 }
00381 else
00382 {
00383 _timer.tv_sec = 0;
00384 _timer.tv_usec = 1;
00385 }
00386 }
00387
00388 return_code = event_add(&_event, &_timer);
00389 _is_added = true;
00390 }
00391 else if (_set_file_descriptor != -1)
00392 {
00393 return_code = event_add(&_event, NULL);
00394 _is_added = true;
00395 }
00396 else
00397 return_code = 0;
00398
00399 if (return_code == -1)
00400 {
00401 _is_added = false;
00402 _module.log_error(_("[%X] Failed event_add: -1"), _handler);
00403 _handler->shutdown();
00404 return;
00405 }
00406 }
00407
00408 void handler_provider::_callback(int file_descriptor, short events)
00409 {
00410 _is_processing = true;
00411
00412
00413
00414
00415
00416
00417 if (events & EV_TIMEOUT && !_need_stop)
00418 {
00419 _set_expire_time = 0;
00420
00421 const struct timeval& tv = _thread.get_now();
00422 uint64_t now = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00423 if (_expire_time > 0 && now >= _expire_time)
00424 {
00425 _module.log_debug(_("[%X] Timer expired"), _handler);
00426 _expire_time = 0;
00427 try
00428 {
00429 _handler->timer_expired();
00430 }
00431 catch (exception& e)
00432 {
00433 _module.log_error(_("[%X] Exception caught during timer_expired: %s"),
00434 _handler, e.what());
00435 _handler->shutdown();
00436 }
00437 }
00438 }
00439
00440 if (events & EV_READ && !_need_stop)
00441 {
00442 _module.log_debug(_("[%X] Read ready"), _handler);
00443 _watch_read = false;
00444 try
00445 {
00446 _handler->read_ready(file_descriptor);
00447 }
00448 catch (exception& e)
00449 {
00450 _module.log_error(_("[%X] Exception caught during read_ready: %s"),
00451 _handler, e.what());
00452 _handler->shutdown();
00453 }
00454 }
00455
00456 if (events & EV_WRITE && !_need_stop)
00457 {
00458 _module.log_debug(_("[%X] Write ready"), _handler);
00459 _watch_write = false;
00460 try
00461 {
00462 _handler->write_ready(file_descriptor);
00463 }
00464 catch (exception& e)
00465 {
00466 _module.log_error(_("[%X] Exception caught during write_ready: %s"),
00467 _handler, e.what());
00468 _handler->shutdown();
00469 }
00470 }
00471
00472 _is_processing = false;
00473
00474 _update_event();
00475 }
00476
00477 }
00478 }
00479 }