$ git clone https://socialnetwork.ion.nu/socialnetwork.git
commit e5f793bcd508cce2e43ee4d87c73a8471cedd982
Author: Alicia <...>
Date:   Sun Jan 8 14:27:00 2017 +0100

    Improved udpstream to handle disconnects, intentional and timeouts.

diff --git a/udpstream.c b/udpstream.c
index dbd1049..2f48f5a 100644
--- a/udpstream.c
+++ b/udpstream.c
@@ -19,12 +19,19 @@
 #include <string.h>
 #include <stdlib.h>
 #include <errno.h>
+#include <time.h>
 #include <sys/socket.h>
 #include "udpstream.h"
 
 #define TYPE_PAYLOAD 0
 #define TYPE_ACK     1
 #define TYPE_RESEND  2
+#define TYPE_INIT    3 // Should be at the start of each connection
+#define TYPE_CLOSE   4 // Requesting to close the stream
+#define TYPE_CLOSED  5 // Confirming stream closure
+#define TYPE_PING    6
+#define TYPE_PONG    7
+#define TYPE_RESET   8
 #define HEADERSIZE (sizeof(uint32_t)+sizeof(uint16_t)+sizeof(uint8_t))
 // TODO: Handle stale connections, disconnects, maybe a connect message type?
 
@@ -35,6 +42,10 @@ struct packet
   unsigned int buflen;
 };
 
+#define STATE_INIT    1
+#define STATE_CLOSING 2
+#define STATE_CLOSED  4
+#define STATE_PING    8
 struct udpstream
 {
   int sock;
@@ -48,13 +59,15 @@ struct udpstream
   unsigned int recvpacketcount;
   char* buf; // Received but unparsed data
   unsigned int buflen;
+  unsigned char state;
+  time_t timestamp;
 // TODO: add void pointer to keep relevant application data? plus a function to free it if the connection is closed or abandoned as stale
 };
 
 static struct udpstream** streams=0;
 static unsigned int streamcount=0;
 
-struct udpstream* udpstream_new(int sock, struct sockaddr* addr, socklen_t addrlen)
+static struct udpstream* stream_new(int sock, struct sockaddr* addr, socklen_t addrlen)
 {
   struct udpstream* stream=malloc(sizeof(struct udpstream));
   stream->sock=sock;
@@ -68,6 +81,8 @@ struct udpstream* udpstream_new(int sock, struct sockaddr* addr, socklen_t addrl
   stream->recvpacketcount=0;
   stream->buf=0;
   stream->buflen=0;
+  stream->state=0; // Start new streams as invalid, need to init
+  stream->timestamp=time(0);
   ++streamcount;
   streams=realloc(streams, sizeof(void*)*streamcount);
   streams[streamcount-1]=stream;
@@ -90,12 +105,12 @@ static struct udpstream* stream_find(struct sockaddr* addr, socklen_t addrlen)
 static ssize_t stream_send(struct udpstream* stream, uint8_t type, uint16_t seq, uint32_t size, const void* buf)
 {
 // TODO: Include a checksum in the header?
-  unsigned char header[HEADERSIZE];
-  memcpy(header, &size, sizeof(uint32_t));
-  memcpy(header+sizeof(uint32_t), &seq, sizeof(uint16_t));
-  memcpy(header+sizeof(uint32_t)+sizeof(uint16_t), &type, sizeof(uint8_t));
-  sendto(stream->sock, header, HEADERSIZE, 0, &stream->addr, stream->addrlen);
-  return sendto(stream->sock, buf, size, 0, &stream->addr, stream->addrlen);
+  unsigned char packet[HEADERSIZE+size];
+  memcpy(packet, &size, sizeof(uint32_t));
+  memcpy(packet+sizeof(uint32_t), &seq, sizeof(uint16_t));
+  memcpy(packet+sizeof(uint32_t)+sizeof(uint16_t), &type, sizeof(uint8_t));
+  memcpy(packet+HEADERSIZE, buf, size);
+  return sendto(stream->sock, packet, HEADERSIZE+size, 0, &stream->addr, stream->addrlen);
 }
 
 static void udpstream_requestresend(struct udpstream* stream, uint16_t seq)
@@ -123,14 +138,48 @@ static void udpstream_requestresend(struct udpstream* stream, uint16_t seq)
   stream_send(stream, TYPE_RESEND, 0, missedcount*sizeof(uint16_t), missed);
 }
 
+static void stream_free(struct udpstream* stream)
+{
+  free(stream->buf);
+  unsigned int i;
+  for(i=0; i<stream->recvpacketcount; ++i)
+  {
+    free(stream->recvpackets[i].buf);
+  }
+  free(stream->recvpackets);
+  for(i=0; i<stream->sentpacketcount; ++i)
+  {
+    free(stream->sentpackets[i].buf);
+  }
+  free(stream->sentpackets);
+  free(stream);
+  for(i=0; i<streamcount; ++i)
+  {
+    if(streams[i]==stream)
+    {
+      --streamcount;
+      memmove(&streams[i], &streams[i+1], sizeof(void*)*(streamcount-i));
+    }
+  }
+}
+
+struct udpstream* udpstream_new(int sock, struct sockaddr* addr, socklen_t addrlen)
+{
+  struct udpstream* stream=stream_new(sock, addr, addrlen);
+  stream->state=STATE_INIT; // If we're creating the stream we're the ones initializing it
+  stream_send(stream, TYPE_INIT, 0, 0, 0);
+  return stream;
+}
+
 void udpstream_readsocket(int sock)
 {
+  time_t now=time(0);
   char buf[1024];
   struct sockaddr addr;
   socklen_t addrlen=sizeof(addr);
   ssize_t len=recvfrom(sock, buf, 1024, 0, &addr, &addrlen);
   struct udpstream* stream=stream_find(&addr, addrlen);
-  if(!stream){stream=udpstream_new(sock, &addr, addrlen);}
+  if(!stream){stream=stream_new(sock, &addr, addrlen);}
   stream->buflen+=len;
   stream->buf=realloc(stream->buf, stream->buflen);
   memcpy(stream->buf+(stream->buflen-len), buf, len);
@@ -145,8 +194,18 @@ void udpstream_readsocket(int sock)
     // Complete packet available
     memcpy(&seq, stream->buf+sizeof(uint32_t), sizeof(uint16_t));
     memcpy(&type, stream->buf+sizeof(uint32_t)+sizeof(uint16_t), sizeof(uint8_t));
-    if(type==TYPE_ACK) // Handle acknowledgement of sent packet
+    stream->timestamp=now;
+    if(!(stream->state&STATE_INIT) && type!=TYPE_INIT)
+    {
+      // Ditch invalid streams
+      stream_send(stream, TYPE_RESET, 0, 0, 0);
+      stream_free(stream);
+      return;
+    }
+    if((stream->state&STATE_CLOSING) && type!=TYPE_CLOSED){return;}
+    switch(type)
     {
+    case TYPE_ACK: // Handle acknowledgement of sent packet
       // Remove from sent messages, recipient has confirmed receiving it
       if(payloadsize==sizeof(uint16_t))
       {
@@ -167,47 +226,107 @@ void udpstream_readsocket(int sock)
       }
       stream->buflen-=(payloadsize+HEADERSIZE);
       memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
-      continue;
-    }
-    if(type==TYPE_RESEND) // TODO: Handle request to resend packets not received by the peer
-    {
+      break;
+    case TYPE_RESEND: // TODO: Handle request to resend packets not received by the peer
 fprintf(stderr, "TODO: resend packets\n");
       stream->buflen-=(payloadsize+HEADERSIZE);
       memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
-      continue;
+      break;
+    case TYPE_PAYLOAD:
+      // Send ack, regardless of whether it's in the right order
+      stream_send(stream, TYPE_ACK, 0, sizeof(uint16_t), &seq);
+      // Add to list of parsed packets
+      ++stream->recvpacketcount;
+      stream->recvpackets=realloc(stream->recvpackets, sizeof(struct packet)*stream->recvpacketcount);
+      stream->recvpackets[stream->recvpacketcount-1].seq=seq;
+      stream->recvpackets[stream->recvpacketcount-1].buf=malloc(payloadsize);
+      stream->recvpackets[stream->recvpacketcount-1].buflen=payloadsize;
+      memcpy(stream->recvpackets[stream->recvpacketcount-1].buf, stream->buf+HEADERSIZE, payloadsize);
+      stream->buflen-=(payloadsize+HEADERSIZE);
+      memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
+      udpstream_requestresend(stream, seq); // Ask to resend if we're missing any packets
+      break;
+    case TYPE_INIT: // Should be at the start of each connection and must have sequence 0, size 0
+// TODO: If we receive a valid init for an already initialized stream, invalidate the old one (memset ->addr? plus STATE_CLOSED) and create a new stream to indicate a new connection?
+      if(seq || payloadsize)
+      {
+        stream_send(stream, TYPE_RESET, 0, 0, 0);
+        if(stream->state&STATE_INIT) // If it's an established stream, mark it as closed
+        {
+          stream->state|=STATE_CLOSED;
+        }else{ // Otherwise just ditch it
+          stream_free(stream);
+          return;
+        }
+        break;
+      }
+      stream->state|=STATE_INIT;
+      stream->buflen-=(payloadsize+HEADERSIZE);
+      memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
+      break;
+    case TYPE_CLOSE: // Requesting to close the stream
+      stream->state|=STATE_CLOSED;
+      stream_send(stream, TYPE_CLOSED, 0, 0, 0);
+      break;
+    case TYPE_CLOSED: // Confirming stream closure
+      if(stream->state&STATE_CLOSING)
+      {
+        stream_free(stream);
+        return;
+      }
+      break;
+    case TYPE_PING:
+      stream_send(stream, TYPE_PONG, 0, 0, 0);
+    case TYPE_PONG:
+      stream->state&=STATE_PING^0xff;
+      stream->buflen-=(payloadsize+HEADERSIZE);
+      memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
+      break;
+    case TYPE_RESET:
+      stream->state|=STATE_CLOSED;
+      break;
     }
-    // Send ack, regardless of whether it's in the right order
-    stream_send(stream, TYPE_ACK, 0, sizeof(uint16_t), &seq);
-    // Add to list of parsed packets
-    ++stream->recvpacketcount;
-    stream->recvpackets=realloc(stream->recvpackets, sizeof(struct packet)*stream->recvpacketcount);
-    stream->recvpackets[stream->recvpacketcount-1].seq=seq;
-    stream->recvpackets[stream->recvpacketcount-1].buf=malloc(payloadsize);
-    stream->recvpackets[stream->recvpacketcount-1].buflen=payloadsize;
-    memcpy(stream->recvpackets[stream->recvpacketcount-1].buf, stream->buf+HEADERSIZE, payloadsize);
-    stream->buflen-=(payloadsize+HEADERSIZE);
-    memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
-    udpstream_requestresend(stream, seq); // Ask to resend if we're missing any packets
   }
 }
 
 struct udpstream* udpstream_poll(void)
 {
+  time_t now=time(0);
   unsigned int i;
   for(i=0; i<streamcount; ++i)
   {
+    // Check for state changes
+    if(streams[i]->state&STATE_CLOSED){return streams[i];}
     // Check for the next packet in the order
     unsigned int i2;
     for(i2=0; i2<streams[i]->recvpacketcount; ++i2)
     {
       if(streams[i]->recvpackets[i2].seq==streams[i]->inseq){return streams[i];}
     }
+    // Send ping if it's been 20 seconds without any data, unless we already sent one
+    if(streams[i]->timestamp+20<now && !(streams[i]->state&STATE_PING))
+    {
+      streams[i]->state|=STATE_PING;
+      stream_send(streams[i], TYPE_PING, 0, 0, 0);
+    }
+    // Give up and consider it dead after 100 seconds more (2 minutes total)
+    else if(streams[i]->timestamp+120<now)
+    {
+      if(streams[i]->state&STATE_CLOSING) // Application already closed it
+      {
+        stream_free(streams[i]);
+      }else{
+        streams[i]->state|=STATE_CLOSED;
+        return streams[i];
+      }
+    }
   }
   return 0;
 }
 
 ssize_t udpstream_read(struct udpstream* stream, void* buf, size_t size)
 {
+  if(stream->state&(STATE_CLOSED|STATE_CLOSING)){return 0;} // EOF, TODO: -1 and EBADFD for STATE_CLOSING?
   // Check if it's any previously out of order packet's turn now
   unsigned int i;
   for(i=0; i<stream->recvpacketcount; ++i)
@@ -238,6 +357,7 @@ ssize_t udpstream_read(struct udpstream* stream, void* buf, size_t size)
 
 ssize_t udpstream_write(struct udpstream* stream, const void* buf, size_t size)
 {
+  if(stream->state&(STATE_CLOSED|STATE_CLOSING)){return 0;} // EOF, TODO: -1 and EBADFD for STATE_CLOSING?
 // TODO: abort and return negative if sentpacketcount is too high? EWOULDBLOCK?
   ++stream->sentpacketcount;
   stream->sentpackets=realloc(stream->sentpackets, sizeof(struct packet)*stream->sentpacketcount);
@@ -257,3 +377,14 @@ void udpstream_getaddr(struct udpstream* stream, struct sockaddr* addr, socklen_
 }
 
 int udpstream_getsocket(struct udpstream* stream){return stream->sock;}
+
+void udpstream_close(struct udpstream* stream)
+{
+  if(stream->state&STATE_CLOSED) // Closed by peer, just free it
+  {
+    stream_free(stream);
+  }else{
+    stream->state|=STATE_CLOSING;
+    stream_send(stream, TYPE_CLOSE, 0, 0, 0);
+  }
+}
diff --git a/udpstream.h b/udpstream.h
index bbc78e4..87ee173 100644
--- a/udpstream.h
+++ b/udpstream.h
@@ -36,4 +36,6 @@ extern ssize_t udpstream_write(struct udpstream* stream, const void* buf, size_t
 extern void udpstream_getaddr(struct udpstream* stream, struct sockaddr* addr, socklen_t* addrlen);
 
 extern int udpstream_getsocket(struct udpstream* stream);
+
+extern void udpstream_close(struct udpstream* stream);
 #endif
diff --git a/udptest.c b/udptest.c
index 1378b8a..be25c6f 100644
--- a/udptest.c
+++ b/udptest.c
@@ -68,9 +68,9 @@ int main(int argc, char** argv)
           uint32_t ip=addr.sin_addr.s_addr;
           printf("From: %u.%u.%u.%u:%hu:\n", ip%0x100, (ip/0x100)%0x100, (ip/0x10000)%0x100, ip/0x1000000, ntohs(addr.sin_port));
         }
-        stream=rstream;
         ssize_t len=udpstream_read(rstream, buf, 1024);
-        if(len<1){continue;}
+        if(len<1){udpstream_close(rstream); if(stream==rstream){stream=0;} continue;}
+        stream=rstream;
         write(1, buf, len);
       }
     }