python socket?python代码自动生成器
大家好,感谢邀请,今天来为大家分享一下python socket的问题,以及和python代码自动生成器的一些困惑,大家要是还不太明白的话,也没有关系,因为接下来将为大家分享,希望可以帮助到大家,解决大家的问题,下面就开始吧!
python怎么建立socket服务端
socket服务器再细分可分为多种了,tcp,udp,websocket,都是调用socket模块,但是具体实现起来有一点细微的差别
先给出一个tcp和udp通过socket协议实现的聊天室的例子
python聊天室(python2.7版本):
都是分别运行server.py和client.py,就可以进行通讯了。
TCP版本:
socket-tcp-server.py(服务端):
#-*-encoding:utf-8-*-
#socket.getaddrinfo(host,port,family=0,socktype=0,proto=0,flags=0)
#根据给定的参数host/port,相应的转换成一个包含用于创建socket对象的五元组,
#参数host为域名,以字符串形式给出代表一个IPV4/IPV6地址或者None.
#参数port如果字符串形式就代表一个服务名,比如“http”"ftp""email"等,或者为数字,或者为None
#参数family为地主族,可以为AF_INET,AF_INET6,AF_UNIX.
#参数socktype可以为SOCK_STREAM(TCP)或者SOCK_DGRAM(UDP)
#参数proto通常为0可以直接忽略
#参数flags为AI_*的组合,比如AI_NUMERICHOST,它会影响函数的返回值
#附注:给参数host,port传递None时建立在C基础,通过传递NULL。
#该函数返回一个五元组(family,socktype,proto,canonname,sockaddr),同时第五个参数sockaddr也是一个二元组(address,port)
#更多的方法及链接请访问
#Echoserverprogram
fromsocketimport*
importsys
importthreading
fromtimeimportctime
fromtimeimportlocaltime
importtraceback
importtime
importsubprocess
reload(sys)
sys.setdefaultencoding("utf8")
HOST='127.0.0.1'
PORT=8555#设置侦听端口
BUFSIZ=1024
classTcpServer():
def__init__(self):
self.ADDR=(HOST,PORT)
try:
self.sock=socket(AF_INET,SOCK_STREAM)
print'%disopen'%PORT
self.sock.bind(self.ADDR)
self.sock.listen(5)
#设置退出条件
self.STOP_CHAT=False
#所有监听的客户端
self.clients={}
self.thrs={}
self.stops=[]
exceptException,e:
print"%disdown"%PORT
returnFalse
defIsOpen(ip,port):
s=socket(AF_INET,SOCK_STREAM)
try:
s.connect((ip,int(port)))
#s.shutdown(2)
#利用shutdown()函数使socket双向数据传输变为单向数据传输。shutdown()需要一个单独的参数,
#该参数表示s了如何关闭socket。具体为:0表示禁止将来读;1表示禁止将来写;2表示禁止将来读和写。
print'%disopen'%port
returnTrue
except:
print'%disdown'%port
returnFalse
deflisten_client(self):
whilenotself.STOP_CHAT:
print(u'等待接入,侦听端口:%d'%(PORT))
self.tcpClientSock,self.addr=self.sock.accept()
print(u'接受连接,客户端地址:',self.addr)
address=self.addr
#将建立的clientsocket链接放到列表self.clients中
self.clients[address]=self.tcpClientSock
#分别将每个建立的链接放入进程中,接收且分发消息
self.thrs[address]=threading.Thread(target=self.readmsg,args=[address])
self.thrs[address].start()
time.sleep(0.5)
defreadmsg(self,address):
#如果地址不存在,则返回False
ifaddressnotinself.clients:
returnFalse
#得到发送消息的clientsocket
client=self.clients[address]
whileTrue:
try:
#获取到消息内容data
data=client.recv(BUFSIZ)
except:
print(e)
self.close_client(address)
break
ifnotdata:
break
#python3使用bytes,所以要进行编码
#s='%s发送给我的信息是:[%s]%s'%(addr[0],ctime(),data.decode('utf8'))
#对日期进行一下格式化
ISOTIMEFORMAT='%Y-%m-%d%X'
stime=time.strftime(ISOTIMEFORMAT,localtime())
s=u'%s发送给我的信息是:%s'%(str(address),data.decode('utf8'))
#将获得的消息分发给链接中的clientsocket
forkinself.clients:
self.clients[k].send(s.encode('utf8'))
self.clients[k].sendall('sendall:'+s.encode('utf8'))
printstr(k)
print([stime],':',data.decode('utf8'))
#如果输入quit(忽略大小写),则程序退出
STOP_CHAT=(data.decode('utf8').upper()=="QUIT")
ifSTOP_CHAT:
print"quit"
self.close_client(address)
print"alreadyquit"
break
defclose_client(self,address):
try:
client=self.clients.pop(address)
self.stops.append(address)
client.close()
forkinself.clients:
self.clients[k].send(str(address)+u"已经离开了")
except:
pass
print(str(address)+u'已经退出')
if__name__=='__main__':
tserver=TcpServer()
tserver.listen_client()
——————————华丽的分割线——————————
socket-tcp-client.py(客户端):
#-*-encoding:utf-8-*-
fromsocketimport*
importsys
importthreading
importtime
reload(sys)
sys.setdefaultencoding("utf8")
#测试,连接本机
HOST='127.0.0.1'
#设置侦听端口
PORT=8555
BUFSIZ=1024
classTcpClient:
ADDR=(HOST,PORT)
def__init__(self):
self.HOST=HOST
self.PORT=PORT
self.BUFSIZ=BUFSIZ
#创建socket连接
self.client=socket(AF_INET,SOCK_STREAM)
self.client.connect(self.ADDR)
#起一个线程,监听接收的信息
self.trecv=threading.Thread(target=self.recvmsg)
self.trecv.start()
defsendmsg(self):
#循环发送聊天消息,如果socket连接存在则一直循环,发送quit时关闭链接
whileself.client.connect_ex(self.ADDR):
data=raw_input('>:')
ifnotdata:
break
self.client.send(data.encode('utf8'))
print(u'发送信息到%s:%s'%(self.HOST,data))
ifdata.upper()=="QUIT":
self.client.close()
printu"已关闭"
break
defrecvmsg(self):
#接收消息,如果链接一直存在,则持续监听接收消息
try:
whileself.client.connect_ex(self.ADDR):
data=self.client.recv(self.BUFSIZ)
print(u'从%s收到信息:%s'%(self.HOST,data.decode('utf8')))
exceptException,e:
printstr(e)
if__name__=='__main__':
client=TcpClient()
client.sendmsg()UDP版本:
socket-udp-server.py
#-*-coding:utf8-*-
importsys
importtime
importtraceback
importthreading
reload(sys)
sys.setdefaultencoding('utf-8')
importsocket
importtraceback
HOST="127.0.0.1"
PORT=9555
CHECK_PERIOD=20
CHECK_TIMEOUT=15
classUdpServer(object):
def__init__(self):
self.clients=[]
self.beats={}
self.ADDR=(HOST,PORT)
try:
self.sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.sock.bind(self.ADDR)#绑定同一个域名下的所有机器
self.beattrs=threading.Thread(target=self.checkheartbeat)
self.beattrs.start()
exceptException,e:
traceback.print_exc()
returnFalse
deflisten_client(self):
whileTrue:
time.sleep(0.5)
print"hohohohohoo"
try:
recvData,address=self.sock.recvfrom(2048)
ifnotrecvData:
self.close_client(address)
break
ifaddressinself.clients:
senddata=u"%s发送给我的信息是:%s"%(str(address),recvData.decode('utf8'))
ifrecvData.upper()=="QUIT":
self.close_client(address)
ifrecvData=="HEARTBEAT":
self.heartbeat(address)
continue
else:
self.clients.append(address)
senddata=u"%s发送给我的信息是:%s"%(str(address),u'进入了聊天室')
forcinself.clients:
try:
self.sock.sendto(senddata,c)
exceptException,e:
printstr(e)
self.close_client(c)
exceptException,e:
#traceback.print_exc()
printstr(e)
pass
defheartbeat(self,address):
self.beats[address]=time.time()
defcheckheartbeat(self):
whileTrue:
print"checkheartbeat"
printself.beats
try:
forcinself.clients:
printtime.time()
printself.beats[c]
ifself.beats[c]+CHECK_TIMEOUT<time.time():
printu"%s心跳超时,连接已经断开"%str(c)
self.close_client(c)
else:
printu"checkp%s,没有断开"%str(c)
exceptException,e:
traceback.print_exc()
printstr(e)
pass
time.sleep(CHECK_PERIOD)
defclose_client(self,address):
try:
ifaddressinself.clients:
self.clients.remove(address)
ifself.beats.has_key(address):
delself.beats[address]
printself.clients
forcinself.clients:
self.sock.sendto(u'%s已经离开了'%str(address),c)
print(str(address)+u'已经退出')
exceptException,e:
printstr(e)
raise
if__name__=="__main__":
udpServer=UdpServer()
udpServer.listen_client()
——————————华丽的分割线——————————
socket-udp-client.py:
#-*-coding:utf8-*-
importsys
importthreading
importtime
reload(sys)
sys.setdefaultencoding('utf-8')
importsocket
HOST="127.0.0.1"
PORT=9555
#BEAT_PORT=43278
BEAT_PERIOD=5
classUdpClient(object):
def__init__(self):
self.clientsock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.HOST=HOST
self.ADDR=(HOST,PORT)
self.clientsock.sendto(u'请求建立链接',self.ADDR)
self.recvtrs=threading.Thread(target=self.recvmsg)
self.recvtrs.start()
self.hearttrs=threading.Thread(target=self.heartbeat)
self.hearttrs.start()
defsendmsg(self):
whileTrue:
data=raw_input(">:")
ifnotdata:
break
self.clientsock.sendto(data.encode('utf-8'),self.ADDR)
ifdata.upper()=='QUIT':
self.clientsock.close()
break
defheartbeat(self):
whileTrue:
self.clientsock.sendto('HEARTBEAT',self.ADDR)
time.sleep(BEAT_PERIOD)
defrecvmsg(self):
whileTrue:
recvData,addr=self.clientsock.recvfrom(1024)
ifnotrecvData:
break
print(u'从%s收到信息:%s'%(self.HOST,recvData.decode('utf8')))
if__name__=="__main__":
udpClient=UdpClient()
udpClient.sendmsg()
python的socket通信为什么会出现积极拒绝
要在WEB上远程管理客户端软件。那我们就仿路由器那种模式用SOCKET来解决吧。
做了个DEMO,本机测试OK,拿到别的机器上做服务器,提示由于目标机器积极拒绝,无法连接。
查询各种资料,有的说是端口没开,有的说是服务没开。
各种雾水啊!仔细一想,问题可能出在本机在局域网IP上,而不是用127.0.0.1。
更正代码后,问题解决。下面演示服务器端代码的关键部分。
protected void Listen()
{
MessageBox.Show("start listening");
stringip="";
System.Net.IPHostEntryIpEntry= System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
for(int i= 0; i!= IpEntry.AddressList.Length; i++)
{if(!IpEntry.AddressList[i].IsIPv6LinkLocal)
{
ip= IpEntry.AddressList[i].ToString();
}
}
IPEndPointipend= new IPEndPoint(IPAddress.Parse(ip), 8000);
Socket sc= new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sc.Bind(ipend);
Socket acc;while(true)
{ sc.Listen(1);
acc= sc.Accept();
byte[] buff= new byte[1024];
intrecbyte= acc.Receive(buff, buff.Length, 0);if(recbyte== 0)break;
stringreciveval="";
reciveval+= Encoding.GetEncoding("gb2312").GetString(buff, 0, recbyte); string returnval="开始升级";byte[] returnBy= Encoding.GetEncoding("gb2312").GetBytes(returnval);
acc.Send(returnBy, returnBy.Length, 0);
}
acc.Close();
sc.Close();
}
public string sendMessage()
{
IPEndPointipend= new IPEndPoint(IPAddress.Parse("192.168.XXX.XXX"),8000);
Socket sc= new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
sc.Connect(ipend); string message="请升级软件";byte[] bt= Encoding.GetEncoding("gb2312").GetBytes(message);
sc.Send(bt,bt.Length,0);
byte[] rebuff= new byte[1024];
intrecive= sc.Receive(rebuff, rebuff.Length, 0);
stringreturnval="";
returnval+= Encoding.GetEncoding("gb2312").GetString(rebuff, 0, recive);
sc.Close();
returnreturnval;
}
用python打造一个基于socket的文件(夹)传输系统
以下是一个基于socket实现文件(夹)传输系统的Python代码示例,包含服务端和客户端实现,并附关键设计说明:
一、核心设计思路双通道分离
控制通道:使用独立socket传输指令(如文件列表、传输状态)
数据通道:负责实际文件数据传输
解决单一socket传输多文件时的数据边界问题
文件夹遍历机制
通过递归遍历生成文件列表
使用回调机制实现顺序传输控制
传输协议设计
指令格式:CMD|PATH|SIZE(如FILE|/test.txt|1024)
数据分块:默认每次发送4096字节
二、服务端代码(ftserver.py)import socketimport osimport threadingimport argparseclass FileServer: def __init__(self, host, port, save_path): self.host= host self.port= port self.save_path= save_path self.control_socket= None self.data_socket= None def start(self):#启动控制通道 self.control_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.control_socket.bind((self.host, self.port)) self.control_socket.listen(1) print(f"Control server listening on{self.host}:{self.port}") while True: conn, addr= self.control_socket.accept() threading.Thread(target=self.handle_control, args=(conn, addr)).start() def handle_control(self, conn, addr): print(f"New control connection from{addr}") while True: data= conn.recv(1024).decode('utf-8') if not data: break cmd, path, size= data.split('|') if cmd=='FILE': self.receive_file(path, int(size), addr) elif cmd=='DIR': os.makedirs(os.path.join(self.save_path, path), exist_ok=True) def receive_file(self, filename, size, client_addr):#创建数据通道 data_port= self.port+ 1 data_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM) data_socket.bind((self.host, data_port)) data_socket.listen(1)#通知客户端数据端口 self.control_socket.sendall(f"DATA_PORT|{data_port}".encode('utf-8'))#接收数据连接 data_conn, _= data_socket.accept() save_path= os.path.join(self.save_path, filename) with open(save_path,'wb') as f: received= 0 while received< size: chunk= data_conn.recv(min(4096, size- received)) if not chunk: break f.write(chunk) received+= len(chunk) print(f"Received{filename}({received}/{size} bytes)") data_conn.close() data_socket.close()if __name__=="__main__": parser= argparse.ArgumentParser() parser.add_argument('-i','--ip', required=True, help='Server IP') parser.add_argument('-p','--port', type=int, default=8888, help='Control port') parser.add_argument('-d','--dir', required=True, help='Save directory') args= parser.parse_args() server= FileServer(args.ip, args.port, args.dir) server.start()三、客户端代码(ftclient.py)import socketimport osimport threadingimport argparseclass FileClient: def __init__(self, host, port, file_path): self.host= host self.port= port self.file_path= file_path self.control_socket= None def send_folder(self):#遍历文件夹生成文件列表 for root, dirs, files in os.walk(self.file_path): for dir in dirs: rel_path= os.path.relpath(os.path.join(root, dir), self.file_path) self.send_command(f"DIR|{rel_path}|0") for file in files: file_path= os.path.join(root, file) rel_path= os.path.relpath(file_path, self.file_path) size= os.path.getsize(file_path) self.send_command(f"FILE|{rel_path}|{size}") self.send_file(file_path, size) def send_command(self, cmd): if not self.control_socket: self.control_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.control_socket.connect((self.host, self.port)) self.control_socket.sendall(cmd.encode('utf-8')) def send_file(self, file_path, size):#等待服务端返回数据端口 data_port= int(self.control_socket.recv(1024).decode('utf-8').split('|')[1])#连接数据通道 data_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM) data_socket.connect((self.host, data_port)) with open(file_path,'rb') as f: sent= 0 while sent< size: chunk= f.read(min(4096, size- sent)) data_socket.sendall(chunk) sent+= len(chunk) print(f"Sent{file_path}({sent}/{size} bytes)") data_socket.close()if __name__=="__main__": parser= argparse.ArgumentParser() parser.add_argument('-i','--ip', required=True, help='Server IP') parser.add_argument('-p','--port', type=int, default=8888, help='Control port') parser.add_argument('-f','--file', required=True, help='File/Folder path') args= parser.parse_args() client= FileClient(args.ip, args.port, args.file) if os.path.isfile(args.file):#如果是单个文件,简化处理 size= os.path.getsize(args.file) client.send_command(f"FILE|{os.path.basename(args.file)}|{size}") client.send_file(args.file, size) else: client.send_folder()四、关键改进说明协议优化
使用|作为分隔符简化指令解析
添加数据端口动态分配机制
错误处理增强
增加socket连接异常处理
添加文件存在性检查
性能优化
调整缓冲区大小为4096字节(可根据网络环境调整)
使用多线程处理并发连接
五、使用说明服务端启动python ftserver.py-i 0.0.0.0-p 8888-d/tmp/receive客户端发送文件#发送单个文件python ftclient.py-i 127.0.0.1-p 8888-f test.txt#发送文件夹python ftclient.py-i 127.0.0.1-p 8888-f/path/to/folder六、注意事项防火墙需放行控制端口(默认8888)和数据端口(默认8889)公网传输需配置端口映射或使用内网穿透工具大文件传输建议增加校验机制(如MD5校验)实际生产环境应考虑添加加密传输功能该实现完整保留了原设计中的双通道分离思想,同时简化了文件夹遍历逻辑,更适合学习理解socket文件传输原理。如需进一步扩展,可考虑添加以下功能:
传输进度显示断点续传多线程并行传输SSL加密传输
关于python socket到此分享完毕,希望能帮助到您。