module:micropython:esp32udpclientserver:esp32udpclientserver02v01
Table of Contents
Esp32UdpClientServer 02V01
Übersicht
- Esp32(Wlan, Uart): Bidirektionales Senden und Empfangen von UDP-Datenpaketen ohne Blocking mit Threads
- Programmierung mit VSCode(PyMakr) unter MicroPython mit Uart-MicroPython-Terminal
- Erweiterung von VSCode mit Python-Terminal und Ampy (Adafruit)
Benutzung
Esp32UdpClientServer.py
#
# Demo script: opens a CUdpClientServer, transmits six numbered text
# datagrams and prints everything that is sent and received.
#
# Syntax-Arguments: <headerid> <rxipport> <txipaddress> <txipport>
#
# WindowsPC[192.168.178.33] <-> WindowsPC[192.168.178.33]:
#   WinPC: python CheckUdpClientServer.py Windows 4321 192.168.178.33 4321
# WindowsPC[192.168.178.33] <-> UbuntuPC[192.168.178.48]
#   WinPC: python CheckUdpClientServer.py Windows 5001 192.168.178.48 5000
#   LinPC: python CheckUdpClientServer.py Ubuntu 5000 192.168.178.33 5001
#
import sys
import time
import socket as SKT
#
import Thread as THR
import UdpClientServer as UCS
#
UDP_HEADERID = 'UDPHID'
#-----------------------------------------------------------
# UDP - TxD - TransmitData
#-----------------------------------------------------------
UDP_IPADDRESS_TX = '127.0.0.255'
UDP_IPPORT_TX = 5000
#
#-----------------------------------------------------------
# UDP - RxD - ReceiveData
#-----------------------------------------------------------
UDP_IPADDRESS_LOCAL = '127.0.0.1' # dummy
UDP_IPPORT_RX = 5000
#
#-----------------------------------------------------------
# Callback - UdpClient
#-----------------------------------------------------------
def CBOnTxData(udpclientserver, txdata):
    """Print every text that is handed to Transmit()."""
    Line = 'CBOnTxData[{}]<{}>'.format(udpclientserver.GetHeaderID(), txdata)
    print(Line)
    return
#
def CBOnRxData(udpclientserver, rxdata):
    """Print every text that was received."""
    Line = 'CBOnRxData[{}]<{}>'.format(udpclientserver.GetHeaderID(), rxdata)
    print(Line)
    return
#
def CBOnStart(udpclientserver):
    """Thread-start notification (intentionally silent)."""
    # debug print('Main-CBOnStart')
    return
#
def CBOnBusy(udpclientserver):
    """Per-poll notification of the receive loop (intentionally silent)."""
    # debug print('Main-CBOnBusy')
    return
#
def CBOnAbort(udpclientserver):
    """Thread-abort notification (intentionally silent)."""
    # debug print('Main-CBOnAbort')
    return
#
def CBOnEnd(udpclientserver):
    """Thread-end notification (intentionally silent)."""
    # debug print('Main-CBOnEnd')
    return
#
#-----------------------------------------------------------
# Main
#-----------------------------------------------------------
if ('__main__' == __name__):
    #
    print('*** CheckUdpClientServer: begin')
    if (5 <= len(sys.argv)):
        # Parse the command-line arguments for the Tx/Rx parameters
        UDP_HEADERID = sys.argv[1]
        # RX
        UDP_IPPORT_RX = int(sys.argv[2])
        # TX
        UDP_IPADDRESS_TX = sys.argv[3]
        UDP_IPPORT_TX = int(sys.argv[4])
    #
    UdpCS = UCS.CUdpClientServer(UDP_HEADERID, UDP_IPPORT_RX,
                                 UDP_IPADDRESS_TX, UDP_IPPORT_TX,
                                 CBOnTxData, CBOnRxData,
                                 CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
    #
    print('{}: TxIP-Address[{}]-Port[{}]'.format(UdpCS.GetHeaderID(), UdpCS.GetIPAddressTX(), UdpCS.GetIPPortTX()))
    print('{}: RxIP(local)-Address[{}]-Port[{}]'.format(UdpCS.GetHeaderID(), UdpCS.GetIPAddressRXLocal(), UdpCS.GetIPPortRX()))
    UdpCS.Open()
    try:
        for I in range(0, 6):
            time.sleep(5.0)
            UdpCS.Transmit('{}[{}]'.format(I, UdpCS.GetHeaderID()))
        time.sleep(1.0)
    finally:
        # Always stop the receive thread and release both sockets, even
        # when the loop is interrupted (exception, Ctrl-C).
        UdpCS.Close()
    #
    print('*** CheckUdpClientServer: end')
    #
#
UdpClientServer.py
#
import time
try:
    import enum as ENU
except ImportError:
    # 'enum' is not used by this module; tolerate interpreters that ship
    # no 'enum' module instead of failing the whole import.
    ENU = None
import socket as SKT
#
import Thread as THR
#
# Busy-state value of the Thread module. Works with a Thread module that
# exposes the states as 'EStateThread.stBusy' as well as with one that only
# defines the module-level constant 'stBusy'.
_ST_BUSY = getattr(THR, 'EStateThread', THR).stBusy
#
def _CloseSocket(sock):
    """Close *sock* unless it was never created (None)."""
    if sock is not None:
        sock.close()
    return
#
class CUdpClientServer():
    """Bidirectional UDP endpoint with a polling receive thread.

    Receiving uses a non-blocking datagram socket bound to the local
    address and 'ipportrx'; it is polled from a THR.CThread every 0.1 s.
    Sending uses a second datagram socket targeting
    ('ipaddresstx', 'ipporttx'). Payloads are UTF-8 encoded text.

    Callbacks (each may be None):
        ontxdata(self, text)  - called by Transmit() before sending
        onrxdata(self, text)  - called for every received datagram
        onstart(self), onbusy(self), onabort(self), onend(self)
                              - forwarded thread notifications
    """
    #
    def __init__(self, headerid, ipportrx,
                 ipaddresstx, ipporttx,
                 ontxdata, onrxdata,
                 onstart, onbusy, onabort, onend):
        self.HeaderID = headerid
        self.IPAddressRXLocal = ''
        self.IPPortRX = ipportrx
        self.IPAddressTX = ipaddresstx
        self.IPPortTX = ipporttx
        self.OnTxData = ontxdata
        self.OnRxData = onrxdata
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
        self.Thread = THR.CThread(self.CBOnStart, self.CBOnBusy,
                                  self.CBOnAbort, self.CBOnEnd)
        self.RxSocket = None
        self.TxSocket = None
        # Find the local IP address: connect a throw-away datagram socket
        # and read back the address it was given.
        S = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        try:
            S.connect(('8.8.8.8', 1))
            self.IPAddressRXLocal = S.getsockname()[0]
        finally:
            # Release the probe socket even if connect() fails.
            S.close()
        #
        return
    #
    def GetHeaderID(self):
        """Return the header id given to the constructor."""
        return self.HeaderID
    def GetIPAddressTX(self):
        """Return the destination IP address for Transmit()."""
        return self.IPAddressTX
    def GetIPPortTX(self):
        """Return the destination port for Transmit()."""
        return self.IPPortTX
    def GetIPAddressRXLocal(self):
        """Return the local IP address determined in the constructor."""
        return self.IPAddressRXLocal
    def GetIPPortRX(self):
        """Return the local receive port."""
        return self.IPPortRX
    #
    def IsBusy(self):
        """Return True while the receive thread is in its busy state."""
        return _ST_BUSY == self.Thread.State
    #
    def Open(self):
        """Create both sockets and start the receive thread."""
        # Rx
        if self.IsBusy():
            self.Thread.Abort()
        self.RxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        # The socket option has to be set BEFORE bind()!
        self.RxSocket.setsockopt(SKT.SOL_SOCKET, SKT.SO_REUSEADDR, 1)
        self.RxSocket.bind((self.IPAddressRXLocal, self.IPPortRX))
        self.RxSocket.setblocking(0)
        self.Thread.Start()
        # Tx
        self.TxSocket = SKT.socket(SKT.AF_INET, SKT.SOCK_DGRAM)
        self.TxSocket.connect((self.IPAddressTX, self.IPPortTX))
        return
    #
    def Close(self):
        """Abort the receive thread and close both sockets.

        Safe to call when Open() was never called.
        """
        self.Thread.Abort()
        _CloseSocket(self.RxSocket)
        _CloseSocket(self.TxSocket)
    #
    def Abort(self):
        """Abort the receive thread."""
        self.Thread.Abort()
        return
    #
    def Transmit(self, text):
        """Report *text* via OnTxData, then send it UTF-8 encoded."""
        if self.OnTxData is not None:
            self.OnTxData(self, text)
        self.TxSocket.sendto(text.encode('utf-8'),
                             (self.IPAddressTX, self.IPPortTX))
    #
    def CBOnStart(self, thread):
        """Thread callback: forward the start notification."""
        if self.OnStart is not None:
            self.OnStart(self)
        return
    #
    def CBOnBusy(self, thread):
        """Thread callback: poll the receive socket until no longer busy."""
        while self.IsBusy():
            try:
                Data, Address = self.RxSocket.recvfrom(1024)
                RxData = Data.decode('utf-8')
            except OSError:
                # Nothing pending on the non-blocking socket (or the socket
                # was closed by an abort). OSError is used instead of
                # socket.error, which does not exist on MicroPython and is
                # an alias of OSError on CPython.
                pass
            else:
                # print('RxData[{}]'.format(Data.decode('utf-8')))
                if self.OnRxData is not None:
                    self.OnRxData(self, RxData)
            finally:
                time.sleep(0.1)
            if self.OnBusy is not None:
                self.OnBusy(self)
        _CloseSocket(self.RxSocket)
        return
    #
    def CBOnAbort(self, thread):
        """Thread callback: close the receive socket, forward the abort."""
        _CloseSocket(self.RxSocket)
        if self.OnAbort is not None:
            self.OnAbort(self)
        return
    #
    def CBOnEnd(self, thread):
        """Thread callback: close the receive socket, forward the end."""
        _CloseSocket(self.RxSocket)
        if self.OnEnd is not None:
            self.OnEnd(self)
        return
    #
#
Thread.py
import time
import _thread as THR
#
# States
class EStateThread():
    """Namespace for the thread states (same values as the constants below)."""
    stIdle = 0
    stBusy = 1
    stEnd = 2
#
# Module-level state constants, kept for existing callers.
stIdle = EStateThread.stIdle
stBusy = EStateThread.stBusy
stEnd = EStateThread.stEnd
#
class CThread():
    """Minimal worker thread on top of '_thread' with four callbacks.

    Callbacks (each may be None) are called with this CThread instance:
        onstart - in the caller's thread, before the worker is started
        onbusy  - in the worker thread; it is the worker's body
        onend   - in the worker thread, after onbusy has returned
        onabort - in the thread that calls Abort()

    Abort() only sets State to stEnd; the onbusy callback has to watch
    State and return by itself.
    """
    #
    def __init__(self, onstart, onbusy, onabort, onend):
        self.State = stIdle
        self.Thread = None
        self.ThreadID = None
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
        return
    #
    def Start(self):
        """Set State to stBusy, call OnStart and start the worker thread."""
        self.State = stBusy
        if self.OnStart is not None:
            self.OnStart(self)
        # The argument list must be a tuple: CPython's _thread rejects a
        # list with a TypeError.
        self.ThreadID = THR.start_new_thread(self.CBOnExecute, (self,))
        return
    #
    def Abort(self):
        """Set State to stEnd and call OnAbort."""
        self.State = stEnd
        if self.OnAbort is not None:
            self.OnAbort(self)
        return
    #
    def CBOnExecute(self, thread):
        """Worker body: run OnBusy, then OnEnd."""
        if self.OnBusy is not None:
            self.OnBusy(self)
        if self.OnEnd is not None:
            self.OnEnd(self)
        return
    #
#
#-----------------------------------------------------------
# Check Thread
#-----------------------------------------------------------
def CBOnStart(thread):
    """Demo callback: report the start."""
    print('CBOnStart')
    return

def CBOnBusy(thread):
    """Demo callback: count once per second, abort after three counts."""
    print('CBOnBusy')
    Counter = 0
    while (stBusy == thread.State):
        Counter += 1
        print(Counter)
        if (3 <= Counter):
            thread.Abort()
        else:
            time.sleep(1.0)
    return

def CBOnAbort(thread):
    """Demo callback: report the abort."""
    print('CBOnAbort')
    return

def CBOnEnd(thread):
    """Demo callback: report the end."""
    print('CBOnEnd')
    return
#
if ('__main__' == __name__):
    #
    print('*** CheckThread: begin')
    #
    Thread = CThread(CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
    Thread.Start()
    time.sleep(5.0)
    Thread.Abort()
    #
    print('*** CheckThread: end')
    #
#
MicroPython-Terminal-Commands (Comments in Helper.h):
> import os
> print(os.listdir())
Ampy-Commands:
> ampy -p COM22 ls
> ampy -p COM22 rm /project.pymakr
> ampy -p COM22 reset
!!! NOCH ERGÄNZEN !!! Ausgabe in Terminal:
Version
module/micropython/esp32udpclientserver/esp32udpclientserver02v01.txt · Last modified: 2022/09/13 11:58 by 127.0.0.1