====== Esp32UdpClientServer 02V01 =====
[[http://www.openhardsoftware.de/ | Open Hard- & Software]]
[
[[http://www.openhardsoftware.de/dokuwiki | DokuWiki]]
[[http://www.openhardsoftware.de/websites | WebSites]]
[[http://www.openhardsoftware.de/mediawiki | MediaWiki]]
[[http://www.openhardsoftware.de/nextcloud | NextCloud]]
]
===== Übersicht =====
* Esp32 (WLAN, UART): Bidirektionales Senden und Empfangen von UDP-Datenpaketen ohne Blockieren (mit Threads)
* Programmierung mit VSCode(PyMakr) unter MicroPython mit Uart-MicroPython-Terminal
* Erweiterung von VSCode mit Python-Terminal und **Ampy** (Adafruit)
===== Benutzung =====
**Esp32UdpClientServer.py**
#
# Syntax-Arguments:
#
# WindowsPC[192.168.178.33] <-> WindowsPC[192.168.178.33]:
# WinPC: python CheckUdpClientServer.py Windows 4321 192.168.178.33 4321
# WindowsPC[192.168.178.33] <-> UbuntuPC[192.168.178.48]
# WinPC: python CheckUdpClientServer.py Windows 5001 192.168.178.48 5000
# LinPC: python CheckUdpClientServer.py Ubuntu 5000 192.168.178.33 5001
#
import sys
import time
import socket as SKT
#
import Thread as THR
import UdpClientServer as UCS
#
# Default header id; handed to CUdpClientServer and echoed in every log line.
# Overridden by the first command-line argument when enough arguments are given.
UDP_HEADERID = 'UDPHID'
#-----------------------------------------------------------
# UDP - TxD - TransmitData
#-----------------------------------------------------------
# Default destination address/port for outgoing datagrams
# (overridden by command-line arguments 3 and 4).
UDP_IPADDRESS_TX = '127.0.0.255'
UDP_IPPORT_TX = 5000
#
#-----------------------------------------------------------
# UDP - RxD - ReceiveData
#-----------------------------------------------------------
# Placeholder only: not referenced anywhere else in this script.
UDP_IPADDRESS_LOCAL = '127.0.0.1' # dummy
# Default local port to listen on (overridden by command-line argument 2).
UDP_IPPORT_RX = 5000
#
#-----------------------------------------------------------
# Callback - UdpClient
#-----------------------------------------------------------
def CBOnTxData(udpclientserver, txdata):
    """Log a datagram that is about to be transmitted.

    udpclientserver: the endpoint reporting the event; only its
        GetHeaderID() is used.
    txdata: the text being sent.
    """
    Line = 'CBOnTxData[{}]<{}>'.format(udpclientserver.GetHeaderID(), txdata)
    print(Line)
#
def CBOnRxData(udpclientserver, rxdata):
    """Log a datagram that has been received.

    udpclientserver: the endpoint reporting the event; only its
        GetHeaderID() is used.
    rxdata: the received text.
    """
    Line = 'CBOnRxData[{}]<{}>'.format(udpclientserver.GetHeaderID(), rxdata)
    print(Line)
#
def CBOnStart(udpclientserver):
    """Start notification from the endpoint; intentionally does nothing."""
    # debug print('Main-CBOnStart')
    return
#
def CBOnBusy(udpclientserver):
    """Busy notification from the endpoint; intentionally does nothing."""
    # debug print('Main-CBOnBusy')
    return
#
def CBOnAbort(udpclientserver):
    """Abort notification from the endpoint; intentionally does nothing."""
    # debug print('Main-CBOnAbort')
    return
#
def CBOnEnd(udpclientserver):
    """End notification from the endpoint; intentionally does nothing."""
    # debug print('Main-CBOnEnd')
    return
#
#-----------------------------------------------------------
# Main
#-----------------------------------------------------------
if ('__main__' == __name__):
    # Demo: open a UDP endpoint, send six numbered messages, then close.
    print('*** CheckUdpClientServer: begin')
    if (5 <= len(sys.argv)):
        # Parse the command-line arguments for the Tx/Rx parameters:
        #   <headerid> <rx-port> <tx-address> <tx-port>
        UDP_HEADERID = sys.argv[1]
        # RX
        UDP_IPPORT_RX = int(sys.argv[2])
        # TX
        UDP_IPADDRESS_TX = sys.argv[3]
        UDP_IPPORT_TX = int(sys.argv[4])
    #
    UdpCS = UCS.CUdpClientServer(UDP_HEADERID, UDP_IPPORT_RX,
                                 UDP_IPADDRESS_TX, UDP_IPPORT_TX,
                                 CBOnTxData, CBOnRxData,
                                 CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
    #
    print('{}: TxIP-Address[{}]-Port[{}]'.format(UdpCS.GetHeaderID(),
                                                 UdpCS.GetIPAddressTX(),
                                                 UdpCS.GetIPPortTX()))
    print('{}: RxIP(local)-Address[{}]-Port[{}]'.format(UdpCS.GetHeaderID(),
                                                        UdpCS.GetIPAddressRXLocal(),
                                                        UdpCS.GetIPPortRX()))
    UdpCS.Open()
    try:
        for I in range(0, 6):
            time.sleep(5.0)
            UdpCS.Transmit('{}[{}]'.format(I, UdpCS.GetHeaderID()))
        # NOTE(review): the original indentation was lost; this final pause is
        # assumed to sit after the loop (grace period before closing) - confirm.
        time.sleep(1.0)
    finally:
        # Always release the sockets and stop the receiver, even when a
        # transmit fails or the run is interrupted.
        UdpCS.Close()
    #
    print('*** CheckUdpClientServer: end')
#
#
**UdpClientServer.py**
#
import time
import enum as ENU
import socket as SKT
#
import Thread as THR
#
class CUdpClientServer():
    """Bidirectional UDP endpoint with a polling receiver thread.

    Outgoing text is sent to (ipaddresstx, ipporttx); incoming datagrams are
    polled on (local address, ipportrx) by a THR.CThread worker and handed to
    the onrxdata callback as decoded text.

    Every callback may be None. ontxdata/onrxdata are called as
    callback(self, text); onstart/onbusy/onabort/onend as callback(self).
    """
    #
    def __init__(self, headerid, ipportrx,
                 ipaddresstx, ipporttx,
                 ontxdata, onrxdata,
                 onstart, onbusy, onabort, onend):
        """Store the configuration and determine the local IP address.

        Raises OSError when the local-address probe cannot be connected.
        """
        self.HeaderID = headerid
        self.IPAddressRXLocal = ''
        self.IPPortRX = ipportrx
        self.IPAddressTX = ipaddresstx
        self.IPPortTX = ipporttx
        self.OnTxData = ontxdata
        self.OnRxData = onrxdata
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
        self.Thread = THR.CThread(self.CBOnStart, self.CBOnBusy,
                                  self.CBOnAbort, self.CBOnEnd)
        self.RxSocket = None
        self.TxSocket = None
        # Find the local IP address: connect a throw-away datagram socket and
        # read back the address the stack picked for it.
        S = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        try:
            S.connect(('8.8.8.8', 1))
            self.IPAddressRXLocal = S.getsockname()[0]
        finally:
            # Release the probe socket even when connect() fails.
            S.close()
    #
    def GetHeaderID(self):
        """Return the header id given at construction."""
        return self.HeaderID
    def GetIPAddressTX(self):
        """Return the destination IP address."""
        return self.IPAddressTX
    def GetIPPortTX(self):
        """Return the destination port."""
        return self.IPPortTX
    def GetIPAddressRXLocal(self):
        """Return the local IP address found during construction."""
        return self.IPAddressRXLocal
    def GetIPPortRX(self):
        """Return the local receive port."""
        return self.IPPortRX
    #
    def _CloseSocket(self, sock):
        """Close sock unless it was never created (None)."""
        if sock is not None:
            sock.close()
    #
    def IsBusy(self):
        """Return True while the receiver thread is in the busy state."""
        # Thread.py exposes its states as module-level constants
        # (stIdle/stBusy/stEnd); it has no EStateThread namespace.
        return THR.stBusy == self.Thread.State
    #
    def Open(self):
        """Create both sockets and start the receiver thread."""
        # Rx
        if self.IsBusy():
            self.Thread.Abort()
        self.RxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        # SO_REUSEADDR must be set BEFORE bind.
        self.RxSocket.setsockopt(SKT.SOL_SOCKET, SKT.SO_REUSEADDR, 1)
        self.RxSocket.bind((self.IPAddressRXLocal, self.IPPortRX))
        # Non-blocking: the receiver polls instead of waiting in recvfrom.
        self.RxSocket.setblocking(0)
        self.Thread.Start()
        # Tx
        # Drop a Tx socket left over from an earlier Open() so it is not leaked.
        self._CloseSocket(self.TxSocket)
        self.TxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        self.TxSocket.connect((self.IPAddressTX, self.IPPortTX))
    #
    def Close(self):
        """Stop the receiver and close both sockets; safe before Open()."""
        self.Thread.Abort()
        self._CloseSocket(self.RxSocket)
        self._CloseSocket(self.TxSocket)
    #
    def Abort(self):
        """Stop the receiver thread (the Tx socket is left open)."""
        self.Thread.Abort()
    #
    def Transmit(self, text):
        """Report text via OnTxData, then send it UTF-8 encoded."""
        if self.OnTxData is not None:
            self.OnTxData(self, text)
        self.TxSocket.sendto(text.encode('utf-8'),
                             (self.IPAddressTX, self.IPPortTX))
    #
    def CBOnStart(self, thread):
        """Thread callback: forward the start event to OnStart."""
        if self.OnStart is not None:
            self.OnStart(self)
    #
    def CBOnBusy(self, thread):
        """Thread callback: poll the Rx socket until the thread leaves busy.

        Runs on the worker thread started by THR.CThread.
        """
        while self.IsBusy():
            RxData = ''
            try:
                Data, _ = self.RxSocket.recvfrom(1024)
                RxData = Data.decode('utf-8')
            except OSError:
                # Nothing pending on the non-blocking socket, or the socket
                # was closed by an abort. OSError is used instead of
                # socket.error because MicroPython does not provide the latter.
                pass
            except UnicodeError:
                # Undecodable datagram: drop it instead of killing the thread.
                pass
            else:
                if self.OnRxData is not None:
                    self.OnRxData(self, RxData)
            finally:
                # Poll interval; runs after every iteration, hit or miss.
                time.sleep(0.1)
            if self.OnBusy is not None:
                self.OnBusy(self)
        self._CloseSocket(self.RxSocket)
    #
    def CBOnAbort(self, thread):
        """Thread callback: close the Rx socket and forward to OnAbort."""
        self._CloseSocket(self.RxSocket)
        if self.OnAbort is not None:
            self.OnAbort(self)
    #
    def CBOnEnd(self, thread):
        """Thread callback: close the Rx socket and forward to OnEnd."""
        self._CloseSocket(self.RxSocket)
        if self.OnEnd is not None:
            self.OnEnd(self)
#
#
**Thread.py**
import time
import _thread as THR
# States
stIdle = 0
stBusy = 1
stEnd = 2
#
class CThread():
    """Callback-driven wrapper around _thread.start_new_thread.

    State is stIdle after construction, stBusy after Start() and stEnd after
    Abort(). The worker itself never changes State; the onbusy callback is
    expected to watch State and return once it is no longer stBusy.

    Every callback may be None and is called as callback(thread).
    """
    #
    def __init__(self, onstart, onbusy, onabort, onend):
        """Store the callbacks; no thread is started yet."""
        self.State = stIdle
        self.Thread = None
        # Return value of the most recent start_new_thread call.
        self.ThreadID = None
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
    #
    def Start(self):
        """Enter the busy state, call OnStart, then launch the worker."""
        self.State = stBusy
        if self.OnStart is not None:
            self.OnStart(self)
        # The argument pack must be a tuple: CPython rejects a list with
        # TypeError.
        self.ThreadID = THR.start_new_thread(self.CBOnExecute, (self,))
    #
    def Abort(self):
        """Enter the end state and call OnAbort on the calling thread."""
        self.State = stEnd
        if self.OnAbort is not None:
            self.OnAbort(self)
    #
    def CBOnExecute(self, thread):
        """Worker body: run OnBusy to completion, then OnEnd."""
        if self.OnBusy is not None:
            self.OnBusy(self)
        if self.OnEnd is not None:
            self.OnEnd(self)
#
#
#-----------------------------------------------------------
# Check Thread
#-----------------------------------------------------------
def CBOnStart(thread):
    """Self-test callback: announce that the thread was started."""
    print('CBOnStart')
def CBOnBusy(thread):
    """Self-test worker: count once per second and abort at three.

    Prints each count while thread.State is stBusy. On reaching 3 it calls
    thread.Abort(), which ends the loop on the next check.
    """
    print('CBOnBusy')
    Counter = 0
    while (stBusy == thread.State):
        Counter += 1
        print(Counter)
        if (3 <= Counter):
            thread.Abort()
        else:
            time.sleep(1.0)
def CBOnAbort(thread):
    """Self-test callback: announce that the thread was aborted."""
    print('CBOnAbort')
def CBOnEnd(thread):
    """Self-test callback: announce that the worker has finished."""
    print('CBOnEnd')
#
if ('__main__' == __name__):
    # Self-test: run the counting worker, wait, then abort.
    print('*** CheckThread: begin')
    #
    Thread = CThread(CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
    Thread.Start()
    time.sleep(5.0)
    # CBOnBusy has already aborted itself at count 3, so this second Abort()
    # invokes CBOnAbort once more.
    Thread.Abort()
    #
    print('*** CheckThread: end')
#
#
MicroPython-Terminal-Commands (Comments in Helper.h):
> import os
> print(os.listdir())
**Ampy**-Commands:
> ampy -p COM22 ls
> ampy -p COM22 rm /project.pymakr
> ampy -p COM22 reset
!!! NOCH ERGÄNZEN !!!
Ausgabe in Terminal:
===== Version =====
{{:module:micropython:2112121950_Esp32UdpClientServer_02V01.zip|2112121950_Esp32UdpClientServer_02V01.zip}}
-----
[[http://www.openhardsoftware.de/ | Open Hard- & Software]]
[
[[http://www.openhardsoftware.de/dokuwiki | DokuWiki]]
[[http://www.openhardsoftware.de/websites | WebSites]]
[[http://www.openhardsoftware.de/mediawiki | MediaWiki]]
[[http://www.openhardsoftware.de/nextcloud | NextCloud]]
]