====== UdpClientServer 02V03 ======
[[http://www.openhardsoftware.de/ | Open Hard- & Software]]
[
[[http://www.openhardsoftware.de/dokuwiki | DokuWiki]]
[[http://www.openhardsoftware.de/websites | WebSites]]
[[http://www.openhardsoftware.de/mediawiki | MediaWiki]]
[[http://www.openhardsoftware.de/nextcloud | NextCloud]]
]
===== Benutzung =====
PC-Python-Software: \\
{{:module:python:UdpClientServer:2112132047_CheckUdpClientServerTerminal.png?600|2112132047_CheckUdpClientServerTerminal.png}}
===== Hauptprogramm: CheckUdpClientServer.py =====
#
# Syntax-Arguments:
#
# WindowsPC[192.168.178.33] <-> # WindowsPC[192.168.178.33]:
# WinPC: python CheckUdpClientServer.py Windows 4321 192.168.178.33 4321
# WindowsPC[192.168.178.33] <-> UbuntuPC[192.168.178.48]
# WinPC: python CheckUdpClientServer.py Windows 5001 192.168.178.48 5000
# LinPC: python CheckUdpClientServer.py Ubuntu 5000 192.168.178.33 5001
#
import sys
import time
import socket as SKT
#
import Thread as THR
import UdpClientServer as UCS
#
UDP_HEADERID = 'UDPHID'
#-----------------------------------------------------------
# UDP - TxD - TransmitData
#-----------------------------------------------------------
UDP_IPADDRESS_TX = '192.168.178.255'
UDP_IPPORT_TX = 5001
#
#-----------------------------------------------------------
# UDP - RxD - ReceiveData
#-----------------------------------------------------------
UDP_IPADDRESS_LOCAL = '192.168.178.33' # dummy
UDP_IPPORT_RX = 5000
#
#-----------------------------------------------------------
# Callback - UdpClient
#-----------------------------------------------------------
def CBOnTxData(udpclientserver, txdata):
Line = 'CBOnTxData[{}]<{}>'.format(udpclientserver.GetHeaderID(), txdata)
print(Line)
return
#
def CBOnRxData(udpclientserver, rxdata):
Line = 'CBOnRxData[{}]<{}>'.format(udpclientserver.GetHeaderID(), rxdata)
print(Line)
return
#
def CBOnStart(udpclientserver):
# debug print('Main-CBOnStart')
return
#
def CBOnBusy(udpclientserver):
# debug print('Main-CBOnBusy')
return
#
def CBOnAbort(udpclientserver):
# debug print('Main-CBOnAbort')
return
#
def CBOnEnd(udpclientserver):
# debug print('Main-CBOnEnd')
return
#
#-----------------------------------------------------------
# Main
#-----------------------------------------------------------
if ('__main__' == __name__):
#
print('*** CheckUdpClientServer: begin')
# if (5 <= len(sys.argv)):
# # Analyse Arguments für Tx/Rx-Parameter
# UDP_HEADERID = sys.argv[1]
# # RX
# UDP_IPPORT_RX = int(sys.argv[2])
# # TX
# UDP_IPADDRESS_TX = sys.argv[3]
# UDP_IPPORT_TX = int(sys.argv[4])
#
UdpCS = UCS.CUdpClientServer(UDP_HEADERID, UDP_IPPORT_RX,
UDP_IPADDRESS_TX, UDP_IPPORT_TX,
CBOnTxData, CBOnRxData,
CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
#
print('{}: TxIP-Address[{}]-Port[{}]'.format(UdpCS.GetHeaderID(),
UdpCS.GetIPAddressTX(),
UdpCS.GetIPPortTX()))
print('{}: RxIP(local)-Address[{}]-Port[{}]'.format(UdpCS.GetHeaderID(),
UdpCS.GetIPAddressRXLocal(),
UdpCS.GetIPPortRX()))
UdpCS.Open()
while UdpCS.IsBusy():
UdpCS.Transmit('{}[{}]'.format('Hello',UdpCS.GetHeaderID()))
time.sleep(3.0)
UdpCS.Close()
#
print('*** CheckUdpClientServer: end')
#
#
===== Class-Library: UdpClientServer.py =====
#
import time
import enum as ENU
import socket as SKT
#
import Thread as THR
#
class CUdpClientServer():
#
def __init__(self, headerid, ipportrx, \
ipaddresstx, ipporttx, \
ontxdata, onrxdata, \
onstart, onbusy, onabort, onend):
self.HeaderID = headerid
self.IPAddressRXLocal = ''
self.IPPortRX = ipportrx
self.IPAddressTX = ipaddresstx
self.IPPortTX = ipporttx
self.OnTxData = ontxdata
self.OnRxData = onrxdata
self.OnStart = onstart
self.OnBusy = onbusy
self.OnAbort = onabort
self.OnEnd = onend
self.Thread = THR.CThread(self.CBOnStart, self.CBOnBusy, \
self.CBOnAbort, self.CBOnEnd)
self.RxSocket = None
self.TxSocket = None
# find local IPAddress:
S = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
S.connect(('8.8.8.8', 1))
self.IPAddressRXLocal = S.getsockname()[0]
S.close()
#
return
#
def GetHeaderID(self):
return self.HeaderID
def GetIPAddressTX(self):
return self.IPAddressTX
def GetIPPortTX(self):
return self.IPPortTX
def GetIPAddressRXLocal(self):
return self.IPAddressRXLocal
def GetIPPortRX(self):
return self.IPPortRX
#
def IsBusy(self):
return THR.EStateThread.stBusy == self.Thread.State
#
def Open(self):
# Rx
if (THR.EStateThread.stBusy == self.Thread.State):
self.Thread.Abort()
self.RxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
# FORWARD Socket.bind !!!
self.RxSocket.setsockopt(SKT.SOL_SOCKET, SKT.SO_REUSEADDR, 1)
self.RxSocket.bind((self.IPAddressRXLocal, self.IPPortRX))
self.RxSocket.setblocking(0)
self.Thread.Start()
# Tx
self.TxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
self.TxSocket.connect((self.IPAddressTX, self.IPPortTX))
return
#
def Close(self):
self.Thread.Abort()
self.RxSocket.close()
self.TxSocket.close()
#
def Abort(self):
self.Thread.Abort()
return
#
def Transmit(self, text):
if (None != self.OnTxData):
self.OnTxData(self, text)
self.TxSocket.sendto(text.encode('utf-8'), \
(self.IPAddressTX, self.IPPortTX))
#
def CBOnStart(self, thread):
if (None != self.OnStart):
self.OnStart(self)
return
#
def CBOnBusy(self, thread):
while self.IsBusy():
RxData = ''
try:
Data, Address = self.RxSocket.recvfrom(1024)
RxData = Data.decode('utf-8')
except SKT.error:
pass
else:
# print('RxData[{}]'.format(Data.decode('utf-8')))
if (None != self.OnRxData):
self.OnRxData(self, RxData)
finally:
time.sleep(0.1)
if (None != self.OnBusy):
self.OnBusy(self)
self.RxSocket.close()
return
#
def CBOnAbort(self, thread):
self.RxSocket.close()
if (None != self.OnAbort):
self.OnAbort(self)
return
#
def CBOnEnd(self, thread):
self.RxSocket.close()
if (None != self.OnEnd):
self.OnEnd(self)
return
#
#
===== Class-Library: Thread.py =====
#
import time
import threading as THD
import enum as ENU
#
class EStateThread(ENU.Enum):
stIdle = 0
stBusy = 1
stEnd = 2
#
def CBOnExecute(thread):
if (None != thread.OnBusy):
thread.OnBusy(thread)
if (None != thread.OnEnd):
thread.OnEnd(thread)
return
#
class CThread():
#
def __init__(self, onstart, onbusy, onabort, onend):
self.State = EStateThread.stIdle
self.Thread = THD.Thread(target=CBOnExecute, args=(self,))
self.OnStart = onstart
self.OnBusy = onbusy
self.OnAbort = onabort
self.OnEnd = onend
return
#
def Start(self):
self.State = EStateThread.stBusy
if (None != self.OnStart):
self.OnStart(self)
self.Thread.start()
return
#
def Abort(self):
self.State = EStateThread.stEnd
if (None != self.OnAbort):
self.OnAbort(self)
return
#
#
===== Version =====
{{:module:python:UdpClientServer:2112131938_UdpClientServer_02V03.zip|2112131938_UdpClientServer_02V03.zip}}
-----
[[http://www.openhardsoftware.de/ | Open Hard- & Software]]
[
[[http://www.openhardsoftware.de/dokuwiki | DokuWiki]]
[[http://www.openhardsoftware.de/websites | WebSites]]
[[http://www.openhardsoftware.de/mediawiki | MediaWiki]]
[[http://www.openhardsoftware.de/nextcloud | NextCloud]]
]