00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #include "config.h"
00025
00026 #include <cerrno>
00027 #include <cstring>
00028 #include <netdb.h>
00029 #include <sys/socket.h>
00030 #include <sys/types.h>
00031
00032 #include <scalestack/kernel/module.h>
00033 #include <scalestack/network/datagram.h>
00034 #include <scalestack/network/datagram_service.h>
00035
00036 using namespace std;
00037
00038 namespace scalestack
00039 {
00040 namespace network
00041 {
00042
00043
00044
00045
00046
00047 datagram_provider::datagram_provider(event::handler_service& handler_service,
00048 datagram_service& datagram_service,
00049 const struct addrinfo* local,
00050 const struct addrinfo* peer):
00051 event::handler(handler_service),
00052 _file_descriptor(-1),
00053 _peer_size(),
00054 _receive_buffer_size(),
00055 _datagram(datagram_service.add_datagram()),
00056 _datagram_service(datagram_service),
00057 _peer(reinterpret_cast<struct sockaddr*>(&_peer_storage)),
00058 _peer_storage()
00059 {
00060 _datagram->_datagram_provider = this;
00061
00062 int family = 0;
00063
00064 if (local != NULL)
00065 family = local->ai_family;
00066 else if (peer != NULL)
00067 family = peer->ai_family;
00068 else
00069 _module.log_fatal(_("Must specify local or peer address for datagram"));
00070
00071 int file_descriptor = socket(family, SOCK_DGRAM, 0);
00072 if (file_descriptor == -1)
00073 _module.log_fatal(_("Failed socket: %s:%d"), strerror(errno), errno);
00074
00075 if (local != NULL)
00076 {
00077 int return_code = ::bind(file_descriptor, local->ai_addr,
00078 local->ai_addrlen);
00079 if (return_code == -1)
00080 {
00081 close(file_descriptor);
00082 if (errno == EADDRINUSE)
00083 {
00084 _module.log_info(_("Bind failed, local address already in use"));
00085 throw exception();
00086 }
00087
00088 _module.log_fatal(_("Failed bind: %s:%d"), strerror(errno), errno);
00089 }
00090 }
00091
00092 if (peer != NULL)
00093 {
00094 memcpy(_peer, peer->ai_addr, peer->ai_addrlen);
00095 _peer_size = peer->ai_addrlen;
00096 }
00097
00098 _file_descriptor = file_descriptor;
00099 }
00100
00101 datagram_provider::~datagram_provider()
00102 {
00103 }
00104
00105 void datagram_provider::started(void)
00106 {
00107 set_file_descriptor(_file_descriptor);
00108 _datagram->started();
00109 }
00110
00111 void datagram_provider::set_file_descriptor(int file_descriptor)
00112 {
00113 _file_descriptor = file_descriptor;
00114 event::handler::set_file_descriptor(file_descriptor);
00115 if (file_descriptor != -1)
00116 watch_read();
00117 }
00118
00119 void datagram_provider::consume(size_t size)
00120 {
00121 if (size == 0)
00122 return;
00123
00124 if (size > _receive_buffer_size)
00125 {
00126 _module.log_error(_("Consumed more data than is in buffer: %u > %u"),
00127 size, _receive_buffer_size);
00128 shutdown();
00129 }
00130 else if (size == _receive_buffer_size)
00131 {
00132 _receive_buffer_size = 0;
00133 watch_read();
00134 }
00135 else
00136 _receive_buffer_size -= size;
00137 }
00138
00139 size_t datagram_provider::send(uint8_t* buffer,
00140 size_t size,
00141 const struct sockaddr& peer,
00142 socklen_t peer_size)
00143 {
00144 if (size == 0)
00145 return 0;
00146
00147 ssize_t sent = ::sendto(_file_descriptor, buffer, size, 0, &peer, peer_size);
00148 if (sent > 0)
00149 {
00150 if (sent != static_cast<ssize_t>(size))
00151 {
00152 _module.log_error(_("Datagram sendto() only sent %d of %u"), sent, size);
00153 shutdown();
00154 return 0;
00155 }
00156
00157 _module.log_debug(_("Sent %d bytes"), sent);
00158 watch_read();
00159 return sent;
00160 }
00161
00162 if (sent == 0)
00163 return 0;
00164
00165 if (errno == EAGAIN)
00166 {
00167 watch_write();
00168 return 0;
00169 }
00170
00171 _module.log_error(_("Failed sendto: %s:%d"), strerror(errno), errno);
00172 shutdown();
00173 return 0;
00174 }
00175
00176 size_t datagram_provider::send(uint8_t* buffer, size_t size)
00177 {
00178 if (_peer_size == 0)
00179 _module.log_fatal(_("No peer address was set."));
00180
00181 return send(buffer, size, *_peer, _peer_size);
00182 }
00183
00184
00185
00186
00187
00188 void datagram_provider::read_ready(int)
00189 {
00190 while (_receive_buffer_size == 0)
00191 {
00192 _peer_size = sizeof(_peer_storage);
00193
00194 ssize_t received = ::recvfrom(_file_descriptor,
00195 _receive_buffer,
00196 _receive_buffer_max_size,
00197 0,
00198 _peer,
00199 &_peer_size);
00200 if (received == 0)
00201 return;
00202 else if (received == -1)
00203 {
00204 if (errno == EAGAIN)
00205 {
00206 watch_read();
00207 return;
00208 }
00209 else if (errno == EINTR)
00210 continue;
00211
00212 _module.log_error(_("Failed recvfrom: %s:%d"), strerror(errno), errno);
00213 shutdown();
00214 return;
00215 }
00216
00217 _module.log_debug(_("Received %d bytes"), received);
00218 _receive_buffer_size = received;
00219 consume(_datagram->receive(_receive_buffer, _receive_buffer_size,
00220 *_peer, _peer_size));
00221 }
00222 }
00223
00224 }
00225 }