Skip to content

missing messages #35

@chrisdenley

Description

@chrisdenley

I seem to be losing messages from the websocket stream intermittently. I'm not sure if the messages are getting dropped by AnyEvent::WebSocket::Client, the socket, or maybe the server just isn't sending them. Almost a couple hundred at a time.

I haven't re-created the issue using this simple test script yet (very intermittent), but it should effectively be doing the same thing as far as the web socket is concerned.

Is there a limit to how much websocket data can be buffered, and what happens when that limit is exceeded? Any idea how I could troubleshoot this issue? Since it uses SSL, I can't check a packet capture to see if the missing messages were actually sent.

#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent::WebSocket::Client;
use JSON::XS;

my @prods = qw(BTC-USD LTC-USD ETH-USD);
my $wssurl = "wss://ws-feed.gdax.com/";
my %seq = ();
my %lastseq = ();
my $connection;
my $cv;

sub shutdown {
	if(defined $connection) {
		$connection->close();
	}
	else {
		exit(0);
	}
}

$SIG{'INT'} = \&shutdown;
$SIG{'TERM'} = \&shutdown;

my $sock = AnyEvent::WebSocket::Client->new;
$sock->connect($wssurl)->cb(sub {
	$connection = eval { shift->recv };
	my $subscribe = {
		type => 'subscribe',
		product_ids => \@prods,
		channels => ['full']
	};
	my $json = encode_json($subscribe);
	print $json."\n";
	$connection->send($json);
	$connection->on(each_message => \&load_msg);
	$connection->on(finish => sub {
		print "CONNECTION CLOSED\n";
		$cv->croak();
	});
});
$cv = AnyEvent->condvar;
my $idle = AnyEvent->idle(cb => \&process);
$cv->recv;

sub process {
	foreach my $prod(@prods) {
		next unless(defined $seq{$prod}); # nothing to process yet
		next if(defined $lastseq{$prod} and $lastseq{$prod} == $seq{$prod}); # nothing new to process
		$lastseq{$prod} = $seq{$prod};
		print "processing new data for $prod\n";
		sleep 1;
	}
}

sub load_msg {
    my ($conn,$msg) = @_;
    my $data = decode_json($msg->body);
    my $type = $$data{'type'};
	my $aseq = $$data{'sequence'};
	my $aprod = $$data{'product_id'};
	if(defined $aseq and defined $aprod) {
		print "$aprod $aseq\n";
		if(defined $seq{$aprod}) {
			if($aseq != $seq{$aprod} + 1) {
				die("Missed $aprod message(s), went from ".$seq{$aprod}." to $aseq");
			}
		}
		$seq{$aprod} = $aseq;
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions