module:python:udpclientserver:udpclientserver02v03
This is an old revision of the document!
Table of Contents
UdpClientServer 02V03
Benutzung
…py
…py
…py
…py
Hauptprogramm: CheckUdpClientServer.py
Class-Library: UdpClientServer.py
#
import time
import enum as ENU
import socket as SKT
#
import Thread as THR
#
class CUdpClientServer():
    """UDP endpoint with a polling receiver thread and a transmit socket.

    Incoming datagrams are read on a background THR.CThread from a socket
    bound to (local address, ipportrx); outgoing text is sent to
    (ipaddresstx, ipporttx).  Every callback is optional (pass None).
    OnTxData/OnRxData receive (self, text); OnStart/OnBusy/OnAbort/OnEnd
    receive (self).
    """
    #
    def __init__(self, headerid, ipportrx, \
                 ipaddresstx, ipporttx, \
                 ontxdata, onrxdata, \
                 onstart, onbusy, onabort, onend):
        self.HeaderID = headerid
        self.IPAddressRXLocal = ''
        self.IPPortRX = ipportrx
        self.IPAddressTX = ipaddresstx
        self.IPPortTX = ipporttx
        self.OnTxData = ontxdata
        self.OnRxData = onrxdata
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
        self.Thread = self._NewThread()
        self.RxSocket = None
        self.TxSocket = None
        # find local IPAddress: connecting a UDP socket sends no packet, it
        # only makes the OS pick the outgoing interface whose address we read.
        # The with-block closes the probe socket even if connect() raises.
        with SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM) as Probe:
            Probe.connect(('8.8.8.8', 1))
            self.IPAddressRXLocal = Probe.getsockname()[0]
        #
        return
    #
    def _NewThread(self):
        """Build a worker object wired to this instance's thread callbacks."""
        return THR.CThread(self.CBOnStart, self.CBOnBusy, \
                           self.CBOnAbort, self.CBOnEnd)
    #
    @staticmethod
    def _CloseSocket(sock):
        """Close sock unless it was never created (None)."""
        if sock is not None:
            sock.close()
    #
    def GetHeaderID(self):
        return self.HeaderID
    def GetIPAddressTX(self):
        return self.IPAddressTX
    def GetIPPortTX(self):
        return self.IPPortTX
    def GetIPAddressRXLocal(self):
        return self.IPAddressRXLocal
    def GetIPPortRX(self):
        return self.IPPortRX
    #
    def IsBusy(self):
        """Return True while the current receiver thread is marked busy."""
        return THR.EStateThread.stBusy == self.Thread.State
    #
    def Open(self):
        """(Re)create both sockets and start the receiver thread."""
        # Rx
        if (THR.EStateThread.stBusy == self.Thread.State):
            self.Thread.Abort()
        # Release whatever an earlier Open() left behind (close() is harmless
        # on an already closed socket).
        self._CloseSocket(self.RxSocket)
        self._CloseSocket(self.TxSocket)
        # A threading.Thread can be started only once, so every Open() gets
        # its own worker object; an aborted worker that is still winding down
        # keeps polling its own (ended) state, not the new one.
        self.Thread = self._NewThread()
        self.RxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        # SO_REUSEADDR has to be set before bind
        self.RxSocket.setsockopt(SKT.SOL_SOCKET, SKT.SO_REUSEADDR, 1)
        self.RxSocket.bind((self.IPAddressRXLocal, self.IPPortRX))
        self.RxSocket.setblocking(0)
        self.Thread.Start()
        # Tx
        self.TxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        self.TxSocket.connect((self.IPAddressTX, self.IPPortTX))
        return
    #
    def Close(self):
        """Stop receiving and close both sockets; safe before Open()."""
        self.Thread.Abort()
        self._CloseSocket(self.RxSocket)
        self._CloseSocket(self.TxSocket)
    #
    def Abort(self):
        """Ask the receiver thread to stop (the Tx socket stays open)."""
        self.Thread.Abort()
        return
    #
    def Transmit(self, text):
        """Report text through OnTxData, then send it UTF-8 encoded."""
        if self.OnTxData is not None:
            self.OnTxData(self, text)
        # NOTE(review): the socket is already connect()ed in Open(); some
        # BSD-derived platforms reject sendto() with an explicit address on a
        # connected UDP socket - confirm against the target platforms.
        self.TxSocket.sendto(text.encode('utf-8'), \
                             (self.IPAddressTX, self.IPPortTX))
    #
    def CBOnStart(self, thread):
        """Thread callback: forward the start notification."""
        if self.OnStart is not None:
            self.OnStart(self)
        return
    #
    def CBOnBusy(self, thread):
        """Thread callback: poll the Rx socket until the worker is ended."""
        # Work on the socket that was current when this worker started, so a
        # later Open() cannot have its new socket read or closed from here.
        RxSocket = self.RxSocket
        try:
            while THR.EStateThread.stBusy == thread.State:
                try:
                    Data, _ = RxSocket.recvfrom(1024)
                except SKT.error:
                    # nothing pending on the non-blocking socket, or the
                    # socket was closed by an abort
                    pass
                else:
                    if self.OnRxData is not None:
                        # errors='replace': datagrams come from arbitrary
                        # senders; invalid UTF-8 must not kill the receiver.
                        self.OnRxData(self, Data.decode('utf-8', errors='replace'))
                finally:
                    time.sleep(0.1)
                if self.OnBusy is not None:
                    self.OnBusy(self)
        finally:
            RxSocket.close()
        return
    #
    def CBOnAbort(self, thread):
        """Thread callback: close the Rx socket and forward the abort."""
        self._CloseSocket(self.RxSocket)
        if self.OnAbort is not None:
            self.OnAbort(self)
        return
    #
    def CBOnEnd(self, thread):
        """Thread callback: forward the end notification."""
        # The worker's socket was already closed at the end of CBOnBusy;
        # self.RxSocket is not touched here because it may by now belong to a
        # newer Open().
        if self.OnEnd is not None:
            self.OnEnd(self)
        return
    #
#
Class-Library: Thread.py
#
import time
import threading as THD
import enum as ENU
#
class EStateThread(ENU.Enum):
    """Lifecycle state of a CThread."""
    stIdle = 0  # constructed, never started
    stBusy = 1  # Start() was called and the worker has not finished
    stEnd = 2   # Abort() was called or the worker body returned
#
def CBOnExecute(thread):
    """Worker body: run OnBusy, mark the CThread ended, then run OnEnd.

    thread is the owning CThread.  If OnBusy raises, the state is still set
    to stEnd, OnEnd is skipped and the exception propagates.
    """
    try:
        if thread.OnBusy is not None:
            thread.OnBusy(thread)
    finally:
        # Without this the state stays stBusy forever when OnBusy returns on
        # its own instead of being aborted.
        thread.State = EStateThread.stEnd
    if thread.OnEnd is not None:
        thread.OnEnd(thread)
    return
#
class CThread():
    """Restartable callback-driven wrapper around threading.Thread.

    All four callbacks are optional (None) and are called with this CThread
    as their only argument.  OnStart and OnAbort run on the caller's thread,
    OnBusy and OnEnd on the worker thread.
    """
    #
    def __init__(self, onstart, onbusy, onabort, onend):
        self.State = EStateThread.stIdle
        self.Thread = THD.Thread(target=CBOnExecute, args=(self,))
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
        return
    #
    def Start(self):
        """Mark the thread busy, call OnStart and launch the worker.

        Raises RuntimeError while a previous worker is still running; State
        and callbacks are left untouched in that case.
        """
        if self.Thread.is_alive():
            raise RuntimeError('CThread worker is still running')
        if self.Thread.ident is not None:
            # A threading.Thread object can be started only once, so a
            # finished worker is replaced by a fresh one.
            self.Thread = THD.Thread(target=CBOnExecute, args=(self,))
        self.State = EStateThread.stBusy
        if self.OnStart is not None:
            self.OnStart(self)
        self.Thread.start()
        return
    #
    def Abort(self):
        """Mark the thread ended and call OnAbort.

        This neither interrupts nor joins the worker: OnBusy has to poll
        State and return by itself.
        """
        self.State = EStateThread.stEnd
        if self.OnAbort is not None:
            self.OnAbort(self)
        return
    #
#
Version
module/python/udpclientserver/udpclientserver02v03.1639422015.txt.gz · Last modified: 2021/12/13 21:00 (external edit)