開源代碼庫,34muduo_net庫源碼分析(十)

 2023-11-10 阅读 24 评论 0

摘要:1.連接關閉時序圖 2.代碼 1.TcpConnection.h // Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file.// Author: Shuo Chen

1.連接關閉時序圖


2.代碼

1.TcpConnection.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
//#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>

//#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

namespace muduo
{
namespace net
{

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
///
/// Instances hand out shared_ptrs to themselves (shared_from_this), so they
/// must be owned by a boost::shared_ptr (see TcpConnectionPtr below).
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  // const-qualified so the address getters can be called through a
  // const TcpConnection, like the other read-only accessors of this class.
  const InetAddress& localAddress() const { return localAddr_; }
  const InetAddress& peerAddress() const { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }

  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

  // Event handlers registered on channel_.
  void handleRead(Timestamp receiveTime);
  void handleClose();
  void handleError();
  void setState(StateE s) { state_ = s; }

  EventLoop* loop_;   // the EventLoop this connection belongs to
  string name_;       // connection name
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  CloseCallback closeCallback_;
};

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

}
}

#endif  // MUDUO_NET_TCPCONNECTION_H

2.TcpConnection.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpConnection.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <assert.h>  // assert
#include <errno.h>
#include <stdio.h>
#include <unistd.h>  // ::read

using namespace muduo;
using namespace muduo::net;

// Default callbacks, disabled in this version of the file.
/*
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
}

void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
  buf->retrieveAll();
}
*/
TcpConnection::TcpConnection(EventLoop* loop,const string& nameArg,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr): loop_(CHECK_NOTNULL(loop)),name_(nameArg),state_(kConnecting),socket_(new Socket(sockfd)),channel_(new Channel(loop, sockfd)),localAddr_(localAddr),peerAddr_(peerAddr)/*,highWaterMark_(64*1024*1024)*/
{// 通道可讀事件到來的時候,回調TcpConnection::handleRead,_1是事件發生時間channel_->setReadCallback(boost::bind(&TcpConnection::handleRead, this, _1));// 連接關閉,回調TcpConnection::handleClosechannel_->setCloseCallback(boost::bind(&TcpConnection::handleClose, this));// 發生錯誤,回調TcpConnection::handleErrorchannel_->setErrorCallback(boost::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this<< " fd=" << sockfd;socket_->setKeepAlive(true);
}TcpConnection::~TcpConnection()
{LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this<< " fd=" << channel_->fd();
}void TcpConnection::connectEstablished()
{loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();channel_->tie(shared_from_this());channel_->enableReading();	// TcpConnection所對應的通道加入到Poller關注connectionCallback_(shared_from_this());LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}void TcpConnection::connectDestroyed()
{loop_->assertInLoopThread();if (state_ == kConnected){setState(kDisconnected);channel_->disableAll();connectionCallback_(shared_from_this());}channel_->remove();
}void TcpConnection::handleRead(Timestamp receiveTime)
{/*loop_->assertInLoopThread();int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0){handleClose();}else{errno = savedErrno;LOG_SYSERR << "TcpConnection::handleRead";handleError();}*/loop_->assertInLoopThread();int savedErrno = 0;char buf[65536];ssize_t n = ::read(channel_->fd(), buf, sizeof buf);if (n > 0){messageCallback_(shared_from_this(), buf, n);}else if (n == 0){handleClose();}else{errno = savedErrno;LOG_SYSERR << "TcpConnection::handleRead";handleError();}}void TcpConnection::handleClose()
{loop_->assertInLoopThread();LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;assert(state_ == kConnected || state_ == kDisconnecting);// we don't close fd, leave it to dtor, so we can find leaks easily.setState(kDisconnected);channel_->disableAll();TcpConnectionPtr guardThis(shared_from_this());connectionCallback_(guardThis);		// 這一行,可以不調用LOG_TRACE << "[7] usecount=" << guardThis.use_count();// must be the last linecloseCallback_(guardThis);	// 調用TcpServer::removeConnectionLOG_TRACE << "[11] usecount=" << guardThis.use_count();
}void TcpConnection::handleError()
{int err = sockets::getSocketError(channel_->fd());LOG_ERROR << "TcpConnection::handleError [" << name_<< "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

開源代碼庫。

3.TcpServer.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>

#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

namespace muduo
{
namespace net
{

class Acceptor;
class EventLoop;

///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : boost::noncopyable
{
 public:
  //typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.

  const string& hostport() const { return hostport_; }
  const string& name() const { return name_; }

  /// Starts the server if it's not listening.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();

  /// Set connection callback.
  /// Not thread safe.
  /// The callback is invoked when a connection is established or closed.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  /// The callback is invoked when a message arrives.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

 private:
  /// Not thread safe, but in loop
  void newConnection(int sockfd, const InetAddress& peerAddr);
  /// Thread safe.
  // NOTE(review): the definition in this version asserts that it runs in the
  // loop thread, so the "thread safe" remark above may be stale - confirm.
  void removeConnection(const TcpConnectionPtr& conn);

  typedef std::map<string, TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;  // the acceptor loop
  const string hostport_;  // service address/port
  const string name_;      // service name
  boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  bool started_;
  // always in loop thread
  int nextConnId_;             // id of the next connection
  ConnectionMap connections_;  // connections keyed by connection name
};

}
}

#endif  // MUDUO_NET_TCPSERVER_H

4.TcpServer.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpServer.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
//#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <assert.h>  // assert
#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg): loop_(CHECK_NOTNULL(loop)),hostport_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr)),/*threadPool_(new EventLoopThreadPool(loop)),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),*/started_(false),nextConnId_(1)
{// Acceptor::handleRead函數中會回調用TcpServer::newConnection// _1對應的是socket文件描述符,_2對應的是對等方的地址(InetAddress)acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));
}TcpServer::~TcpServer()
{loop_->assertInLoopThread();LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";for (ConnectionMap::iterator it(connections_.begin());it != connections_.end(); ++it){TcpConnectionPtr conn = it->second;it->second.reset();		// 釋放當前所控制的對象,引用計數減一conn->getLoop()->runInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));conn.reset();			// 釋放當前所控制的對象,引用計數減一}
}// 該函數多次調用是無害的
// 該函數可以跨線程調用
void TcpServer::start()
{if (!started_){started_ = true;}if (!acceptor_->listenning()){// get_pointer返回原生指針loop_->runInLoop(boost::bind(&Acceptor::listen, get_pointer(acceptor_)));}
}void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{loop_->assertInLoopThread();char buf[32];snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessaryTcpConnectionPtr conn(new TcpConnection(loop_,connName,sockfd,localAddr,peerAddr));LOG_TRACE << "[1] usecount=" << conn.use_count();connections_[connName] = conn;LOG_TRACE << "[2] usecount=" << conn.use_count();conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, _1));conn->connectEstablished();LOG_TRACE << "[5] usecount=" << conn.use_count();}void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();LOG_TRACE << "[8] usecount=" << conn.use_count();size_t n = connections_.erase(conn->name());LOG_TRACE << "[9] usecount=" << conn.use_count();(void)n;assert(n == 1);loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));LOG_TRACE << "[10] usecount=" << conn.use_count();}

5.Reactor_09.cc

jdk源碼剖析手冊。

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <unistd.h>  // getpid, ssize_t

using namespace muduo;
using namespace muduo::net;class TestServer
{public:TestServer(EventLoop* loop,const InetAddress& listenAddr): loop_(loop),server_(loop, listenAddr, "TestServer"){server_.setConnectionCallback(boost::bind(&TestServer::onConnection, this, _1));server_.setMessageCallback(boost::bind(&TestServer::onMessage, this, _1, _2, _3));}void start(){server_.start();}private:void onConnection(const TcpConnectionPtr& conn){if (conn->connected()){printf("onConnection(): new connection [%s] from %s\n",conn->name().c_str(),conn->peerAddress().toIpPort().c_str());}else{printf("onConnection(): connection [%s] is down\n",conn->name().c_str());}}void onMessage(const TcpConnectionPtr& conn,const char* data,ssize_t len){printf("onMessage(): received %zd bytes from connection [%s]\n",len, conn->name().c_str());}EventLoop* loop_;TcpServer server_;
};int main()
{printf("main(): pid = %d\n", getpid());InetAddress listenAddr(8888);EventLoop loop;TestServer server(&loop, listenAddr);server.start();loop.loop();
}


版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/169634.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息