Lab0.Warm Up

1 Networking by hand

这一个部分主要是体验一些基本的应用层协议,主要是HTTP协议和SMTP协议.

Both of these tasks rely on a networking abstraction called a reliable bidirectional in-order byte stream: you’ll type a sequence of bytes into the terminal, and the same sequence of bytes will eventually be delivered, in the same order, to a program running on another computer (a server). The server responds with its own sequence of bytes, delivered back to your terminal.

在实验资料中给出的是这么一段话,这句话的意思就是所有的应用层协议都是由底层支撑的,这个底层可以理解成可靠的二进制比特流的传输,一方应用程序会产生比特流投入到传输通道中,另一方的应用程序会从传输通道中获取到比特流信息.这个传输通道就是Socket,套接字.

2 自制Socket

This feature is known as a stream socket. To your program and to the Web server, the socket looks like an ordinary file descriptor (similar to a file on disk, or to the stdin or stdout I/O streams). When two stream sockets are connected, any bytes written to one socket will eventually come out in the same order from the other socket on the other computer.

Socket在Linux操作系统中本质上就是一个文件,一旦两个Socket相互连接,应用程序会往一个Socket递交数据,另外一个Socket就会原封不动地把数据传递过来.连接的方式在运输层有讲,客户端的一个网络端口创建一个Socket,往服务器的一个网络端口发送请求,这是第一次握手,接着服务器的网络端口传输ACK给客户端,这是第二次握手,接着客户端会传输一个最后的请求,这个叫三次握手.三次握手后,连接就完成了,这个时候两个Socket(可以理解成网络端口?)相互链接了.

需要注意的是,在应用层我们一般是注重逻辑通信,Socket是一个逻辑概念,应用程序把数据投给一个叫做Socket的东西,你可以理解成逻辑通信的一端,但是具体Socket往下是怎么做的不是应用程序需要关注的.

这个实验就需要我们模拟一个Socket应用,与一个服务器的端口建立连接.然后获取网页.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void get_URL(const string &host, const string &path) {
// Your code here.

// You will need to connect to the "http" service on
// the computer whose name is in the "host" string,
// then request the URL path given in the "path" string.

// Then you'll need to print out everything the server sends back,
// (not just one call to rearequestd() -- everything) until you reach
// the "eof" (end of file).
// create a TCPSocket
TCPSocket client_socket;
// connect with host. host is a parameter.
client_socket.connect(Address(host, "http"));
// send a request Message. the request is made of 2 sentences.
string request = "GET "+path+" HTTP/1.1\r\n"+"Host: "+host+"\r\nConnection: close\r\n\r\n";
client_socket.write(request);

// get the Message
while(!client_socket.eof()){
string reply = client_socket.read();
cout<<reply;
}
cerr << "Function called: get_URL(" << host << ", " << path << ").\n";
cerr << "Warning: get_URL() has not been implemented yet.\n";
}

这个时候先创建一个TCPSocket,首先先进行连接,然后像之前一样创建request,接着这个Socket就可以把request写进去.然后服务器会返回数据,这个数据是读取到Socket的,读数据一直读到EOF即可.

由于这个实验是面向初学者的,具体Socket怎么读怎么写我们没有考虑,我们只用调用教授已经写好的写,读操作.

3 缓冲区队列

要求实现一个有序字节流类(in-order byte stream),使之支持读写、容量控制。这个字节流类似于一个带容量的队列,从一头读,从另一头写。当流中的数据达到容量上限时,便无法再写入新的数据。特别的,写操作被分为了peek和pop两步。peek为从头部开始读取指定数量的字节,pop为弹出指定数量的字节。

总的来说就是做一个桶,可以从下方获得内容,也可以从上方添加内容,当桶满的时候就不可以添加东西了

ByteStream具有一定的容量,最大允许存储该容量大小的数据;在读取端读出一部分数据后,它会释放掉已经被读出的内容,以腾出空间继续让写端写入数据。

这个实验为我们后期实现TCP协议有着帮助.

上面的是缓冲区队列的一些声明,对于读写两方,操作是不同的.

有个小提示,如果C++的构造函数可以使用像这样的方法进行初始化的

1
2
3
class baba (const int abab) _abab(abab){

}

这个本质上就是数据结构题,完成缓冲区队列罢了.

Lab1.stitching substrings into a byte stream

该lab要求我们实现一个流重组类,可以将Sender发来的带索引号的字节碎片重组成有序的字节写入到byte_stream。接收端从发送端读取数据,调用流重组器,流重组器对数据进行排序,排序好后写入byte_stream。

流重组器也是有capasity的,也就是说流重组器也有一定的容量,一次性处理太多的信息会导致流重组器不能够正常地工作.同样的我们把流处理器当成一个双端队列即可.

private类中还有一个ByteStream类型的变量,所有的内容都输出给ByteStream,还有一个容量变量.其中ByteStream中的bytes_read返回ByteStream处理了多少元素.

因为重组类的函数中,支持的index是first unread=_output.bytes_read()(已经读取的元素)到first unacceptable的这一块区域,我们要保证输入的字节编号是在这个区域里面的.

在重组器类的函数中,push_substring函数完成重组工作。其参数有data(string),index(size_t),eof(bool),data是待排序的数据,index是数据首元素的序号,eof用于判断是否结束。结束后将不再有数据输入。

在重组的过程中,我们会遇到重复、丢包、重叠、乱序的现象。为了使乱序的数据找到有序的位置,我使用’\0’维护重组器中数据的相对序号,例如,第一次data为make,index为0,第二次data为great,index为13,而处于两组数据中间的数据未知,我们就用’\0’代替,即make\0\0\0\0\0\0\0\0\0great。这样就维护了已经进入重组器的数据的有序。当然,写入的data中也有可能含有\0,这是,我们就需要一个bool双端队列,来记录相应位置的数据是否有序,在上述例子中,队列的bool值为111100000000011111。

所以说我们在数据结构中添加几项,一个是_unassembled_byte,是一个std::deque,暂时存储还乱序的字符串,_check_byte是std::deque,这个元素与_unassembled_byte一一对应,当un[i]存储着还没有发送的字符的时候,ch[I]=true,否则为false,还有一个_lens_un,这个记录乱序的字符的长度.

程序的总体结构:

发送端的数据->流重组器(重组成有序的数据)->Bytestream(在Lab0就做好的队列)->TCP接收端.
流重组器需要做的是,把所有有序的数据写入到接收端.
其中字符的编号是从1一直往后延伸的,因为队列的首和尾都可以记录.TCP的发送端发送的数据也是(字符号、字符串)字符的编号一直往后延伸.

这个时候我们回忆一下对应数据的表示:

output.bytes_read():接收端从ByteStream获得的字符数量.

output.bytes_write():流重组器写入ByteStream的字符数量-1.而且是流重组器的有效数据中index最小的序号

_lens_un指的还在流重组器里面的数据的长度.

其中:output.bytes_read()+_capacity是ByteStream可以接受的范围,output.bytes_write()+_lens_un是流重组器的有效数据中index最大的序号.

1.我们判断输入序号是否大于内存与已读取数据之和,也就是说,该数据是否属于unacceptable中的数据,如果是这样的数据,我们没有足够的内存写入,因为写入这样的数据需要添加\0,从而超过capasity的大小。代码如下:

1
2
3
if(index>_output.bytes_read()+_capacity){  
return;
}

2.字符串部分在区域内,但是部分在区域外,那就把区域外的内容舍弃,只读取区域内的内容.

我们需要判断data中最后一个数据的序号是否大于内存与已读取数据之和,如果大于,我们就要将能写入的部分写入,也就是按data的顺序尽可能地写入数据而不超过capasity,在写入的过程中,我们也会遇到两种情况,一种是序号index大于此时已经在流重组器的最后一个数据的序号,在这种情况下我们要在流重组器最后一个序号与index之间填入’\0′,同时将相应的bool双端队列(_check_byte)设置为false,做完这些工作后,才开始写入新的数据。另一种情况是index的小于或者等于流重组器最后一个数据的序号,我们需要弹出冲突的数据,举个例子就是,index序号为5,此时流重组器中的数据为stanford,我们就要从序号5的数据也就是o开始弹出,变成stanf,再写入data中的数据。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if(index+data.length()>_capacity+_output.bytes_read()){
for(size_t i=_lens_un+_output.bytes_written();i<_capacity+_output.bytes_read();i++){
if(i<index){
_unassembled_byte.push_back('\\0');
_check_byte.push_back(false);

}else{
_unassembled_byte.push_back(data[i-index]);
_check_byte.push_back(true);

}
_lens_un++;
}
}

3.我们要判断index是否等于已经写入byte_stream(_output)中的数据,如果是的,我们就直接将data中的数据写入byte_stream,然后在重组器中弹出data.length()个数据,值得注意的是,当重组器中的数据个数小于data.length(),我们就全部弹出。但是后面的数据会被当成无效数据而不进行处理,代码如下:

1
2
3
4
5
6
7
8
if(index==_output.bytes_written()){
//直接写
_output.write(data);
size_t temp_len=std::min(_lens_un,data.length());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
}

4.我们要判断index是否大于流重组器中的最后一个数据的序号和写入byte_stream中的数据个数之和,如果大于,我们就可以参考1的处理,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if(index>_output.bytes_written()+_lens_un){
for(size_t i=_output.bytes_written()+_lens_un;i<index;i++){
_unassembled_byte.push_back('\\0');
_check_byte.push_back(false);
_lens_un++;
}
[原来的data][空][新的data]
for(char i : data){
_unassembled_byte.push_back(i);
_lens_un++;
_check_byte.push_back(true);
}
}

5.我们要判断data中的数据是否已经被写入byte_stream,这个说法有些不准确,准确的说是相应序号的数据被写入,如果data中的所有数据都被写入了byte_stream,我们就直接返回,如果只是部分被写入,我们就将data中未被写入的部分写入。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
if(index<_output.bytes_written()){
if(_output.bytes_written()>index+data.length()){
return;
}
//[已经写入Byte_stream的][bytes_written()][新传来的data在bytes_written()之后的,入队][原来在_output.bytes_written()+_lens_un之后的data]
//还是要写,一直写到data最后.
std::string data_cut(data.begin()+_output.bytes_written()-index,data.end());
_output.write(data_cut);
size_t temp_len=std::min(_lens_un,data_cut.length());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;

6.不是任何情况:首先我们知道要把_output.bytes_written()~index这一部分的内容保存好,然后再把data加入进去即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//在中间插入元素
//先弹出一部分数据保存到栈中
std::stack<char> temp;
std::stack<bool> temp_check;
for(size_t i=0;i<index-_output.bytes_written();i++){
temp.push(_unassembled_byte.at(i));
temp_check.push(_check_byte.at(i));
}
[原data,入队][index][新传来的data,入队][原来在_output.bytes_written()+_lens_un之后的data]
//这里是看数据的最后一个index有没有达到_output.bytes_written()+_lens_un,达到的话后面的内容要保留,没达到就全部删除即可
size_t temp_len=std::min(_lens_un,data.length()+index-_output.bytes_written());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
for(int i=data.length()-1;i>=0;i--){
_unassembled_byte.push_front(data[i]);
_check_byte.push_front(true);
_lens_un++;
}
while(!temp.empty()){
_unassembled_byte.push_front(temp.top());
_check_byte.push_front(temp_check.top());
_lens_un++;
temp.pop();
temp_check.pop();
}

7.输入字符串到ByteStream中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
size_t i=0;
while(i<_lens_un){
if(!_check_byte.at(i)){
break;
}
i++;
}
std::string n(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
_output.write(n);
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
_lens_un-=i;
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+i);
if(eof) input_end_index=index+data.length();
if(input_end_index==_output.bytes_written()) _output.end_input();

Lab2.TCP Reciever

绝对序号和相对序号的转换:

在实践中,一个分组的序号承载在分组首部的一个固定长度的字段中。如果分组序号字段的比特数是k,则该序号范围是。 在一个有限的序号范围内,所有涉及序号的运算必须使用模运算。(即序号空间可被看作是一个长度为 的环,其中序号紧挨着0)。上面论述的序号是相对序号(相对序号的开始值是),还有一种不模的运算就是绝对序号.

这个时候我们需要完成两个函数:

1.wrap(绝对序号转化为相对序号)

1
2
3
4
5
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
DUMMY_CODE(n, isn);
WrappingInt32 res(n+isn.raw_value());
return res;
}

这个函数调用了WrappingInt32类的构造函数,构造函数获得一个int类型的数(uint_64等类型)然后取模之后获得32位的整形数,存放到raw_value成员中.

2.unwrap(相对序号转绝对序号)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
DUMMY_CODE(n, isn, checkpoint);
uint64_t temp=n.raw_value()-isn.raw_value();
if(checkpoint==0){
return temp;
}
uint32_t div=checkpoint/(1ul<<32);
uint32_t res=checkpoint%(1ul<<32);
if (res<=temp) {
temp=(checkpoint-temp-(div-1)\*(1ul<<32))<(temp+div\*(1ul<<32)-checkpoint)?temp+(div-1)\*(1ul<<32):temp+div\*(1ul<<32);
}else{
temp=(checkpoint-temp-div\*(1ul<<32))<(temp+(div+1)\*(1ul<<32)-checkpoint)?temp+div\*(1ul<<32):temp+(div+1)\*(1ul<<32);
}
return temp;
}

给定checkpoint,找到最靠近checkpoint的那个temp,返回即可.

Implementing the TCP receiver

首先我们看一看TCP报文包的定义:主要是由首部和其中的元素组成:其中可以调用serialize和parse方法转化,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class TCPSegment {
private:
TCPHeader _header{};
Buffer _payload{};

public:
//! \\brief Parse the segment from a string
ParseResult parse(const Buffer buffer, const uint32_t datagram_layer_checksum = 0);

//! \\brief Serialize the segment to a string
BufferList serialize(const uint32_t datagram_layer_checksum = 0) const;

//! \\name Accessors
//!@{
const TCPHeader &header() const { return _header; }
TCPHeader &header() { return _header; }

const Buffer &payload() const { return _payload; }
Buffer &payload() { return _payload; }
//!@}

//! \\brief Segment's length in sequence space
//! \\note Equal to payload length plus one byte if SYN is set, plus one byte if FIN is set
size_t length_in_sequence_space() const;
};

接着我们来看一看TCP首部:首部的元素主要是:

  • 序号:seqno,占32位,用来标识从发送端到接收端的字节流;
  • 确认号:ackno,占32位,只有ACK标志位为1时,确认号才有效,ackno=seqno+1;
  • 标志位:
    • SYN:发起一个连接;
    • FIN:释放一个连接;
    • ACK:确认序号有效。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      struct TCPHeader {
      static constexpr size_t LENGTH = 20; //!< [TCP](\\ref rfc::rfc793) header length, not including options

      //! \\struct TCPHeader
      //! ~~~{.txt}
      //! 0 1 2 3
      //! 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! Source Port Destination Port
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! Sequence Number
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! Acknowledgment Number
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! Data UAPRSF
      //! Offset Reserved RCSSYI Window
      //! GKHTNN
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! Checksum Urgent Pointer
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! Options Padding
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! data
      //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
      //! ~~~

      //! \\name TCP Header fields
      //!@{
      uint16_t sport = 0; //!< source port
      uint16_t dport = 0; //!< destination port
      WrappingInt32 seqno{0}; //!< sequence number
      WrappingInt32 ackno{0}; //!< ack number
      uint8_t doff = LENGTH / 4; //!< data offset
      bool urg = false; //!< urgent flag
      bool ack = false; //!< ack flag
      bool psh = false; //!< push flag
      bool rst = false; //!< rst flag
      bool syn = false; //!< syn flag
      bool fin = false; //!< fin flag
      uint16_t win = 0; //!< window size
      uint16_t cksum = 0; //!< checksum
      uint16_t uptr = 0; //!< urgent pointer
      }
      接着看一看TCP receiver的数据结构定义:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      #ifndef SPONGE_LIBSPONGE_TCP_RECEIVER_HH
      #define SPONGE_LIBSPONGE_TCP_RECEIVER_HH

      #include "byte_stream.hh"
      #include "stream_reassembler.hh"
      #include "tcp_segment.hh"
      #include "wrapping_integers.hh"

      #include <optional>

      //! \\brief The "receiver" part of a TCP implementation.

      //! Receives and reassembles segments into a ByteStream, and computes
      //! the acknowledgment number and window size to advertise back to the
      //! remote TCPSender.
      //接收重组segments为 ByteStream,并计算确认号和窗口大小以通告回远程 TCPSender。
      class TCPReceiver {
      //! Our data structure for re-assembling bytes.
      //我们用于重新组装字节的数据结构。
      StreamReassembler _reassembler;

      //! The maximum number of bytes we'll store.
      //容量大小
      size_t _capacity;
      WrappingInt32 ISN;
      bool syn_flag;
      public:

      //! \\brief Construct a TCP receiver
      //!
      //! \\param capacity the maximum number of bytes that the receiver will
      //! store in its buffers at any give time.
      //构造函数,构造一个 TCP 接收器,容量接收器在任何给定时间将存储在其缓冲区中的最大字节数。
      TCPReceiver(const size_t capacity) : _reassembler(capacity), _capacity(capacity),ISN(0) ,syn_flag(0){}

      //! \\name Accessors to provide feedback to the remote TCPSender
      //!@{

      //! \\brief The ackno that should be sent to the peer
      //! \\returns empty if no SYN has been received
      //!
      //! This is the beginning of the receiver's window, or in other words, the sequence number
      //! of the first byte in the stream that the receiver hasn't received.
      // 如果没有收到 SYN,则应发送给对等方的 ackno 为空
      //这是接收器窗口的开始,否则,接收器未接收到的流中第一个字节的序列号。
      std::optional<WrappingInt32> ackno() const;

      //! \\brief The window size that should be sent to the peer
      //!
      //! Operationally: the capacity minus the number of bytes that the
      //! TCPReceiver is holding in its byte stream (those that have been
      //! reassembled, but not consumed).
      //!
      //! Formally: the difference between (a) the sequence number of
      //! the first byte that falls after the window (and will not be
      //! accepted by the receiver) and (b) the sequence number of the
      //! beginning of the window (the ackno).
      size_t window_size() const;
      //!@}

      //! \\brief number of bytes stored but not yet reassembled
      size_t unassembled_bytes() const { return _reassembler.unassembled_bytes(); }

      //! \\brief handle an inbound segment
      void segment_received(const TCPSegment &seg);

      //! \\name "Output" interface for the reader
      //!@{
      ByteStream &stream_out() { return _reassembler.stream_out(); }
      const ByteStream &stream_out() const { return _reassembler.stream_out(); }
      bool recv_fin() const;
      //!@}
      };
      我们知道TCP需要接受一个叫做segment类型的数据,然后存储起来,送入到Lab1已经实现好的reassemble_stream中.并返回适合的ACK.

对于接受的数据:分成两种可能,一种是第一个序列,另外的就是普通的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void TCPReceiver::segment_received(const TCPSegment &seg) {
DUMMY_CODE(seg);
//代表第一个传过来的seg
if(seg.header().syn){
syn_flag= true;
//窗口的左端
ISN=seg.header().seqno;
} else if(!syn_flag){
return;
}
//推断数据包的序号,序号比较靠近上一个已经接收到的序号,然后塞进我们在Lab1已经写好的流重组器.
uint64_t received_lens=_reassembler.stream_out().bytes_written();
size_t index= unwrap(seg.header().seqno,ISN,received_lens);
if(!seg.header().syn){
index--;
}
//进行重组
_reassembler.push_substring(seg.payload().copy(),index,seg.header().fin);
}

ACK的返回也很简单,流重组器输入到Byte stream的个数就代表已经输入了多少个有序的序列,返回对应的ACK即可.但是对于结束的时候的ACK回应,我们还是需要分类讨论.

1
2
3
4
5
6
7
8
9
10
11
12
13
optional<WrappingInt32> TCPReceiver::ackno() const {
if(!syn_flag){
return std::nullopt;
}else{
//判断是否是最后一个
if(_reassembler.stream_out().input_ended()){
return ISN+_reassembler.stream_out().bytes_written()+2;
}else{
//返回的ACK的序号就是期望获得的下一个字符的数+1,流重组器的已连续写入的数据量就是最后一个有序的 //字符
return ISN+_reassembler.stream_out().bytes_written()+1;
}
}
}

Lab3 TCP Sender

这一次我们要实现TCP的发送方,这一次我把必要的注释写在代码里面了.

1.头文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
uint64_t base{0};
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
//cached TCPSegment.
std::queue<TCPSegment> _segments_out_cached{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;

//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//nextseq numbers as the absolute TCP number.
uint64_t _next_seqnum{0};
//slide windows size
uint16_t _curr_window_size;
//isfinished?
bool _isfin;
size_t _times;
//ticking?
bool _time_waiting;
//remission times.
int _consecutive_remission;
// when is time out?
size_t _time_out;
//empty windows?
bool _window_zero;
//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};

public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});

//! \\name "Input" interface for the writer
//!@{
ByteStream &stream_in() { return _stream; }
const ByteStream &stream_in() const { return _stream; }
//!@}

//! \\name Methods that can cause the TCPSender to send a segment
//!@{

//! \\brief A new acknowledgment was received
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);

//! \\brief Generate an empty-payload segment (useful for creating empty ACK segments)
void send_empty_segment();

//! \\brief create and send segments to fill as much of the window as possible
void fill_window();

//! \\brief Notifies the TCPSender of the passage of time
void tick(const size_t ms_since_last_tick);
//!@}

//! \\name Accessors
//!@{

//! \\brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
//! \\note count is in "sequence space," i.e. SYN and FIN each count for one byte
//! (see TCPSegment::length_in_sequence_space())
size_t bytes_in_flight() const;

//! \\brief Number of consecutive retransmissions that have occurred in a row
unsigned int consecutive_retransmissions() const;

//! \\brief TCPSegments that the TCPSender has enqueued for transmission.
//! \\note These must be dequeued and sent by the TCPConnection,
//! which will need to fill in the fields that are set by the TCPReceiver
//! (ackno and window size) before sending.
std::queue<TCPSegment> &segments_out() { return _segments_out; }
//!@}

//! \\name What is the next sequence number? (used for testing)
//!@{

//! \\brief absolute seqno for the next byte to be sent
uint64_t next_seqno_absolute() const { return _next_seqno; }

//! \\brief relative seqno for the next byte to be sent
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
//!@}
};

2.发送数据函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
void TCPSender::fill_window() {
// windows is full or the programe is finished.
if(_curr_window_size==0_isfin){
return;
}
//haven't send any bytes.
if(_next_seqno==0){
TCPSegment seg;
// the TCP transmission start from _isn.
seg.header().seqno = _isn;
seg.header().syn = true;
// the TCP first connection just send 1 bytes;
_next_seqno = 1;
_curr_window_size--;
_segments_out.push(seg);
_segments_out_cached.push(seg);
}
//the end of the file
else if(_stream.eof()){
//set the finish flag to true;
_isfin = true;
TCPSegment seg;
seg.header().syn=false;
seg.header().fin=true;
//convert the absolute TCP number to TCP number.
seg.header().seqno = wrap(_next_seqno,_isn);
//the fin packet only send a byte.
_next_seqno++;
_curr_window_size--;
_segments_out.push(seg);
_segments_out_cached.push(seg);
}
//normal file
else{
//make sure the windows is not full and there's any data to convert.
while(!_stream.buffer_empty()&&_curr_window_size>0){
//decide the length of the TCP Segment.
//make sure the length of TCP segment is below the silde windows size and data length.
uint64_t lens_byte=std::min(_stream.buffer_size(),uint64_t (_curr_window_size));
lens_byte=std::min(lens_byte,TCPConfig::MAX_PAYLOAD_SIZE);
TCPSegment seg;
seg.header().seqno = wrap(_next_seqno,_isn);
seg.header().syn = false;
//get the lens_byte data to the payload.
seg.payload()=_stream.read(lens_byte);
// increase the next seq_no;
_next_seqno += lens_byte;
_curr_window_size -= lens_byte;
// get the end of the file.
if(_stream.eof()&&_curr_window_size>0){
_isfin = true;
seg.header().fin=true;
//the fin packet only send a byte.
_next_seqno++;
_curr_window_size--;
}
_segments_out.push(seg);
_segments_out_cached.push(seg);
if(_isfin){
break;
}
}
}
//start ticking...
if(!_time_waiting){
_time_out = _initial_retransmission_timeout;
_time_waiting = true;
_times = 0;
}
}

3.接受ACK:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//! \\param ackno The remote receiver's ackno (acknowledgment number)
//! \\param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
DUMMY_CODE(ackno, window_size);
// get the absolute TCP number of ACK...
uint64_t acknos = unwrap(ackno,_isn,base);
//thrid connection...
//means the 0th bytes gets and desire to 1st bytes...
if(base==0&&acknos==1){
base=1;
_segments_out_cached.pop();
_consecutive_remission=0;
}
else if(acknos > _next_seqno){
return;
}
//the ack number is bigger than first cached segment...
//means the cached data gets by the reciever...
else if(!_segments_out_cached.empty() && acknos >= base + _segments_out_cached.front().length_in_sequence_space()){
//first segment in cache, and get the seqno and length of the segment...
uint64_t copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
uint64_t copy_seg_len = _segments_out_cached.front().length_in_sequence_space();
//find the segments that acked by recevier...
//hint:if seqno+len<=ackno:means the data is acked by recevier...
while(copy_seg_len+copy_seg_seqno<=acknos){
//move the base, base is the 1st bytes that nor acked...
base += _segments_out_cached.front().length_in_sequence_space();
_segments_out_cached.pop();
if(_segments_out_cached.empty()) break;
// judge the 2nd segs...
copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
copy_seg_len = _segments_out_cached.front().length_in_sequence_space();
}
_time_out = _initial_retransmission_timeout;
_times = 0;
_consecutive_remission = 0;
}
// 3rd disconnection.
else if(acknos == _next_seqno && _isfin){
base = acknos;
_segments_out_cached.pop();
}
//the windows is empty
if(_next_seqno-base==0){
_time_waiting = false;
}
// 流量控制,发送方窗口不大于接受方窗口
else if(_next_seqno-base>=window_size){
_curr_window_size = 0;
return;
}
if(window_size==0){
_curr_window_size = 1;
_window_zero = true;
}
else{
_curr_window_size = window_size;
_window_zero = false;
_consecutive_remission = 0;
}
fill_window();
}

4. 构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//! \\param[in] capacity the capacity of the outgoing byte stream
//! \\param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \\param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, base(0)
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _curr_window_size(1)
, _isfin(false)
, _times(0)
, _time_waiting(false)
, _consecutive_remission(0)
, _time_out(0)
, _window_zero(false)
{

}

5.超时处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//! \\param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
DUMMY_CODE(ms_since_last_tick);
// the times pased by
_times += ms_since_last_tick;
//timeout and non-empty cache. resend...
if(!_segments_out_cached.empty()&&_time_waiting&&_times>=_time_out){
//resend..
_segments_out.push(_segments_out_cached.front());
// increase the time out times...
if(!_window_zero){
//add remissions
_consecutive_remission++;
_time_out\*=2;
_time_waiting = true;
}
_times=0;
}
}

Lab4 TCP Connnection

在这里我们需要实现一个TCP连接类,在这个TCP连接类里面,我们需要组合之前已经写好的TCP发送端和接收端的函数来进行处理.

1、接受到segment的操作:

分成两种操作,一种是正常的交互,一种是握手的操作.握手又分成主动请求链接和被动链接,在这两种模式下接受握手信息的处理是不一样的.对于正常的交互,需要交付给Sender和Reciever.因为对于TCP来说,两者是相互统一的.两个主机之间也会互相传递信息,所以说交给发送端处理ACK,交给接收端返回给上层.实际的TCP协议并不是完全的类似于GBN和SR,具体的差异就在ACK的数据是相互传递的,换句话说就是连着兼有.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void TCPConnection::segment_received(const TCPSegment &seg) { 
DUMMY_CODE(seg);
// get the segment from IP level;
if(!_active){
return;
}
_time_since_last_segment_received=0;
//not get segment no send ACK.
//passive connection...
//the ackno is null and no bytes is sent
if(!_receiver.ackno().has_value()&&_sender.next_seqno_absolute()==0){
//only recieve syn...
if(!seg.header().syn) return;
//as the Service side,passive connected.
_receiver.segment_received(seg);
//it's OK to connect.
connect();
return;
}
// active connected..
// first connection...
if(_sender.next_seqno_absolute() > 0 && _sender.bytes_in_flight() == _sender.next_seqno_absolute() &&
!_receiver.ackno().has_value()){
// the length of payload is not 0
if(seg.payload().size() ){
return;
}
// if ack is no
// the twoside wants to setup the connection at the same time.
if(!seg.header().ack){
if(seg.header().syn){
_receiver.segment_received(seg);
// send empty ack to setup the connection.
_sender.send_empty_segment();
}
return;
}
// ifsyn=1,ack=1,rst=1,then shut down.
if(seg.header().rst){
_receiver.stream_out().set_error();
_sender.stream_in().set_error();
_active = false;
return;
}
}
//otherwise...
//recieve the segment
_receiver.segment_received(seg);
_sender.ack_received(seg.header().ackno,seg.header().win);
// thrid connection
if (_sender.stream_in().buffer_empty() && seg.length_in_sequence_space())
_sender.send_empty_segment();
if (seg.header().rst) {
_sender.send_empty_segment();
unclean_shutdown();
return;
}
send_sender_segments();
}

2、写seg.

将上层应用的数据写入到Bytestream中,提醒发送方发送.

1
2
3
4
5
6
7
8
9
10
11
size_t TCPConnection::write(const string &data) {
DUMMY_CODE(data);
// get the OS data... ready to be sent by TCP
if(data.size()==0){
return 0;
}
size_t write_size = _sender.stream_in().write(data);
_sender.fill_window();
send_sender_segments();
return write_size;
}

3、时钟(操作系统不定期调用之)

提醒Sender处理时间,看看是不是超时了.记录一下time_since_last_segment_received.

1
2
3
4
5
6
7
8
9
10
11
12
13
//! \\param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
DUMMY_CODE(ms_since_last_tick);
if(!_active) return;
//count
_time_since_last_segment_received += ms_since_last_tick;
// tell the sender to tick
_sender.tick(ms_since_last_tick);
if(_sender.consecutive_retransmissions()>TCPConfig::MAX_RETX_ATTEMPTS){
unclean_shutdown();
}
send_sender_segments();
}

4、真正的发送信息:读取sender中的消息缓存,然后加上ack和窗口信息信息,发送出去.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TCPConnection::send_sender_segments (){
//travel the queue to set the ack and windows size.
while(!_sender.segments_out().empty()){
TCPSegment seg = _sender.segments_out().front();
_sender.segments_out().pop();
// the ack number is bot null
if(_receiver.ackno().has_value()){
seg.header().ack=true;
seg.header().ackno=_receiver.ackno().value();
seg.header().win=_receiver.window_size();
}
_segments_out.push(seg);
}
//every time send segment,we need to shutdown.
clean_shutdown();
}