User Tools

Site Tools


module:micropython:multiprocessexecution:multiprocessexecution01v00

MultiProcessExecution 01V00

Übersicht

Benutzung

VSCode-MicroPython-Terminal:

>>> 
*** Check Esp32MultiProcessExecution: begin
CBOnStart
CBOnBusy
1
*** Check BlinkSlow: begin
*** Check BlinkSlow: end  
2
*** Check BlinkFast: begin
*** Check BlinkFast: end  
3
*** Check CountUp: begin
*** Check CountUp: end
4
*** Check CountDown: begin
*** Check CountDown: end  
5
CBOnAbort
CBOnEnd
*** Check Esp32MultiProcessExecution: end
♦♦>
MicroPython v1.17 on 2021-09-02; ESP32 module with ESP32
Type "help()" for more information.
>>>

Hauptmodul Esp32MultiProcessExecution.py

#
import time
#
import Thread as THR
#
#---------------------------------------------------------------------    
#   Main - Callback Thread
#---------------------------------------------------------------------    
def CBOnStart(thread):
    """Start callback: print a marker line; `thread` is not used."""
    print('CBOnStart')
#    
def CBOnBusy(thread):
    """Busy callback: count once per second while the thread stays busy.

    Every pass increments and prints the tick number. Ticks 1 to 4 each
    run one child script through execfile(); later ticks only print and
    wait. The loop ends as soon as thread.State differs from THR.stBusy.
    """
    print('CBOnBusy')
    # Tick number -> child script executed on that tick.
    Scripts = {
        1: 'BlinkSlow.py',
        2: 'BlinkFast.py',
        3: 'CountUp.py',
        4: 'CountDown.py',
    }
    Tick = 0
    while thread.State == THR.stBusy:
        Tick += 1
        print(Tick)
        Script = Scripts.get(Tick)
        if Script is not None:
            execfile(Script)
        time.sleep(1.0)
#    
def CBOnAbort(thread):
    """Abort callback: print a marker line; `thread` is not used."""
    print('CBOnAbort')
#    
def CBOnEnd(thread):
    """End callback: print a marker line; `thread` is not used."""
    print('CBOnEnd')
#
#---------------------------------------------------------------------    
#   Main
#---------------------------------------------------------------------    
if __name__ == '__main__':
    # Demo run: start the worker, let it run for five seconds, then abort it.
    print('*** Check Esp32MultiProcessExecution: begin')      
    ThreadProcessA = THR.CThread(CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
    ThreadProcessA.Start()
    time.sleep(5.0)
    ThreadProcessA.Abort()
    # Pause one more second before printing the closing banner.
    time.sleep(1.0)
    print('*** Check Esp32MultiProcessExecution: end')      
#

ChildProcess BlinkFast.py

#   
#-----------------------------------------------------------
#   Main
#-----------------------------------------------------------
if __name__ == '__main__':
    # Placeholder script: only announces its begin and end.
    print('*** Check BlinkFast: begin')
    print('*** Check BlinkFast: end')
#

ChildProcess BlinkSlow.py

#   
#-----------------------------------------------------------
#   Main
#-----------------------------------------------------------
if __name__ == '__main__':
    # Placeholder script: only announces its begin and end.
    print('*** Check BlinkSlow: begin')
    print('*** Check BlinkSlow: end')
#

ChildProcess CountUp.py

#   
#-----------------------------------------------------------
#   Main
#-----------------------------------------------------------
if __name__ == '__main__':
    # Placeholder script: only announces its begin and end.
    print('*** Check CountUp: begin')
    print('*** Check CountUp: end')
#

ChildProcess CountDown.py

#   
#-----------------------------------------------------------
#   Main
#-----------------------------------------------------------
if __name__ == '__main__':
    # Placeholder script: only announces its begin and end.
    print('*** Check CountDown: begin')
    print('*** Check CountDown: end')
#

Library Thread.py

import time
import _thread
#
# States
stIdle = 0  # initial state, set by CThread.__init__
stBusy = 1  # set by CThread.Start()
stEnd = 2   # set by CThread.Abort()
#
class CThread():
    """Callback-driven wrapper around _thread.start_new_thread.

    Four optional callbacks (any of them may be None) are each called
    with this CThread instance as their only argument:

    * onstart - called by Start() in the caller's thread, before the
      worker thread is created.
    * onbusy  - called first inside the worker thread; it is expected
      to poll `State` and return once it is no longer stBusy.
    * onabort - called by Abort() in the caller's thread.
    * onend   - called inside the worker thread after onbusy returns.

    Note that `State` only changes in Start() and Abort(); it is not
    updated when onbusy returns on its own.
    """
    #
    def __init__(self, onstart, onbusy, onabort, onend):
        """Store the callbacks and put the object into the stIdle state."""
        self.State = stIdle
        # Not assigned anywhere else in this class; kept so existing code
        # that reads the attribute keeps working.
        self.Thread = None
        # Initialised here so the attribute exists before Start() is called.
        self.ThreadID = None
        self.OnStart = onstart
        self.OnBusy = onbusy
        self.OnAbort = onabort
        self.OnEnd = onend
    #
    def Start(self):
        """Switch to stBusy, fire OnStart, then launch the worker thread.

        ThreadID receives whatever _thread.start_new_thread returns.
        """
        self.State = stBusy
        if self.OnStart is not None:
            self.OnStart(self)
        # The argument pack must be a tuple: CPython rejects a list here,
        # while a tuple is accepted by both CPython and MicroPython.
        self.ThreadID = _thread.start_new_thread(self.CBOnExecute, (self,))
    #
    def Abort(self):
        """Switch to stEnd and fire OnAbort in the caller's thread.

        The worker is not stopped forcibly; it ends when OnBusy notices
        the state change and returns.
        """
        self.State = stEnd
        if self.OnAbort is not None:
            self.OnAbort(self)
    #
    def CBOnExecute(self, thread):
        """Worker thread body: run OnBusy, then OnEnd.

        `thread` is the instance passed through start_new_thread; the
        callbacks are invoked with `self`.
        """
        if self.OnBusy is not None:
            self.OnBusy(self)
        if self.OnEnd is not None:
            self.OnEnd(self)
    #
 
#
#   
#-----------------------------------------------------------
#   Check Thread
#-----------------------------------------------------------
def CBOnStart(thread):
    """Start callback of the self-check: print a marker line."""
    print('CBOnStart')
def CBOnBusy(thread):
    """Busy callback of the self-check: count up and abort on the third tick.

    Each pass increments and prints the tick number. Ticks 1 and 2 are
    followed by a one-second wait; from tick 3 on the callback calls
    thread.Abort() itself. The loop ends once thread.State is no longer
    stBusy.
    """
    print('CBOnBusy')
    Tick = 0
    while thread.State == stBusy:
        Tick += 1
        print(Tick)
        if Tick < 3:
            time.sleep(1.0)
            continue
        thread.Abort()
def CBOnAbort(thread):
    """Abort callback of the self-check: print a marker line."""
    print('CBOnAbort')
def CBOnEnd(thread):
    """End callback of the self-check: print a marker line."""
    print('CBOnEnd')
#
if __name__ == '__main__':
    # Self-check: run a CThread with the print callbacks defined in this
    # file, wait five seconds, then call Abort() from the main thread.
    print('*** Check Thread: begin')        
    Thread = CThread(CBOnStart, CBOnBusy, CBOnAbort, CBOnEnd)
    Thread.Start()
    time.sleep(5.0)
    Thread.Abort()
    print('*** Check Thread: end')
#

Version

module/micropython/multiprocessexecution/multiprocessexecution01v00.txt · Last modified: 2022/09/13 11:58 by 127.0.0.1