00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00024 #ifndef SCALESTACK_NETWORK_STREAM_H
00025 #define SCALESTACK_NETWORK_STREAM_H
00026
00027 #include <memory>
00028 #include <stdint.h>
00029
00030 #include <scalestack/event/handler.h>
00031 #include <scalestack/kernel/macros.h>
00032
00033 struct addrinfo;
00034
00035 namespace scalestack
00036 {
00037 namespace network
00038 {
00039
00040 class stream_provider;
00041 class stream_service;
00042
00046 class stream
00047 {
00048 public:
00049
00050 stream(kernel::module& module);
00051
00052 virtual ~stream();
00053
00057 virtual void shutdown(void);
00058
00062 void stop(void);
00063
00067 void set_timer(uint64_t milliseconds);
00068
00072 virtual void timer_expired(void);
00073
00077 void run_now(void);
00078
00082 virtual void run(void);
00083
00088 virtual void connected(void);
00089
00094 void reconnect(void);
00095
00104 virtual size_t read(uint8_t* buffer, size_t size) = 0;
00105
00115 void consume(size_t size);
00116
00121 virtual void read_eof(void);
00122
00136 size_t write(uint8_t* buffer, size_t size, bool flush = false);
00137
00143 virtual void flush_write(void) = 0;
00144
00148 void shutdown_write(void);
00149
00150 protected:
00151
00152 kernel::module& _module;
00153
00154 private:
00155
00159 stream(const stream&);
00160
00164 stream& operator=(const stream&);
00165
00166 stream_provider* _stream_provider;
00167
00168 friend class stream_provider;
00169 };
00170
00174 class SCALESTACK_API stream_provider: public event::handler
00175 {
00176 public:
00177
00187 stream_provider(event::handler_service& handler_service,
00188 stream_service& stream_service,
00189 int file_descriptor);
00190
00200 stream_provider(event::handler_service& handler_service,
00201 stream_service& stream_service,
00202 const struct addrinfo* addresses);
00203
00204 virtual ~stream_provider();
00205
00209 void started();
00210
00214 void shutdown(void);
00215
00219 void timer_expired(void);
00220
00224 void run(void);
00225
00229 virtual void set_file_descriptor(int file_descriptor);
00230
00234 void reconnect(void);
00235
00239 void consume(size_t size);
00240
00244 size_t write(uint8_t* buffer, size_t size, bool flush = false);
00245
00249 void shutdown_write(void);
00250
00251 protected:
00252
00253 const struct addrinfo* _addresses;
00254 const struct addrinfo* _address;
00255
00256 private:
00257
00261 SCALESTACK_LOCAL
00262 stream_provider(const stream_provider&);
00263
00267 SCALESTACK_LOCAL
00268 stream_provider& operator=(const stream_provider&);
00269
00277 void read_ready(int file_descriptor);
00278
00286 void write_ready(int file_descriptor);
00287
00291 SCALESTACK_LOCAL
00292 void _connect(void);
00293
00300 SCALESTACK_LOCAL
00301 void _consume(size_t size);
00302
00306 SCALESTACK_LOCAL
00307 void _flush(void);
00308
00316 SCALESTACK_LOCAL
00317 size_t _write(uint8_t* buffer, size_t size);
00318
00319
00320
00321 static const size_t _read_buffer_max_size = 8192;
00322 static const size_t _write_buffer_max_size = 8192;
00323
00324 bool _connecting;
00325 bool _waiting_for_consume;
00326 bool _call_flush_write;
00327 bool _shutdown_write;
00328 bool _connection_dead;
00329 int _file_descriptor;
00330 size_t _read_buffer_offset;
00331 size_t _read_buffer_size;
00332 size_t _write_buffer_offset;
00333 size_t _write_buffer_size;
00334 std::auto_ptr<stream> _stream;
00335 stream_service& _stream_service;
00336 uint8_t _read_buffer[_read_buffer_max_size];
00337 uint8_t _write_buffer[_write_buffer_max_size];
00338 };
00339
00340
00341
00342
00343
00344 inline stream::stream(kernel::module& module):
00345 _module(module),
00346 _stream_provider()
00347 {
00348 }
00349
00350 inline stream::~stream()
00351 {
00352 }
00353
00354 inline void stream::shutdown(void)
00355 {
00356 _stream_provider->stop();
00357 }
00358
00359 inline void stream::stop(void)
00360 {
00361 _stream_provider->stop();
00362 }
00363
00364 inline void stream::set_timer(uint64_t milliseconds)
00365 {
00366 _stream_provider->set_timer(milliseconds);
00367 }
00368
00369 inline void stream::timer_expired(void)
00370 {
00371 }
00372
00373 inline void stream::run_now(void)
00374 {
00375 _stream_provider->run_now();
00376 }
00377
00378 inline void stream::run(void)
00379 {
00380 }
00381
00382 inline void stream::connected(void)
00383 {
00384 }
00385
00386 inline void stream::reconnect(void)
00387 {
00388 _stream_provider->reconnect();
00389 }
00390
00391 inline void stream::consume(size_t size)
00392 {
00393 _stream_provider->consume(size);
00394 }
00395
00396 inline void stream::read_eof(void)
00397 {
00398 shutdown();
00399 }
00400
00401 inline size_t stream::write(uint8_t* buffer, size_t size, bool flush)
00402 {
00403 return _stream_provider->write(buffer, size, flush);
00404 }
00405
00406 inline void stream::shutdown_write(void)
00407 {
00408 _stream_provider->shutdown_write();
00409 }
00410
00411
00412
00413
00414
00415 inline void stream_provider::shutdown(void)
00416 {
00417 _stream->shutdown();
00418 }
00419
00420 inline void stream_provider::timer_expired(void)
00421 {
00422 _stream->timer_expired();
00423 }
00424
00425 }
00426 }
00427
00428 #endif