Both of these tasks rely on a networking abstraction called a reliable bidirectional in-order byte stream: you’ll type a sequence of bytes into the terminal, and the same sequence of bytes will eventually be delivered, in the same order, to a program running on another computer (a server). The server responds with its own sequence of bytes, delivered back to your terminal.
This feature is known as a stream socket. To your program and to the Web server, the socket looks like an ordinary file descriptor (similar to a file on disk, or to the stdin or stdout I/O streams). When two stream sockets are connected, any bytes written to one socket will eventually come out in the same order from the other socket on the other computer.
voidget_URL(conststring &host, conststring &path) { // Your code here.
// You will need to connect to the "http" service on // the computer whose name is in the "host" string, // then request the URL path given in the "path" string.
// Then you'll need to print out everything the server sends back, // (not just one call to rearequestd() -- everything) until you reach // the "eof" (end of file). // create a TCPSocket TCPSocket client_socket; // connect with host. host is a parameter. client_socket.connect(Address(host, "http")); // send a request Message. the request is made of 2 sentences. string request = "GET "+path+" HTTP/1.1\r\n"+"Host: "+host+"\r\nConnection: close\r\n\r\n"; client_socket.write(request);
// get the Message while(!client_socket.eof()){ string reply = client_socket.read(); cout<<reply; } cerr << "Function called: get_URL(" << host << ", " << path << ").\n"; cerr << "Warning: get_URL() has not been implemented yet.\n"; }
//! \\brief Segment's length in sequence space //! \\note Equal to payload length plus one byte if SYN is set, plus one byte if FIN is set size_t length_in_sequence_space() const; };
//! \\brief The "receiver" part of a TCP implementation.
//! Receives and reassembles segments into a ByteStream, and computes //! the acknowledgment number and window size to advertise back to the //! remote TCPSender. //接收重组segments为 ByteStream,并计算确认号和窗口大小以通告回远程 TCPSender。 class TCPReceiver { //! Our data structure for re-assembling bytes. //我们用于重新组装字节的数据结构。 StreamReassembler _reassembler;
//! The maximum number of bytes we'll store. //容量大小 size_t _capacity; WrappingInt32 ISN; bool syn_flag; public:
//! \\brief Construct a TCP receiver //! //! \\param capacity the maximum number of bytes that the receiver will //! store in its buffers at any give time. //构造函数,构造一个 TCP 接收器,容量接收器在任何给定时间将存储在其缓冲区中的最大字节数。 TCPReceiver(const size_t capacity) : _reassembler(capacity), _capacity(capacity),ISN(0) ,syn_flag(0){}
//! \\name Accessors to provide feedback to the remote TCPSender //!@{
//! \\brief The ackno that should be sent to the peer //! \\returns empty if no SYN has been received //! //! This is the beginning of the receiver's window, or in other words, the sequence number //! of the first byte in the stream that the receiver hasn't received. // 如果没有收到 SYN,则应发送给对等方的 ackno 为空 //这是接收器窗口的开始,否则,接收器未接收到的流中第一个字节的序列号。 std::optional<WrappingInt32> ackno() const;
//! \\brief The window size that should be sent to the peer //! //! Operationally: the capacity minus the number of bytes that the //! TCPReceiver is holding in its byte stream (those that have been //! reassembled, but not consumed). //! //! Formally: the difference between (a) the sequence number of //! the first byte that falls after the window (and will not be //! accepted by the receiver) and (b) the sequence number of the //! beginning of the window (the ackno). size_t window_size() const; //!@}
//! \\brief number of bytes stored but not yet reassembled size_t unassembled_bytes() const { return _reassembler.unassembled_bytes(); }
//! \\brief handle an inbound segment void segment_received(const TCPSegment &seg);
class TCPSender { private: //! our initial sequence number, the number for our SYN. WrappingInt32 _isn; uint64_t base{0}; //! outbound queue of segments that the TCPSender wants sent std::queue<TCPSegment> _segments_out{}; //cached TCPSegment. std::queue<TCPSegment> _segments_out_cached{}; //! retransmission timer for the connection unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent ByteStream _stream; //nextseq numbers as the absolute TCP number. uint64_t _next_seqnum{0}; //slide windows size uint16_t _curr_window_size; //isfinished? bool _isfin; size_t _times; //ticking? bool _time_waiting; //remission times. int _consecutive_remission; // when is time out? size_t _time_out; //empty windows? bool _window_zero; //! the (absolute) sequence number for the next byte to be sent uint64_t _next_seqno{0};
//! \\name Methods that can cause the TCPSender to send a segment //!@{
//! \\brief A new acknowledgment was received void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
//! \\brief Generate an empty-payload segment (useful for creating empty ACK segments) void send_empty_segment();
//! \\brief create and send segments to fill as much of the window as possible void fill_window();
//! \\brief Notifies the TCPSender of the passage of time void tick(const size_t ms_since_last_tick); //!@}
//! \\name Accessors //!@{
//! \\brief How many sequence numbers are occupied by segments sent but not yet acknowledged? //! \\note count is in "sequence space," i.e. SYN and FIN each count for one byte //! (see TCPSegment::length_in_sequence_space()) size_t bytes_in_flight() const;
//! \\brief Number of consecutive retransmissions that have occurred in a row unsigned int consecutive_retransmissions() const;
//! \\brief TCPSegments that the TCPSender has enqueued for transmission. //! \\note These must be dequeued and sent by the TCPConnection, //! which will need to fill in the fields that are set by the TCPReceiver //! (ackno and window size) before sending. std::queue<TCPSegment> &segments_out() { return _segments_out; } //!@}
//! \\name What is the next sequence number? (used for testing) //!@{
//! \\brief absolute seqno for the next byte to be sent uint64_t next_seqno_absolute() const { return _next_seqno; }
//! \\brief relative seqno for the next byte to be sent WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); } //!@} };
void TCPSender::fill_window() { // windows is full or the programe is finished. if(_curr_window_size==0_isfin){ return; } //haven't send any bytes. if(_next_seqno==0){ TCPSegment seg; // the TCP transmission start from _isn. seg.header().seqno = _isn; seg.header().syn = true; // the TCP first connection just send 1 bytes; _next_seqno = 1; _curr_window_size--; _segments_out.push(seg); _segments_out_cached.push(seg); } //the end of the file else if(_stream.eof()){ //set the finish flag to true; _isfin = true; TCPSegment seg; seg.header().syn=false; seg.header().fin=true; //convert the absolute TCP number to TCP number. seg.header().seqno = wrap(_next_seqno,_isn); //the fin packet only send a byte. _next_seqno++; _curr_window_size--; _segments_out.push(seg); _segments_out_cached.push(seg); } //normal file else{ //make sure the windows is not full and there's any data to convert. while(!_stream.buffer_empty()&&_curr_window_size>0){ //decide the length of the TCP Segment. //make sure the length of TCP segment is below the silde windows size and data length. uint64_t lens_byte=std::min(_stream.buffer_size(),uint64_t (_curr_window_size)); lens_byte=std::min(lens_byte,TCPConfig::MAX_PAYLOAD_SIZE); TCPSegment seg; seg.header().seqno = wrap(_next_seqno,_isn); seg.header().syn = false; //get the lens_byte data to the payload. seg.payload()=_stream.read(lens_byte); // increase the next seq_no; _next_seqno += lens_byte; _curr_window_size -= lens_byte; // get the end of the file. if(_stream.eof()&&_curr_window_size>0){ _isfin = true; seg.header().fin=true; //the fin packet only send a byte. _next_seqno++; _curr_window_size--; } _segments_out.push(seg); _segments_out_cached.push(seg); if(_isfin){ break; } } } //start ticking... if(!_time_waiting){ _time_out = _initial_retransmission_timeout; _time_waiting = true; _times = 0; } }
//! \\param ackno The remote receiver's ackno (acknowledgment number) //! \\param window_size The remote receiver's advertised window size void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) { DUMMY_CODE(ackno, window_size); // get the absolute TCP number of ACK... uint64_t acknos = unwrap(ackno,_isn,base); //thrid connection... //means the 0th bytes gets and desire to 1st bytes... if(base==0&&acknos==1){ base=1; _segments_out_cached.pop(); _consecutive_remission=0; } else if(acknos > _next_seqno){ return; } //the ack number is bigger than first cached segment... //means the cached data gets by the reciever... else if(!_segments_out_cached.empty() && acknos >= base + _segments_out_cached.front().length_in_sequence_space()){ //first segment in cache, and get the seqno and length of the segment... uint64_t copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base); uint64_t copy_seg_len = _segments_out_cached.front().length_in_sequence_space(); //find the segments that acked by recevier... //hint:if seqno+len<=ackno:means the data is acked by recevier... while(copy_seg_len+copy_seg_seqno<=acknos){ //move the base, base is the 1st bytes that nor acked... base += _segments_out_cached.front().length_in_sequence_space(); _segments_out_cached.pop(); if(_segments_out_cached.empty()) break; // judge the 2nd segs... copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base); copy_seg_len = _segments_out_cached.front().length_in_sequence_space(); } _time_out = _initial_retransmission_timeout; _times = 0; _consecutive_remission = 0; } // 3rd disconnection. 
else if(acknos == _next_seqno && _isfin){ base = acknos; _segments_out_cached.pop(); } //the windows is empty if(_next_seqno-base==0){ _time_waiting = false; } // 流量控制,发送方窗口不大于接受方窗口 else if(_next_seqno-base>=window_size){ _curr_window_size = 0; return; } if(window_size==0){ _curr_window_size = 1; _window_zero = true; } else{ _curr_window_size = window_size; _window_zero = false; _consecutive_remission = 0; } fill_window(); }
4. 构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
//! \\param[in] capacity the capacity of the outgoing byte stream //! \\param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment //! \\param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN) TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn) : _isn(fixed_isn.value_or(WrappingInt32{random_device()()})) , base(0) , _initial_retransmission_timeout{retx_timeout} , _stream(capacity) , _curr_window_size(1) , _isfin(false) , _times(0) , _time_waiting(false) , _consecutive_remission(0) , _time_out(0) , _window_zero(false) {
}
5.超时处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
//! \\param[in] ms_since_last_tick the number of milliseconds since the last call to this method void TCPSender::tick(const size_t ms_since_last_tick) { DUMMY_CODE(ms_since_last_tick); // the times pased by _times += ms_since_last_tick; //timeout and non-empty cache. resend... if(!_segments_out_cached.empty()&&_time_waiting&&_times>=_time_out){ //resend.. _segments_out.push(_segments_out_cached.front()); // increase the time out times... if(!_window_zero){ //add remissions _consecutive_remission++; _time_out\*=2; _time_waiting = true; } _times=0; } }
void TCPConnection::segment_received(const TCPSegment &seg) { DUMMY_CODE(seg); // get the segment from IP level; if(!_active){ return; } _time_since_last_segment_received=0; //not get segment no send ACK. //passive connection... //the ackno is null and no bytes is sent if(!_receiver.ackno().has_value()&&_sender.next_seqno_absolute()==0){ //only recieve syn... if(!seg.header().syn) return; //as the Service side,passive connected. _receiver.segment_received(seg); //it's OK to connect. connect(); return; } // active connected.. // first connection... if(_sender.next_seqno_absolute() > 0 && _sender.bytes_in_flight() == _sender.next_seqno_absolute() && !_receiver.ackno().has_value()){ // the length of payload is not 0 if(seg.payload().size() ){ return; } // if ack is no // the twoside wants to setup the connection at the same time. if(!seg.header().ack){ if(seg.header().syn){ _receiver.segment_received(seg); // send empty ack to setup the connection. _sender.send_empty_segment(); } return; } // ifsyn=1,ack=1,rst=1,then shut down. if(seg.header().rst){ _receiver.stream_out().set_error(); _sender.stream_in().set_error(); _active = false; return; } } //otherwise... //recieve the segment _receiver.segment_received(seg); _sender.ack_received(seg.header().ackno,seg.header().win); // thrid connection if (_sender.stream_in().buffer_empty() && seg.length_in_sequence_space()) _sender.send_empty_segment(); if (seg.header().rst) { _sender.send_empty_segment(); unclean_shutdown(); return; } send_sender_segments(); }
2、写seg.
将上层应用的数据写入到Bytestream中,提醒发送方发送.
1 2 3 4 5 6 7 8 9 10 11
size_t TCPConnection::write(const string &data) { DUMMY_CODE(data); // get the OS data... ready to be sent by TCP if(data.size()==0){ return 0; } size_t write_size = _sender.stream_in().write(data); _sender.fill_window(); send_sender_segments(); return write_size; }
//! \\param[in] ms_since_last_tick number of milliseconds since the last call to this method void TCPConnection::tick(const size_t ms_since_last_tick) { DUMMY_CODE(ms_since_last_tick); if(!_active) return; //count _time_since_last_segment_received += ms_since_last_tick; // tell the sender to tick _sender.tick(ms_since_last_tick); if(_sender.consecutive_retransmissions()>TCPConfig::MAX_RETX_ATTEMPTS){ unclean_shutdown(); } send_sender_segments(); }
4、真正的发送信息:读取sender中的消息缓存,然后加上ack和窗口信息,发送出去.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
void TCPConnection::send_sender_segments (){ //travel the queue to set the ack and windows size. while(!_sender.segments_out().empty()){ TCPSegment seg = _sender.segments_out().front(); _sender.segments_out().pop(); // the ack number is bot null if(_receiver.ackno().has_value()){ seg.header().ack=true; seg.header().ackno=_receiver.ackno().value(); seg.header().win=_receiver.window_size(); } _segments_out.push(seg); } //every time send segment,we need to shutdown. clean_shutdown(); }