00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <cerrno>
00027 #include <cstring>
00028 #include <netdb.h>
00029 #include <sys/socket.h>
00030 #include <sys/types.h>
00031
00032 #include <scalestack/kernel/module.h>
00033 #include <scalestack/network/stream.h>
00034 #include <scalestack/network/stream_service.h>
00035
00036 using namespace std;
00037
00038 namespace scalestack
00039 {
00040 namespace network
00041 {
00042
00043
00044
00045
00046
00047 stream_provider::stream_provider(event::handler_service& handler_service,
00048 stream_service& stream_service,
00049 int file_descriptor):
00050 event::handler(handler_service),
00051 _addresses(),
00052 _address(),
00053 _connecting(),
00054 _waiting_for_consume(),
00055 _call_flush_write(),
00056 _shutdown_write(),
00057 _connection_dead(),
00058 _file_descriptor(file_descriptor),
00059 _read_buffer_offset(),
00060 _read_buffer_size(),
00061 _write_buffer_offset(),
00062 _write_buffer_size(),
00063 _stream(stream_service.add_stream()),
00064 _stream_service(stream_service)
00065 {
00066 _stream->_stream_provider = this;
00067 }
00068
00069 stream_provider::stream_provider(event::handler_service& handler_service,
00070 stream_service& stream_service,
00071 const struct addrinfo* addresses):
00072 event::handler(handler_service),
00073 _addresses(addresses),
00074 _address(),
00075 _connecting(),
00076 _waiting_for_consume(),
00077 _call_flush_write(),
00078 _shutdown_write(),
00079 _connection_dead(),
00080 _file_descriptor(-1),
00081 _read_buffer_offset(),
00082 _read_buffer_size(),
00083 _write_buffer_offset(),
00084 _write_buffer_size(),
00085 _stream(stream_service.add_stream()),
00086 _stream_service(stream_service)
00087 {
00088 _stream->_stream_provider = this;
00089 }
00090
00091 stream_provider::~stream_provider()
00092 {
00093 }
00094
00095 void stream_provider::started(void)
00096 {
00097 if (_addresses == NULL)
00098 {
00099 set_file_descriptor(_file_descriptor);
00100 watch_read();
00101 _stream->connected();
00102 }
00103 else
00104 reconnect();
00105 }
00106
00107 void stream_provider::run(void)
00108 {
00109 if (_shutdown_write && _write_buffer_size == 0)
00110 {
00111 int return_code = ::shutdown(_file_descriptor, SHUT_WR);
00112 if (return_code == -1)
00113 {
00114 _module.log_error(_("Failed write shutdown: %s:%d"), strerror(errno),
00115 errno);
00116 shutdown();
00117 return;
00118 }
00119 }
00120
00121 _stream->run();
00122 }
00123
00124 void stream_provider::set_file_descriptor(int file_descriptor)
00125 {
00126 _file_descriptor = file_descriptor;
00127 event::handler::set_file_descriptor(file_descriptor);
00128 }
00129
00130 void stream_provider::reconnect(void)
00131 {
00132 set_file_descriptor(-1);
00133 _address = _addresses;
00134 _connect();
00135 }
00136
00137 void stream_provider::consume(size_t size)
00138 {
00139 if (size == 0)
00140 return;
00141
00142 _consume(size);
00143 watch_read();
00144 }
00145
00146 size_t stream_provider::write(uint8_t* buffer, size_t size, bool flush)
00147 {
00148 if (size == 0 || _connecting)
00149 return 0;
00150
00151 size_t written;
00152
00153
00154 _call_flush_write = false;
00155
00156 if (_write_buffer_size == 0 && (flush || size >= _write_buffer_max_size))
00157 written = 0;
00158 else if (size <= (_write_buffer_max_size -
00159 (_write_buffer_offset + _write_buffer_size)))
00160 {
00161
00162 memcpy(_write_buffer + _write_buffer_offset + _write_buffer_size,
00163 buffer,
00164 size);
00165 _write_buffer_size += size;
00166 if (flush)
00167 _flush();
00168
00169 return size;
00170 }
00171 else
00172 {
00173
00174 if (size < _write_buffer_max_size)
00175 {
00176
00177 written = _write_buffer_max_size -
00178 (_write_buffer_offset + _write_buffer_size);
00179 if (written > 0)
00180 {
00181 memcpy(_write_buffer + _write_buffer_offset + _write_buffer_size,
00182 buffer,
00183 written);
00184 _write_buffer_size += written;
00185 }
00186 }
00187 else
00188 written = 0;
00189
00190 _flush();
00191 }
00192
00193 if (_write_buffer_size == 0 &&
00194 (flush || (size - written) >= _write_buffer_max_size))
00195 {
00196
00197
00198
00199
00200 written += _write(buffer + written, size - written);
00201 if (written == size)
00202 return size;
00203 }
00204
00205
00206 if ((size - written) >= _write_buffer_max_size)
00207 {
00208 _call_flush_write = true;
00209 return written;
00210 }
00211
00212 size_t fill_size =
00213 _write_buffer_max_size - (_write_buffer_offset + _write_buffer_size);
00214 if (fill_size == 0)
00215 {
00216 _call_flush_write = true;
00217 return written;
00218 }
00219
00220 if ((size - written) <= fill_size)
00221 fill_size = size - written;
00222 else
00223 _call_flush_write = true;
00224
00225
00226 memcpy(_write_buffer + _write_buffer_offset + _write_buffer_size,
00227 buffer + written,
00228 fill_size);
00229 _write_buffer_size += fill_size;
00230
00231 return written + fill_size;
00232 }
00233
00234 void stream_provider::shutdown_write(void)
00235 {
00236 _shutdown_write = true;
00237 run_now();
00238 }
00239
00240
00241
00242
00243
00244 void stream_provider::read_ready(int)
00245 {
00246 while (!_connecting && _read_buffer_size != _read_buffer_max_size)
00247 {
00248 size_t free_size =
00249 _read_buffer_max_size - (_read_buffer_offset + _read_buffer_size);
00250 if (free_size <= (_read_buffer_max_size / 4))
00251 {
00252
00253
00254
00255
00256
00257 if (_waiting_for_consume)
00258 return;
00259
00260 memmove(_read_buffer,
00261 _read_buffer + _read_buffer_offset,
00262 _read_buffer_size);
00263 _read_buffer_offset = 0;
00264 free_size = _read_buffer_max_size - _read_buffer_size;
00265 }
00266
00267 ssize_t return_size = ::read(
00268 _file_descriptor,
00269 _read_buffer + _read_buffer_offset + _read_buffer_size,
00270 free_size);
00271 if (return_size == 0)
00272 {
00273 _module.log_info(_("Received EOF on read"));
00274 _shutdown_write = false;
00275
00276 if (_write_buffer_size > 0)
00277 _flush();
00278
00279 if (_write_buffer_size > 0 || _call_flush_write)
00280 {
00281 _connection_dead = true;
00282 return;
00283 }
00284
00285 _stream->read_eof();
00286 return;
00287 }
00288 else if (return_size == -1)
00289 {
00290 if (errno == EAGAIN)
00291 {
00292 watch_read();
00293 return;
00294 }
00295 else if (errno == EINTR)
00296 continue;
00297
00298 _module.log_error(_("Received error on read: %s:%d"),
00299 strerror(errno), errno);
00300 _shutdown_write = false;
00301 shutdown();
00302 return;
00303 }
00304
00305 _module.log_debug(_("[%X] Read %d bytes"), this, return_size);
00306
00307 _read_buffer_size += return_size;
00308 _consume(_stream->read(_read_buffer + _read_buffer_offset,
00309 _read_buffer_size));
00310 }
00311 }
00312
00313 void stream_provider::write_ready(int)
00314 {
00315 if (_connecting)
00316 {
00317 _connecting = false;
00318 _connect();
00319 return;
00320 }
00321
00322 if (_write_buffer_size != 0)
00323 _flush();
00324
00325 if (_write_buffer_size == 0)
00326 {
00327 if (_call_flush_write)
00328 {
00329 _call_flush_write = false;
00330 _stream->flush_write();
00331
00332 if (!_call_flush_write)
00333 watch_read();
00334 }
00335
00336 if (_connection_dead && _write_buffer_size == 0 && !_call_flush_write)
00337 shutdown();
00338 }
00339 }
00340
00341 void stream_provider::_connect(void)
00342 {
00343 while (true)
00344 {
00345 if (_file_descriptor == -1)
00346 {
00347 int file_descriptor = ::socket(_address->ai_family, SOCK_STREAM, 0);
00348
00349 if (file_descriptor == -1)
00350 _module.log_fatal(_("Failed socket: %s:%d"), strerror(errno), errno);
00351
00352 _connecting = true;
00353 set_file_descriptor(file_descriptor);
00354 }
00355
00356 int return_code = ::connect(_file_descriptor, _address->ai_addr,
00357 _address->ai_addrlen);
00358 if (return_code == 0 || errno == EISCONN)
00359 {
00360 _module.log_info(_("Connected"));
00361 _connecting = false;
00362 watch_read();
00363 _stream->connected();
00364 return;
00365 }
00366
00367 if (errno == EAGAIN || errno == EINTR)
00368 continue;
00369
00370 if (errno == EINPROGRESS || errno == EALREADY)
00371 {
00372 _module.log_debug(_("Connection in progress"));
00373 _connecting = true;
00374 watch_write();
00375 return;
00376 }
00377
00378 if (errno == ECONNREFUSED || errno == ENETUNREACH || errno == ETIMEDOUT)
00379 {
00380 _address = _address->ai_next;
00381 if (_address == NULL)
00382 {
00383 _module.log_error(_("Could not connect"));
00384 shutdown();
00385 return;
00386 }
00387
00388 set_file_descriptor(-1);
00389 continue;
00390 }
00391
00392 _module.log_error(_("Could not connect: %s:%d"), strerror(errno), errno);
00393 shutdown();
00394 return;
00395 }
00396 }
00397
00398 void stream_provider::_consume(size_t size)
00399 {
00400 if (size == 0)
00401 {
00402 _waiting_for_consume = true;
00403 return;
00404 }
00405
00406 if (size > _read_buffer_size)
00407 {
00408 _module.log_error(_("Consumed more data than is in buffer: %u > %u"),
00409 size, _read_buffer_size);
00410 shutdown();
00411 }
00412 else if (size == _read_buffer_size)
00413 {
00414 _read_buffer_size = 0;
00415 _read_buffer_offset = 0;
00416 _waiting_for_consume = false;
00417 }
00418 else
00419 {
00420 _read_buffer_size -= size;
00421 _read_buffer_offset += size;
00422 _waiting_for_consume = true;
00423 }
00424 }
00425
00426 void stream_provider::_flush(void)
00427 {
00428 size_t return_size = _write(_write_buffer + _write_buffer_offset,
00429 _write_buffer_size);
00430 if (return_size == 0)
00431 return;
00432
00433 if (return_size == _write_buffer_size)
00434 {
00435 _write_buffer_size = 0;
00436 _write_buffer_offset = 0;
00437
00438 if (_shutdown_write)
00439 run_now();
00440 }
00441 else
00442 {
00443 _write_buffer_size -= return_size;
00444 _write_buffer_offset += return_size;
00445
00446 if ((_write_buffer_offset + _write_buffer_size) >=
00447 (_write_buffer_max_size / 2))
00448 {
00449 memmove(_write_buffer,
00450 _write_buffer + _write_buffer_offset,
00451 _write_buffer_size);
00452 _write_buffer_offset = 0;
00453 }
00454 }
00455 }
00456
00457 size_t stream_provider::_write(uint8_t* buffer, size_t size)
00458 {
00459 while (true)
00460 {
00461 ssize_t return_size = ::write(_file_descriptor, buffer, size);
00462 if (return_size == 0)
00463 {
00464 _module.log_info(_("Received EOF on write"));
00465 _shutdown_write = false;
00466 shutdown();
00467 return 0;
00468 }
00469 else if (return_size == -1)
00470 {
00471 if (errno == EAGAIN)
00472 {
00473 watch_write();
00474 return 0;
00475 }
00476 else if (errno == EINTR)
00477 continue;
00478
00479 _module.log_error(_("Received error on write: %s:%d"),
00480 strerror(errno), errno);
00481 _shutdown_write = false;
00482 shutdown();
00483 return 0;
00484 }
00485
00486 _module.log_debug(_("[%X] Wrote %d bytes"), this, return_size);
00487
00488 if (static_cast<size_t>(return_size) != size)
00489 watch_write();
00490
00491 return return_size;
00492 }
00493 }
00494
00495 }
00496 }