# # Use a pure-perl event handler that kinda emulates's Event # for IO::Event's event handler. # my $sdebug = 0; { package IO::Event::Emulate; use strict; use warnings; our @ISA = qw(IO::Event::Common); my %want_read; my %want_write; my %want_exception; my %active; my $rin = ''; my $win = ''; my $ein = ''; my $unloop; sub import { require IO::Event; IO::Event->import('emulate_Event'); } sub new { my ($pkg, @stuff) = @_; $pkg->SUPER::new(@stuff); } # a replacement for Event::loop sub ie_loop { $unloop = 0; my ($rout, $wout, $eout); for(;;) { print STDERR "EMULATE LOOP-TOP\n" if $sdebug; last if $unloop; my $timer_timeout = IO::Event::Emulate::Timer->get_time_to_timer; my $timeout = $timer_timeout || IO::Event::Emulate::Idle->get_time_to_idle; if ($sdebug > 3) { print STDERR "Readers:\n"; for my $ioe (values %want_read) { print STDERR "\t${*$ioe}{ie_desc}\n"; } print STDERR "Writers:\n"; for my $ioe (values %want_write) { print STDERR "\t${*$ioe}{ie_desc}\n"; } print STDERR "Exceptions:\n"; for my $ioe (values %want_exception) { print STDERR "\t${*$ioe}{ie_desc}\n"; } } my ($nfound, $timeleft) = select($rout=$rin, $wout=$win, $eout=$ein, $timeout); print STDERR "SELECT: N$nfound\n" if $sdebug; if ($nfound) { EVENT: { if ($rout) { for my $ioe (values %want_read) { next unless vec($rout, ${*$ioe}{ie_fileno}, 1); my $ret = $ioe->ie_dispatch_read(${*$ioe}{ie_fh}); if ($ret && vec($wout, ${*$ioe}{ie_fileno}, 1)) { vec($wout, ${*$ioe}{ie_fileno}, 1) = 0; $nfound--; } if ($ret && vec($eout, ${*$ioe}{ie_fileno}, 1)) { vec($eout, ${*$ioe}{ie_fileno}, 1) = 0; $nfound--; } $nfound--; last EVENT unless $nfound > 0; } } if ($wout) { for my $ioe (values %want_write) { next unless vec($wout, ${*$ioe}{ie_fileno}, 1); my $ret = $ioe->ie_dispatch_write(${*$ioe}{ie_fh}); if ($ret && vec($eout, ${*$ioe}{ie_fileno}, 1)) { vec($eout, ${*$ioe}{ie_fileno}, 1) = 0; $nfound--; } $nfound--; last EVENT unless $nfound > 0; } } if ($eout) { for my $ioe (values %want_exception) { next unless vec($eout, ${*$ioe}{ie_fileno}, 1); $ioe->ie_dispatch_exception(${*$ioe}{ie_fh}); $nfound--; last EVENT unless $nfound > 0; } } } } IO::Event::Emulate::Timer->invoke_timers if $timer_timeout; IO::Event::Emulate::Idle->invoke_idlers($nfound == 0); } die unless ref($unloop); my @r = @$unloop; shift(@r); return $r[0] if @r == 1; return @r; } sub loop { ie_loop(@_); } sub timer { shift; IO::Event::Emulate::Timer->new(@_); } sub idle { shift; IO::Event::Emulate::Idle->new(@_); } sub unloop_all { $unloop = [1, @_]; } sub set_write_polling { my ($self, $new) = @_; my $fileno = ${*$self}{ie_fileno}; if ($new) { vec($win, $fileno, 1) = 1; $want_write{$fileno} = $want_exception{$fileno} = $self; } else { vec($win, $fileno, 1) = 0; delete $want_write{$fileno}; delete $want_exception{$fileno} unless $want_read{$fileno}; } $ein = $rin | $win; } sub set_read_polling { my ($self, $new) = @_; my $fileno = ${*$self}{ie_fileno}; if ($new) { vec($rin, $fileno, 1) = 1; $want_read{$fileno} = $want_exception{$fileno} = $self; } else { if (defined $fileno) { vec($rin, $fileno, 1) = 0; delete $want_read{$fileno}; delete $want_exception{$fileno} unless $want_write{$fileno} } } $ein = $rin | $win; } sub ie_register { my ($self) = @_; my ($fh, $fileno) = $self->SUPER::ie_register(); $active{$fileno} = $self; $self->readevents(! ${*$self}{ie_readclosed}); $self->writeevents(0); } sub ie_deregister { my ($self) = @_; $self->SUPER::ie_deregister(); delete $active{${*$self}{ie_fileno}}; } }{package IO::Event::Emulate::Timer; use warnings; use strict; use Time::HiRes qw(time); use Carp qw(confess); use Scalar::Util qw(reftype); our @ISA = qw(IO::Event); our %timers = (); our %levels = (); our %next = (); BEGIN { for $a (qw(at after interval hard cb desc prio repeat timeout)) { my $attrib = $a; no strict 'refs'; *{"IO::Event::Emulate::Timer::$a"} = sub { my $self = shift; return $self->{$attrib} unless @_; my $val = shift; $self->{$attrib} = $val; return $val; }; } } my $tcount = 1; my @timers; sub get_time_to_timer { @timers = sort { $a <=> $b } keys %next; my $t = time; if (@timers) { if ($timers[0] > $t) { my $timeout = $timers[0] - $t; $timeout = 0.01 if $timeout < 0.01; return $timeout; } else { return 0.01; } } return undef; } sub invoke_timers { while (@timers && $timers[0] < time) { print STDERR "Ti" if $sdebug; my $t = shift(@timers); my $te = delete $next{$t}; for my $tnum (keys %$te) { my $timer = $te->{$tnum}; next unless $timer->{next}; next unless $timer->{next} eq $t; $timer->now(); } } } sub new { my ($pkg, %param) = @_; confess unless $param{cb}; die if $param{after} && $param{at}; my $timer = bless { tcount => $tcount, last_time => time, %param }, __PACKAGE__; $timers{$tcount++} = $timer; $timer->schedule; return $timer; } sub schedule { my ($self) = @_; my $next; if ($self->{invoked}) { if ($self->{interval}) { $next = $self->{last_time} + $self->{interval}; if ($self->{hard} && $self->{next}) { $next = $self->{next} + $self->{interval}; } } else { $next = undef; } } elsif ($self->{at}) { $next = $self->{at}; } elsif ($self->{after}) { $next = $self->{after} + time; } elsif ($self->{interval}) { $next = $self->{interval} + time; } else { die; } if ($next) { $next{$next}{$self->{tcount}} = $self; $self->{next} = $next; } else { $self->{next} = 0; $self->stop(); } } sub start { my ($self) = @_; $timers{$self->{tcount}} = $self; delete $self->{stopped}; $self->schedule; } sub again { my ($self) = @_; $self->{last_time} = time; $self->start; } sub now { my ($self) = @_; $self->{last_time} = time; local($levels{$self->{tcount}}) = ($levels{$self->{tcount}} || 0)+1; $self->{invoked}++; if (reftype($self->{cb}) eq 'CODE') { $self->{cb}->($self); } elsif (reftype($self->{cb}) eq 'ARRAY') { my ($o, $m) = @{$self->{cb}}; $o->$m($self); } else { die; } $self->schedule; } sub stop { my ($self) = @_; delete $timers{$self->{tcount}}; $self->{stopped} = time; } sub cancel { my ($self) = @_; $self->{cancelled} = time; delete $timers{$self->{tcount}}; } sub is_cancelled { my ($self) = @_; return $self->{cancelled}; } sub is_active { my ($self) = @_; return exists $timers{$self->{tcount}}; } sub is_running { my ($self) = @_; return $levels{$self->{tcount}}; } sub is_suspended { my ($self) = @_; return 0; } sub pending { return; } }{package IO::Event::Emulate::Idle; use warnings; use strict; use Carp qw(confess); use Scalar::Util qw(reftype); use Time::HiRes qw(time); our @ISA = qw(IO::Event); our %timers = (); our %levels = (); our %next = (); my $icount = 0; my %idlers; our $time_to_idle_timeout = 1; sub new { my ($pkg, %param) = @_; confess unless $param{cb}; die if $param{after} && $param{at}; my $idler = bless { icount => $icount, last_time => time, %param }, __PACKAGE__; $idlers{$icount++} = $idler; return $idler; } sub get_time_to_idle { return undef unless %idlers; return $time_to_idle_timeout; } sub start { my ($self) = @_; $idlers{$self->{icount}} = $self; delete $self->{stopped}; $self->schedule; } sub again { my ($self) = @_; $self->{last_time} = time; $self->start; } sub invoke_idlers { my ($pkg, $is_idle) = @_; for my $idler (values %idlers) { if ($idler->{min} && (time - $idler->{last_time}) < $idler->{min}) { next; } unless ($is_idle) { next unless $idler->{max}; next unless (time - $idler->{last_time}) > $idler->{max}; } $idler->now; } } sub now { my ($self) = @_; $self->{last_time} = time; local($levels{$self->{icount}}) = ($levels{$self->{icount}} || 0)+1; if (defined($self->{reentrant}) && ! $self->{reentrant} && $self->{icount}) { return; } $self->{invoked}++; if (defined($self->{repeat}) && ! $self->{repeat}) { $self->cancel; } if (reftype($self->{cb}) eq 'CODE') { $self->{cb}->($self); } elsif (reftype($self->{cb}) eq 'ARRAY') { my ($o, $m) = @{$self->{cb}}; $o->$m($self); } else { die; } $self->{last_time} = time; } sub stop { my ($self) = @_; delete $idlers{$self->{icount}}; $self->{stopped} = time; } sub cancel { my ($self) = @_; $self->{cancelled} = time; delete $idlers{$self->{icount}}; } sub is_cancelled { my ($self) = @_; return $self->{cancelled}; } sub is_active { my ($self) = @_; return exists $idlers{$self->{icount}}; } sub is_running { my ($self) = @_; return $levels{$self->{icount}}; } sub is_suspended { my ($self) = @_; return 0; } sub pending { return; } }#end package 1;