Skip to content

Automator

CC Anti-Canape: Automation-Building Interface Application

Author: Chris Hinkson
Email: Christopher.Hinkson@clubcar.com

Desc: This application is designed to provide automation for SDO transmission alongisde the rest of the Anti-Canape Package.

Github: https://github.com/chrisInTheCode/CC-Anti-Canape
--> To get access to the github repository, please send me an email at the above address

References:
--> https://python-can.readthedocs.io/en/stable/
--> https://cantools.readthedocs.io/en/latest/

AntiCanapeError

Bases: Exception

For general errors with anything.

Source code in drivers\automator.py
59
60
class AntiCanapeError(Exception):
    '''For general errors with anything.'''

BusOverloadError

Bases: Exception

Exception for when the bus is overloaded!

Source code in drivers\automator.py
56
57
class BusOverloadError(Exception):
    '''Exception for when the bus is overloaded!'''

SdoScriptManager

The script manager class is defined to provide an object-orientated script service for the automation of SDO message transmission.

Examples:

myScriptManager = SdoScriptManager('pcan') myBusInstance = getDefaultBus(self.bus) myScriptManager.script_writeVehicleConfigActiveTest()

Attributes:

Name Type Description
driver

A string representing the driver/dongle to use for CAN bus creation and communication.

Source code in drivers\automator.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
class SdoScriptManager():
    ''' The script manager class is defined to provide an object-orientated script service for the automation of SDO message transmission.

    Examples:
        myScriptManager = SdoScriptManager('pcan')
        myBusInstance = getDefaultBus(self.bus)
        myScriptManager.script_writeVehicleConfigActiveTest()

    Attributes:
        driver: A string representing the driver/dongle to use for CAN bus creation and communication.
    '''
    ########################
    # CLASS INITIALIZATION #
    ########################
    def __init__(self, driver:str) -> None:
        ''' The script manager class is defined to provide an object-orientated script service for the automation of SDO message transmission.

        Args:
            driver (str): The driver (dongle) to use.
        '''     
        print("CC Anti-Canape: A New Scripting Object Has Been Created For The", driver, "Dongle!")

        self.driver = driver
        self.threadTimeout = None



    ##########################
    # CAN BUS INITIALIZATION #
    ##########################
    def getDefaultBus(self, driver:str) -> ThreadSafeBus:
        ''' Gets the default (configured) bus for a driver, based on in-house (Club Car) dongles.
        Reference: [https://python-can.readthedocs.io/en/stable/configuration.html#interface-names]

        Args:
            driver (str): The driver (dongle) to use. Acceptable values are 'kvaser', 'pcan', or 'vector'.

        Returns:
            ThreadSafeBus: The correct bus for the given driver that is safe for several thread dependencies.
        '''        
        try:
            # Default kvaser bus, channel, and byterate
            # [https://python-can.readthedocs.io/en/stable/interfaces/kvaser.html#kvaser-s-canlib]
            if driver.casefold() == 'kvaser'.casefold():
                # Create thread-safe CAN Bus using driver backend
                # [https://python-can.readthedocs.io/en/stable/configuration.html]
                # [https://python-can.readthedocs.io/en/stable/bus.html#can.ThreadSafeBus]
                kvaserBus = ThreadSafeBus(interface='kvaser', channel=0, bitrate=500000)
                kvaserBus.single_handle = True
                return kvaserBus
        # Indicates an error occured while initializing bus
        # [https://python-can.readthedocs.io/en/stable/errors.html#can.exceptions.CanInitializationError]
        except CanInitializationError as error:
            print("CC Anti-Canape: An error has occured when creating the kvaser Bus!") 
            print(type(error).__name__, ": ", error)

        try:
            # Default pcan bus, channel, and byterate
            # [https://python-can.readthedocs.io/en/stable/interfaces/pcan.html]
            # [https://python-can.readthedocs.io/en/stable/bus.html#can.ThreadSafeBus]
            if driver.casefold() == 'pcan'.casefold():
                # Create thread-safe CAN Bus using driver backend
                # [https://python-can.readthedocs.io/en/stable/configuration.html]
                # [https://python-can.readthedocs.io/en/stable/interfaces/pcan.html]
                pcanBus = ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
                return pcanBus
        # Indicates an error occured while initializing bus
        # [https://python-can.readthedocs.io/en/stable/errors.html#can.exceptions.CanInitializationError]
        except CanInitializationError as error:
            print("CC Anti-Canape: An error has occured when creating the PCAN Bus!") 
            print(type(error).__name__, ": ", error)

        try:
            # Default vector bus, channel, and app
            # [https://python-can.readthedocs.io/en/stable/interfaces/vector.html]
            if driver.casefold() == 'vector'.casefold():
                # Create thread-safe CAN Bus using driver backend
                # [https://python-can.readthedocs.io/en/stable/configuration.html]
                # [https://python-can.readthedocs.io/en/stable/bus.html#can.ThreadSafeBus]
                vectorBus = ThreadSafeBus(interface='vector', channel="0, 1", app_name="Anti-Canape")
                return vectorBus
        # Indicates an error occured while initializing bus
        # [https://python-can.readthedocs.io/en/stable/errors.html#can.exceptions.CanInitializationError]
        except CanInitializationError as error:
            print("CC Anti-Canape: An error has occured when creating the vector Bus!") 
            print(type(error).__name__, ": ", error)


    ###############################################
    # SCRIPT 1 - Vehicle Config Active Write Test #
    ###############################################

    def script_writeVehicleConfigActiveTest(self, timeDelayIn:float=0.03, iterations:int=1, timeout:int=None) -> list:
        ''' This script is designed to implement the functionality of Anti-Canape objects in order to test writing the vehicle config active SDO messages. It will continually write the VehicleConfig_Active SDO messages from the DOT to the VCM for the specified amount of iterations. 

        This script tests the SDO messages by reading the initial value of a SDO parameter present on the VCM, writing a randomized value in the range for each SDO parameter to the VCM, then reading the final value for the SDO parameter present on the VCM. If the value was correctly updated, then the test for that parameter passes.

        It will track all correct and error iterations and present them all to the user at the end of script execution.

        Args:
            timeDelayIn (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
            iterations (int, optional): The amount of times the test should be run. Defaults to 1.
            timeout (int): The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

        Returns:
            [passes, fails] (list): A list containing the amount of passed tests and amount of failed tests
        '''        
        try:
            # Vehicle Config Result / Error Tracking
            vehicleConfigCorrectList = []
            vehicleConfigErrorCounter = 0
            vehicleConfigErrorList = []
            vehicleConfigErrorParamsList = []

            try:
                # Get the bus for the specified driver
                with self.getDefaultBus(self.driver) as bus:

                    # Get the CAN message reader for the specified bus
                    reader = ACReader(busIn=bus)
                    # Get the CAN message writer for the specified bus
                    writer = ACWriter(busIn=bus)
                    # Get the DBC parser for lookup (CAN)
                    dbcParser = dbcParse()
                    # Get the DOT parser for lookup (SDO)
                    dotParser = dotParse()
                    #Load the DBC dictionaries into the parser
                    dbcParser = dbcParse()
                    # Load the DOT dictionaries into the parser
                    dotParser.loadDictionaries()
                    # Load the DOT parser into the reader
                    reader.loadDotParser(dotParser=dotParser)
                    # Load the DOT parser into the writer
                    writer.loadDotParser(dotParser=dotParser)

                    # Get an automator
                    currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

                    # Repeat the test 100 times
                    for iterCount in range(iterations):
                        # Get the operation error every 50 when bus is reinitialized
                        try:
                            # Iteration Manager
                            print("CC Anti-Canape: Beginning Iteration", iterCount)

                            # vehicleConfig SDO Automation Test
                            # If it takes longer than 60 seconds, quit
                            vehicleConfigSdoAutomator = currentAutomator.writeSdoDict(sdoDictionary={
                                "VehicleConfig_Active_PedalUp": random.choice([0, 1, 2]),
                                "VehicleConfig_Active_DCDCConverter": random.choice([0, 1]),
                                "VehicleConfig_Active_Horn": random.choice([0, 1]),
                                "VehicleConfig_Active_Headlights": random.choice([0, 1]),
                                "VehicleConfig_Active_TurnSignals": random.choice([0, 1]),
                                "VehicleConfig_Active_HazardSwitch": random.choice([0, 1]),
                                "VehicleConfig_Active_BrakeLights": random.choice([0, 1]),
                                "VehicleConfig_Active_WigWag": random.choice([0, 1]),
                                "VehicleConfig_Active_LogoLight": random.choice([0, 1]),
                                "VehicleConfig_Active_DRLEnable": random.choice([0, 1]),
                                "VehicleConfig_Active_VisageControl": random.choice([0, 1]),
                                "VehiceConfig_Active_BodyBuilderRelay": random.choice([0, 1]),
                                "VehicleConfig_Active_PerformanceProfile": random.choice([0, 1, 2]),
                                "VehicleConfig_Active_NumBeepsStartOfCharge": random.choice([0, 1, 2, 3]),
                                "VehicleConfig_Active_OffPeakCharging":random.choice([0, 1, 2]),
                                "VehicleConfig_Active_SOC_Mode": random.choice([0, 1, 10]),
                                "VehicleConfig_Active_DRL_Mode": random.choice([0, 1]),

                                # Timeout's can only be every 20, so scale the random choice
                                "VehicleConfig_Active_Acc_Timeout": (random.choice(list(range(0, 360000))) * 20),
                                "VehicleConfig_Active_Run_Timeout": (random.choice(list(range(0, 135000))) * 20),
                                "VehicleConfig_Active_KeyOffLight_Timeout": (random.choice(list(range(0, 30000))) * 20),

                                # Fails, some of the time
                                "VehicleConfig_Active_BSI": random.choice([0, 1]),
                                "VehicleConfig_Active_isAPPS": random.choice([0, 1]),
                                "VehicleConfig_Active_MaxSpeedRev": random.choice(list(range(0, 76))),

                                # Timeout's can only be every 20, so scale the random choice
                                "VehicleConfig_Active_KeyOffHorn_Timeout": (random.choice(list(range(0, 2160000))) * 20),

                                "VehicleConfig_Active_MotorType": random.choice([0, 1, 2, 3, 4]),
                                "VehicleConfig_Active_WindshieldWiper": random.choice([0, 1]),
                                "VehicleConfig_Active_WindshieldWasher":random.choice([0, 1]),
                                "VehicleConfig_Active_AudioAmplifier": random.choice([0, 1]),
                                "VehicleConfig_Active_Motion_Mode": random.choice([0, 1]),
                                }, 
                                # Pass on the time delay and timeout
                                timeDelay=timeDelayIn, timeout=timeout)

                            # Extra sleep to allow bus to chill
                            sleep(timeDelayIn)
                            # Flush the bus buffer
                            bus.flush_tx_buffer()

                            # Results
                            if vehicleConfigSdoAutomator[1] == 0:
                                vehicleConfigCorrectList.append(iterCount)
                            else:
                                vehicleConfigErrorCounter += vehicleConfigSdoAutomator[1]
                                vehicleConfigErrorParamsList.append(vehicleConfigSdoAutomator[2])
                                vehicleConfigErrorList.append(iterCount)  

                        # Get the operation error every 50 when bus is reinitialized
                        except (CanOperationError, PcanCanOperationError) as exc:
                            print("CC Anti-Canape: An Operation Error Has Been Detected!")
                            print(exc)
                            print("Error occured on line", cf.f_lineno, "of file", pythonfilename)



            except KeyboardInterrupt:
                print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")

            # Overall Result Display
            print("Test Completed!")
            print("VehicleConfig Error Counter: ", vehicleConfigErrorCounter)
            print("")

            # Error Result Display
            if vehicleConfigErrorCounter != 0:
                print("VehicleConfig Error List: ", vehicleConfigErrorList)
                print("")
                print("VehicleConfig Error Params: ", vehicleConfigErrorParamsList)
                print("")

            # Correct Result Display
            print("VehicleConfig Correct:", vehicleConfigCorrectList)

        except KeyboardInterrupt:
            print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")

        # Return amount of passed tests and failed tests
        return [len(vehicleConfigCorrectList), len(vehicleConfigErrorList)]


    #############################
    # SCRIPT 2 - DST Write Test #
    #############################
    def script_writeDstTest(self, timeDelayIn:float=0.03, iterations:int=1) -> bool:
        ''' This script is designed to implement the functionality of Anti-Canape objects in order to test writing the DST SDO messages. It will continually write the DST SDO messages from the DOT to the VCM for the specified amount of iterations. 

        This script tests the SDO messages by reading the initial value of a SDO parameter present on the VCM, writing a randomized value in the range for each SDO parameter to the VCM, then reading the final value for the SDO parameter present on the VCM. If the value was correctly updated, then the test for that parameter passes.

        It will track all correct and error iterations and present them all to the user at the end of script execution.

        Args:
            timeDelayIn (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
            iterations (int, optional): The amount of times the test should be run. Defaults to 1.
            timeout (int): The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

        Returns:
            bool: True if the script completed correctly, False otherwise.
        '''        
        try:
            # DST Result / Error Tracking
            dstErrorCounter = 0
            dstErrorList = []
            dstErrorParamsList = []
            dstCorrectList = []

            try:
                # Get the bus for the specified driver
                with self.getDefaultBus(self.driver) as bus:

                    # Get the CAN message reader for the specified bus
                    reader = ACReader(busIn=bus)
                    # Get the CAN message writer for the specified bus
                    writer = ACWriter(busIn=bus)
                    dbcParser = dbcParse()
                    # Get the DOT parser for lookup (SDO)
                    dotParser = dotParse()
                    #Load the DBC dictionaries into the parser
                    dbcParser = dbcParse()
                    # Load the DOT dictionaries into the parser
                    dotParser.loadDictionaries()
                    # Load the DOT parser into the reader
                    reader.loadDotParser(dotParser=dotParser)
                    # Load the DOT parser into the writer
                    writer.loadDotParser(dotParser=dotParser)

                    # Get an automator
                    currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

                    # Repeat the test 100 times
                    for iterCount in range(iterations):
                        # Get the operation error every 50 when bus is reinitialized
                        try:
                            # DST SDO Automation Test
                            dstSdoAutomator = currentAutomator.writeSdoDict(sdoDictionary={
                                "UTC_TimeZoneOffset_Minutes": random.choice(list(range(0, 840))),
                                "DST_enable": random.choice([0, 1]),
                                "DST_rule_UTC_start": random.choice([0, 1, 2]),
                                "DST_rule_dateType_start": random.choice([1, 2, 3]),
                                "DST_rule_date_start": random.choice(list(range(1, 32))), #[1, 31] == [1, 32)
                                "DST_rule_month_start": random.choice(list(range(1, 13))), #[1, 12] == [1, 13)
                                "DST_rule_week_start": random.choice(list(range(1, 5))), #[1, 4] == [1, 5)
                                "DST_rule_weekday_start": random.choice(list(range(0, 7))), #[0, 6] == [0, 7)
                                "DST_rule_time_start": random.choice(list(range(0, 1441))), #[0, 1440] == [0, 1441)
                                "DST_rule_UTC_end": random.choice([0, 1, 2]),
                                "DST_rule_dateType_end": random.choice([1, 2, 3]),
                                "DST_rule_date_end": random.choice(list(range(1, 32))), #[1, 31] == [1, 32)
                                "DST_rule_month_end": random.choice(list(range(1, 13))), #[1, 12] == [1, 13)
                                "DST_rule_week_end": random.choice(list(range(1, 5))), #[1, 4] == [1, 5)
                                "DST_rule_weekday_end": random.choice(list(range(0, 7))), #[0, 6] == [0, 7)
                                "DST_rule_time_end": random.choice(list(range(0, 1441))), #[0, 1440] == [0, 1441)
                            },
                            # Pass on the time delay
                            timeDelay=timeDelayIn)

                            # Results
                            if dstSdoAutomator[1] == 0:
                                dstCorrectList.append(iterCount)
                            else:
                                dstErrorCounter += dstSdoAutomator[1]
                                dstErrorParamsList.append(dstSdoAutomator[2])
                                dstErrorList.append(iterCount)


                            # Iteration Manager
                            print("CC Anti-Canape: Beginning Iteration", iterCount)  

                        except KeyboardInterrupt:
                            print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")

            except KeyboardInterrupt:
                print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")
            except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
                if DEBUG_MODE:
                    logging.debug("CC Anti-Canape: An Error was detected!")
                    logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                    logging.error(exc)
                    raise

            # Overall Result Display
            print("Test Completed!")
            print("DST Error Counter: ", dstErrorCounter)
            print("")

            # Error Result Display
            if dstErrorCounter != 0:
                print("DST Error List: ", dstErrorList)
                print("")
                print("DST Error Params: ", dstErrorParamsList)
                print("")

            # Correct Result Display
            print("DST Correct: ", dstCorrectList)

        except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
            if DEBUG_MODE:
                logging.debug("CC Anti-Canape: An Error was detected!")
                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                logging.error(exc)
                raise

        #Return true for total script completion
        return True


    #######################################
    # SCRIPT 3 - Initialize Configuration #
    #######################################
    def script_configureSdo(self, timeDelay:float=0.03):
        ''' This script is designed to automatically write VCM configurations using SDO parameters. It will take a list of parameters and values as input, complete an initial read of the SDO parameters on the VCM, write the new SDO parameters to the VCM, complete a save NVM command on the VCM, instruct the user to power cycle the VCM, perform a final read of SDO parameters on the VCM, and provide the user with a final table displaying the parameters and all values along with a color-coded highlighting to identify correctness/failures.

        Args:
            timeDelay (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

        Returns:
            _type_: True if the script completed correctly, False otherwise.
        '''       
        # Script Header
        print("======== CC Anti-Canape: SDO Auto Configuration =========")
        print("Welcome to the SDO Auto Configuration for SDO Parameters!")
        print("")

        # Get Driver
        print("To start, select a dongle ('vector', 'pcan', or 'kvaser')!")
        driverInput = input("Driver: ")
        print("")
        while (driverInput.casefold() != 'vector'.casefold()) and (driverInput.casefold() != 'pcan'.casefold()) and (driverInput.casefold() != 'kvaser'.casefold()):
            print("Error: Incorrect Input. Enter 'vector', 'pcan', or 'kvaser'!")
            driverInput = input("Driver: ")
        print(driverInput, " Driver Has Been Selected!")
        self.driver = driverInput
        print("")

        # Get configuration file
        print("Next, please specify the location (path) of the config file!")
        pathInput = input("File Path: ")
        while (path.splitext(pathInput)[-1] != '.txt'): 
            print("Error: Incorrect Input. Give path of a '.txt' config file!")
            pathInput = input("File Path: ")
        print("")
        print(pathInput, " Has Been Selected For Configuration!")
        print("")

        # Get SDO parameters to configure from file
        sdoConfigDict = {'VehicleConfig_Active_Headlights': 1}
        # NOTE: Temporary for testing

        # with open(path) as configFile:
        #     # TODO: Figure out parsing of config file depending upon the formatting
        #     # TODO: Add each param:value to dict
        #     pass

        # Get the bus for the driver
        bus = self.getDefaultBus(self.driver)
        # Get the CAN message reader for the specified bus
        reader = ACReader(busIn=bus)
        # Get the CAN message writer for the specified bus
        writer = ACWriter(busIn=bus)
        dbcParser = dbcParse()
        # Get the DOT parser for lookup (SDO)
        dotParser = dotParse()
        #Load the DBC dictionaries into the parser
        dbcParser = dbcParse()
        # Load the DOT dictionaries into the parser
        dotParser.loadDictionaries()
        # Load the DOT parser into the reader
        reader.loadDotParser(dotParser=dotParser)
        # Load the DOT parser into the writer
        writer.loadDotParser(dotParser=dotParser)
        # Get an automator
        currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

        # Create the printing data frame
        printDataFrame = pd.DataFrame()

        # Write configurations and save the return information
        print("Writing SDO Configurations to VCM!")
        sdoWriteReturn = currentAutomator.writeSdoDict(sdoConfigDict, timeDelay=0.02, doPrint=False)
        # If there were errors
        while sdoWriteReturn[1] != 0:
            print("Error: Writing Did Not Complete Correctly. Trying Again!")
            # Try to write slightly slower
            sdoWriteReturn = currentAutomator.writeSdoDict(sdoConfigDict, timeDelay=0.03, doPrint=False)
        # If there were no errors, then announce success
        print("SDO Configuration Writing Successful!")
        print("")

        # Save writing stuff to the printing dataframe
        writtenDf = sdoWriteReturn[0]
        printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Parameter Name ' + Style.RESET_ALL] = writtenDf.loc[:, 'Parameter Name']
        printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Initial Value Before Write ' + Style.RESET_ALL] = writtenDf.loc[:, 'Initial Read Value']
        printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Configured Value After Write ' + Style.RESET_ALL] = writtenDf.loc[:, 'Written Value']

        # Save NVM Command
        print("Executing Save NVM Command!")
        writer.singleSdoWrite(mainId="VCM_SaveNVM", payloadIn=1)
        saveNvmResponse = reader.getSdoResponse(doPrint=False)
        while (saveNvmResponse[2] != 'writesuccess'):
            print("Error: Save NVM Did Not Complete Correctly. Trying Again!")
            writer.singleSdoWrite(mainId="VCM_SaveNVM", payloadIn=1)
            saveNvmResponse = reader.getSdoResponse(doPrint=False)
        print("Save NVM Successful!")
        print("")

        # Instruct User to Reset VCM Power
        print("Please Reset VCM Power (Unplug/Replug)!")
        print("When Done, Hit Any Key To Continue!")
        system('pause')
        print("Power Reset Completed!")
        print("")

        # Read Final SDO values
        print("Reading Final SDO Values!")
        sdoReadReturn = currentAutomator.readSdoList(sdoList=sdoConfigDict.keys(), doPrint=False)
        print("Final SDO Values Read!")
        print("")
        printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Final Value After Power Reset ' + Style.RESET_ALL] = sdoReadReturn.loc[:,'Read Value']

        # For every row, check if it was correctly saved
        for paramCounter in range(len(printDataFrame.index)):
            # If correct, then color green
            if printDataFrame.iloc[paramCounter, 2] == printDataFrame.iloc[paramCounter, 3]:
                printDataFrame.iloc[paramCounter, 0] = Fore.WHITE + Back.GREEN + Style.DIM + printDataFrame.iloc[paramCounter, 0]
                printDataFrame.iloc[paramCounter, 3] = str(printDataFrame.iloc[paramCounter, 3]) + Style.RESET_ALL
            # If wrong, then color red
            else:
                printDataFrame.iloc[paramCounter, 0] = Fore.WHITE + Back.RED + Style.DIM + printDataFrame.iloc[paramCounter, 0]
                printDataFrame.iloc[paramCounter, 3] = str(printDataFrame.iloc[paramCounter, 3]) + Style.RESET_ALL



        # Print Final Data Frame
        print(tabulate(printDataFrame, showindex=False, headers=printDataFrame.columns))
        print(Style.RESET_ALL)

        # Script Footer
        print("============= Thank You For Using CCACSAC! ==============")
        print("== Just Remember: It's Always a Great Day At The Club! ==")
        print("================= Author: Chris Hinkson =================")
        print("======= Contact: Christopher.Hinkson@clubcar.com ========")
        print("======== CC Anti-Canape: SDO Auto Configuration =========")

        # Return true for success
        return True


    ###################################
    # SCRIPT 4 - Fault SDO Read Check #
    ###################################
    def script_readFaultsTest(self, timeDelayIn:float=0.03, iterations:int=1, timeout:int=None) -> bool:
        ''' This script is designed to implement the functionality of Anti-Canape objects in order to test the reading of SDO messages. It will attempt to read all faults SDO messages currently in the loaded DOT.

        Args:
            timeDelayIn (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
            iterations (int, optional): The amount of times the test should be run. Defaults to 1.
            timeout (int): The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.


        Raises:
            AntiCanapeError: Custom Exception that allows for general exception handling and debugging information.

        '''        
        try:
            # Get the bus for the specified driver
            with self.getDefaultBus(self.driver) as bus:

                # Get the CAN message reader for the specified bus
                reader = ACReader(busIn=bus)
                # Get the CAN message writer for the specified bus
                writer = ACWriter(busIn=bus)
                # Get the DBC parser for lookup (CAN)
                dbcParser = dbcParse()
                # Get the DOT parser for lookup (SDO)
                dotParser = dotParse()
                #Load the DBC dictionaries into the parser
                dbcParser = dbcParse()
                # Load the DOT dictionaries into the parser
                dotParser.loadDictionaries()
                # Load the DOT parser into the reader
                reader.loadDotParser(dotParser=dotParser)
                # Load the DOT parser into the writer
                writer.loadDotParser(dotParser=dotParser)

                # Get an automator
                currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

                # Repeat the test iterations times
                for iterCount in range(iterations):

                    # Get the operation error when bus is reinitialized
                    try:
                        # Iteration Manager
                        if iterations != 1:
                            print("CC Anti-Canape: Beginning Iteration", iterCount)

                        # Faults list creation
                        faultsList = [20480, 20481] + list(range(20483, 20499))

                        # Check if bus is overloaded
                        try:
                            # Fault Read Test
                            currentAutomator.readSdoList(sdoList=faultsList, timeDelay=timeDelayIn, timeout=timeout)
                        except (KeyboardInterrupt) as exc: #For when user interrupts program
                            print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")
                            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                                raise AntiCanapeError(exc)
                            raise # If debug mode off, still raise the error as is
                        except (BusOverloadError) as exc: # For when the bus is overloaded / in error state
                            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                            raise AntiCanapeError(exc)
                        except (PcanError, PcanCanOperationError, PcanCanInitializationError) as exc: # For when pcan has issues
                            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: A PCAN Error was detected!")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                            raise AntiCanapeError(exc)
                        except (CanError, CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, CanTimeoutError) as exc: # Any CAN error
                            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: A CAN Error was detected!")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                            raise AntiCanapeError(exc)
                        except (AttributeError) as exc: # Silly attribute errors
                            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: There has been an error when scanning for the ID!")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                            raise AntiCanapeError(exc)
                        except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
                            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: An Error was detected!")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                                raise

                        # Extra sleep to allow bus to chill
                        sleep(timeDelayIn)
                        # Flush the bus buffer
                        bus.flush_tx_buffer()

                    except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: An Error was detected!")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                            raise

        except (KeyboardInterrupt) as exc:
            print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")
            if DEBUG_MODE:
                                logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                                logging.error(exc)
                                raise AntiCanapeError(exc)
            raise
        except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
            if DEBUG_MODE:
                logging.debug("CC Anti-Canape: An Error was detected!")
                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                logging.error(exc)
                raise
        except Exception as exc:
            if DEBUG_MODE:
                logging.debug("CC Anti-Canape: An Error was detected!")
                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                logging.error(exc)
                raise

__init__(driver)

The script manager class is defined to provide an object-orientated script service for the automation of SDO message transmission.

Parameters:

Name Type Description Default
driver str

The driver (dongle) to use.

required
Source code in drivers\automator.py
338
339
340
341
342
343
344
345
346
347
def __init__(self, driver:str) -> None:
    ''' The script manager class is defined to provide an object-orientated script service for the automation of SDO message transmission.

    Args:
        driver (str): The driver (dongle) to use.
    '''     
    print("CC Anti-Canape: A New Scripting Object Has Been Created For The", driver, "Dongle!")

    self.driver = driver
    self.threadTimeout = None

getDefaultBus(driver)

Gets the default (configured) bus for a driver, based on in-house (Club Car) dongles. Reference: [https://python-can.readthedocs.io/en/stable/configuration.html#interface-names]

Parameters:

Name Type Description Default
driver str

The driver (dongle) to use. Acceptable values are 'kvaser', 'pcan', or 'vector'.

required

Returns:

Name Type Description
ThreadSafeBus ThreadSafeBus

The correct bus for the given driver that is safe for several thread dependencies.

Source code in drivers\automator.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def getDefaultBus(self, driver:str) -> ThreadSafeBus:
    ''' Gets the default (configured) bus for a driver, based on in-house (Club Car) dongles.
    Reference: [https://python-can.readthedocs.io/en/stable/configuration.html#interface-names]

    Args:
        driver (str): The driver (dongle) to use. Acceptable values are 'kvaser', 'pcan', or 'vector'.

    Returns:
        ThreadSafeBus: The correct bus for the given driver that is safe for several thread dependencies.
    '''        
    try:
        # Default kvaser bus, channel, and byterate
        # [https://python-can.readthedocs.io/en/stable/interfaces/kvaser.html#kvaser-s-canlib]
        if driver.casefold() == 'kvaser'.casefold():
            # Create thread-safe CAN Bus using driver backend
            # [https://python-can.readthedocs.io/en/stable/configuration.html]
            # [https://python-can.readthedocs.io/en/stable/bus.html#can.ThreadSafeBus]
            kvaserBus = ThreadSafeBus(interface='kvaser', channel=0, bitrate=500000)
            kvaserBus.single_handle = True
            return kvaserBus
    # Indicates an error occured while initializing bus
    # [https://python-can.readthedocs.io/en/stable/errors.html#can.exceptions.CanInitializationError]
    except CanInitializationError as error:
        print("CC Anti-Canape: An error has occured when creating the kvaser Bus!") 
        print(type(error).__name__, ": ", error)

    try:
        # Default pcan bus, channel, and byterate
        # [https://python-can.readthedocs.io/en/stable/interfaces/pcan.html]
        # [https://python-can.readthedocs.io/en/stable/bus.html#can.ThreadSafeBus]
        if driver.casefold() == 'pcan'.casefold():
            # Create thread-safe CAN Bus using driver backend
            # [https://python-can.readthedocs.io/en/stable/configuration.html]
            # [https://python-can.readthedocs.io/en/stable/interfaces/pcan.html]
            pcanBus = ThreadSafeBus(interface='pcan', channel='PCAN_USBBUS1', bitrate=500000)
            return pcanBus
    # Indicates an error occured while initializing bus
    # [https://python-can.readthedocs.io/en/stable/errors.html#can.exceptions.CanInitializationError]
    except CanInitializationError as error:
        print("CC Anti-Canape: An error has occured when creating the PCAN Bus!") 
        print(type(error).__name__, ": ", error)

    try:
        # Default vector bus, channel, and app
        # [https://python-can.readthedocs.io/en/stable/interfaces/vector.html]
        if driver.casefold() == 'vector'.casefold():
            # Create thread-safe CAN Bus using driver backend
            # [https://python-can.readthedocs.io/en/stable/configuration.html]
            # [https://python-can.readthedocs.io/en/stable/bus.html#can.ThreadSafeBus]
            vectorBus = ThreadSafeBus(interface='vector', channel="0, 1", app_name="Anti-Canape")
            return vectorBus
    # Indicates an error occured while initializing bus
    # [https://python-can.readthedocs.io/en/stable/errors.html#can.exceptions.CanInitializationError]
    except CanInitializationError as error:
        print("CC Anti-Canape: An error has occured when creating the vector Bus!") 
        print(type(error).__name__, ": ", error)

script_configureSdo(timeDelay=0.03)

This script is designed to automatically write VCM configurations using SDO parameters. It will take a list of parameters and values as input, complete an initial read of the SDO parameters on the VCM, write the new SDO parameters to the VCM, complete a save NVM command on the VCM, instruct the user to power cycle the VCM, perform a final read of SDO parameters on the VCM, and provide the user with a final table displaying the parameters and all values along with a color-coded highlighting to identify correctness/failures.

Parameters:

Name Type Description Default
timeDelay float

The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

0.03

Returns:

Name Type Description
_type_

True if the script completed correctly, False otherwise.

Source code in drivers\automator.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
def script_configureSdo(self, timeDelay:float=0.03):
    ''' This script is designed to automatically write VCM configurations using SDO parameters. It will take a list of parameters and values as input, complete an initial read of the SDO parameters on the VCM, write the new SDO parameters to the VCM, complete a save NVM command on the VCM, instruct the user to power cycle the VCM, perform a final read of SDO parameters on the VCM, and provide the user with a final table displaying the parameters and all values along with a color-coded highlighting to identify correctness/failures.

    Args:
        timeDelay (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

    Returns:
        _type_: True if the script completed correctly, False otherwise.
    '''       
    # Script Header
    print("======== CC Anti-Canape: SDO Auto Configuration =========")
    print("Welcome to the SDO Auto Configuration for SDO Parameters!")
    print("")

    # Get Driver
    print("To start, select a dongle ('vector', 'pcan', or 'kvaser')!")
    driverInput = input("Driver: ")
    print("")
    while (driverInput.casefold() != 'vector'.casefold()) and (driverInput.casefold() != 'pcan'.casefold()) and (driverInput.casefold() != 'kvaser'.casefold()):
        print("Error: Incorrect Input. Enter 'vector', 'pcan', or 'kvaser'!")
        driverInput = input("Driver: ")
    print(driverInput, " Driver Has Been Selected!")
    self.driver = driverInput
    print("")

    # Get configuration file
    print("Next, please specify the location (path) of the config file!")
    pathInput = input("File Path: ")
    while (path.splitext(pathInput)[-1] != '.txt'): 
        print("Error: Incorrect Input. Give path of a '.txt' config file!")
        pathInput = input("File Path: ")
    print("")
    print(pathInput, " Has Been Selected For Configuration!")
    print("")

    # Get SDO parameters to configure from file
    sdoConfigDict = {'VehicleConfig_Active_Headlights': 1}
    # NOTE: Temporary for testing

    # with open(path) as configFile:
    #     # TODO: Figure out parsing of config file depending upon the formatting
    #     # TODO: Add each param:value to dict
    #     pass

    # Get the bus for the driver
    bus = self.getDefaultBus(self.driver)
    # Get the CAN message reader for the specified bus
    reader = ACReader(busIn=bus)
    # Get the CAN message writer for the specified bus
    writer = ACWriter(busIn=bus)
    dbcParser = dbcParse()
    # Get the DOT parser for lookup (SDO)
    dotParser = dotParse()
    #Load the DBC dictionaries into the parser
    dbcParser = dbcParse()
    # Load the DOT dictionaries into the parser
    dotParser.loadDictionaries()
    # Load the DOT parser into the reader
    reader.loadDotParser(dotParser=dotParser)
    # Load the DOT parser into the writer
    writer.loadDotParser(dotParser=dotParser)
    # Get an automator
    currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

    # Create the printing data frame
    printDataFrame = pd.DataFrame()

    # Write configurations and save the return information
    print("Writing SDO Configurations to VCM!")
    sdoWriteReturn = currentAutomator.writeSdoDict(sdoConfigDict, timeDelay=0.02, doPrint=False)
    # If there were errors
    while sdoWriteReturn[1] != 0:
        print("Error: Writing Did Not Complete Correctly. Trying Again!")
        # Try to write slightly slower
        sdoWriteReturn = currentAutomator.writeSdoDict(sdoConfigDict, timeDelay=0.03, doPrint=False)
    # If there were no errors, then announce success
    print("SDO Configuration Writing Successful!")
    print("")

    # Save writing stuff to the printing dataframe
    writtenDf = sdoWriteReturn[0]
    printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Parameter Name ' + Style.RESET_ALL] = writtenDf.loc[:, 'Parameter Name']
    printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Initial Value Before Write ' + Style.RESET_ALL] = writtenDf.loc[:, 'Initial Read Value']
    printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Configured Value After Write ' + Style.RESET_ALL] = writtenDf.loc[:, 'Written Value']

    # Save NVM Command
    print("Executing Save NVM Command!")
    writer.singleSdoWrite(mainId="VCM_SaveNVM", payloadIn=1)
    saveNvmResponse = reader.getSdoResponse(doPrint=False)
    while (saveNvmResponse[2] != 'writesuccess'):
        print("Error: Save NVM Did Not Complete Correctly. Trying Again!")
        writer.singleSdoWrite(mainId="VCM_SaveNVM", payloadIn=1)
        saveNvmResponse = reader.getSdoResponse(doPrint=False)
    print("Save NVM Successful!")
    print("")

    # Instruct User to Reset VCM Power
    print("Please Reset VCM Power (Unplug/Replug)!")
    print("When Done, Hit Any Key To Continue!")
    system('pause')
    print("Power Reset Completed!")
    print("")

    # Read Final SDO values
    print("Reading Final SDO Values!")
    sdoReadReturn = currentAutomator.readSdoList(sdoList=sdoConfigDict.keys(), doPrint=False)
    print("Final SDO Values Read!")
    print("")
    printDataFrame[Fore.WHITE + Back.BLUE + Style.DIM + ' Final Value After Power Reset ' + Style.RESET_ALL] = sdoReadReturn.loc[:,'Read Value']

    # For every row, check if it was correctly saved
    for paramCounter in range(len(printDataFrame.index)):
        # If correct, then color green
        if printDataFrame.iloc[paramCounter, 2] == printDataFrame.iloc[paramCounter, 3]:
            printDataFrame.iloc[paramCounter, 0] = Fore.WHITE + Back.GREEN + Style.DIM + printDataFrame.iloc[paramCounter, 0]
            printDataFrame.iloc[paramCounter, 3] = str(printDataFrame.iloc[paramCounter, 3]) + Style.RESET_ALL
        # If wrong, then color red
        else:
            printDataFrame.iloc[paramCounter, 0] = Fore.WHITE + Back.RED + Style.DIM + printDataFrame.iloc[paramCounter, 0]
            printDataFrame.iloc[paramCounter, 3] = str(printDataFrame.iloc[paramCounter, 3]) + Style.RESET_ALL



    # Print Final Data Frame
    print(tabulate(printDataFrame, showindex=False, headers=printDataFrame.columns))
    print(Style.RESET_ALL)

    # Script Footer
    print("============= Thank You For Using CCACSAC! ==============")
    print("== Just Remember: It's Always a Great Day At The Club! ==")
    print("================= Author: Chris Hinkson =================")
    print("======= Contact: Christopher.Hinkson@clubcar.com ========")
    print("======== CC Anti-Canape: SDO Auto Configuration =========")

    # Return true for success
    return True

script_readFaultsTest(timeDelayIn=0.03, iterations=1, timeout=None)

This script is designed to implement the functionality of Anti-Canape objects in order to test the reading of SDO messages. It will attempt to read all faults SDO messages currently in the loaded DOT.

Parameters:

Name Type Description Default
timeDelayIn float

The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

0.03
iterations int

The amount of times the test should be run. Defaults to 1.

1
timeout int

The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

None

Raises:

Type Description
AntiCanapeError

Custom Exception that allows for general exception handling and debugging information.

Source code in drivers\automator.py
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
def script_readFaultsTest(self, timeDelayIn:float=0.03, iterations:int=1, timeout:int=None) -> bool:
    ''' This script is designed to implement the functionality of Anti-Canape objects in order to test the reading of SDO messages. It will attempt to read all faults SDO messages currently in the loaded DOT.

    Args:
        timeDelayIn (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
        iterations (int, optional): The amount of times the test should be run. Defaults to 1.
        timeout (int): The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.


    Raises:
        AntiCanapeError: Custom Exception that allows for general exception handling and debugging information.

    '''        
    try:
        # Get the bus for the specified driver
        with self.getDefaultBus(self.driver) as bus:

            # Get the CAN message reader for the specified bus
            reader = ACReader(busIn=bus)
            # Get the CAN message writer for the specified bus
            writer = ACWriter(busIn=bus)
            # Get the DBC parser for lookup (CAN)
            dbcParser = dbcParse()
            # Get the DOT parser for lookup (SDO)
            dotParser = dotParse()
            #Load the DBC dictionaries into the parser
            dbcParser = dbcParse()
            # Load the DOT dictionaries into the parser
            dotParser.loadDictionaries()
            # Load the DOT parser into the reader
            reader.loadDotParser(dotParser=dotParser)
            # Load the DOT parser into the writer
            writer.loadDotParser(dotParser=dotParser)

            # Get an automator
            currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

            # Repeat the test iterations times
            for iterCount in range(iterations):

                # Get the operation error when bus is reinitialized
                try:
                    # Iteration Manager
                    if iterations != 1:
                        print("CC Anti-Canape: Beginning Iteration", iterCount)

                    # Faults list creation
                    faultsList = [20480, 20481] + list(range(20483, 20499))

                    # Check if bus is overloaded
                    try:
                        # Fault Read Test
                        currentAutomator.readSdoList(sdoList=faultsList, timeDelay=timeDelayIn, timeout=timeout)
                    except (KeyboardInterrupt) as exc: #For when user interrupts program
                        print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                            raise AntiCanapeError(exc)
                        raise # If debug mode off, still raise the error as is
                    except (BusOverloadError) as exc: # For when the bus is overloaded / in error state
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                        raise AntiCanapeError(exc)
                    except (PcanError, PcanCanOperationError, PcanCanInitializationError) as exc: # For when pcan has issues
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: A PCAN Error was detected!")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                        raise AntiCanapeError(exc)
                    except (CanError, CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, CanTimeoutError) as exc: # Any CAN error
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: A CAN Error was detected!")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                        raise AntiCanapeError(exc)
                    except (AttributeError) as exc: # Silly attribute errors
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: There has been an error when scanning for the ID!")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                        raise AntiCanapeError(exc)
                    except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
                        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: An Error was detected!")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                            raise

                    # Extra sleep to allow bus to chill
                    sleep(timeDelayIn)
                    # Flush the bus buffer
                    bus.flush_tx_buffer()

                except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
                    if DEBUG_MODE:
                        logging.debug("CC Anti-Canape: An Error was detected!")
                        logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                        logging.error(exc)
                        raise

    except (KeyboardInterrupt) as exc:
        print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")
        if DEBUG_MODE:
                            logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                            logging.error(exc)
                            raise AntiCanapeError(exc)
        raise
    except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
        if DEBUG_MODE:
            logging.debug("CC Anti-Canape: An Error was detected!")
            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
            logging.error(exc)
            raise
    except Exception as exc:
        if DEBUG_MODE:
            logging.debug("CC Anti-Canape: An Error was detected!")
            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
            logging.error(exc)
            raise

script_writeDstTest(timeDelayIn=0.03, iterations=1)

This script is designed to implement the functionality of Anti-Canape objects in order to test writing the DST SDO messages. It will continually write the DST SDO messages from the DOT to the VCM for the specified amount of iterations.

This script tests the SDO messages by reading the initial value of a SDO parameter present on the VCM, writing a randomized value in the range for each SDO parameter to the VCM, then reading the final value for the SDO parameter present on the VCM. If the value was correctly updated, then the test for that parameter passes.

It will track all correct and error iterations and present them all to the user at the end of script execution.

Parameters:

Name Type Description Default
timeDelayIn float

The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

0.03
iterations int

The amount of times the test should be run. Defaults to 1.

1
timeout int

The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

required

Returns:

Name Type Description
bool bool

True if the script completed correctly, False otherwise.

Source code in drivers\automator.py
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
def script_writeDstTest(self, timeDelayIn:float=0.03, iterations:int=1) -> bool:
    ''' This script is designed to implement the functionality of Anti-Canape objects in order to test writing the DST SDO messages. It will continually write the DST SDO messages from the DOT to the VCM for the specified amount of iterations. 

    This script tests the SDO messages by reading the initial value of a SDO parameter present on the VCM, writing a randomized value in the range for each SDO parameter to the VCM, then reading the final value for the SDO parameter present on the VCM. If the value was correctly updated, then the test for that parameter passes.

    It will track all correct and error iterations and present them all to the user at the end of script execution.

    Args:
        timeDelayIn (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
        iterations (int, optional): The amount of times the test should be run. Defaults to 1.
        timeout (int): The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

    Returns:
        bool: True if the script completed correctly, False otherwise.
    '''        
    try:
        # DST Result / Error Tracking
        dstErrorCounter = 0
        dstErrorList = []
        dstErrorParamsList = []
        dstCorrectList = []

        try:
            # Get the bus for the specified driver
            with self.getDefaultBus(self.driver) as bus:

                # Get the CAN message reader for the specified bus
                reader = ACReader(busIn=bus)
                # Get the CAN message writer for the specified bus
                writer = ACWriter(busIn=bus)
                dbcParser = dbcParse()
                # Get the DOT parser for lookup (SDO)
                dotParser = dotParse()
                #Load the DBC dictionaries into the parser
                dbcParser = dbcParse()
                # Load the DOT dictionaries into the parser
                dotParser.loadDictionaries()
                # Load the DOT parser into the reader
                reader.loadDotParser(dotParser=dotParser)
                # Load the DOT parser into the writer
                writer.loadDotParser(dotParser=dotParser)

                # Get an automator
                currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

                # Repeat the test 100 times
                for iterCount in range(iterations):
                    # Get the operation error every 50 when bus is reinitialized
                    try:
                        # DST SDO Automation Test
                        dstSdoAutomator = currentAutomator.writeSdoDict(sdoDictionary={
                            "UTC_TimeZoneOffset_Minutes": random.choice(list(range(0, 840))),
                            "DST_enable": random.choice([0, 1]),
                            "DST_rule_UTC_start": random.choice([0, 1, 2]),
                            "DST_rule_dateType_start": random.choice([1, 2, 3]),
                            "DST_rule_date_start": random.choice(list(range(1, 32))), #[1, 31] == [1, 32)
                            "DST_rule_month_start": random.choice(list(range(1, 13))), #[1, 12] == [1, 13)
                            "DST_rule_week_start": random.choice(list(range(1, 5))), #[1, 4] == [1, 5)
                            "DST_rule_weekday_start": random.choice(list(range(0, 7))), #[0, 6] == [0, 7)
                            "DST_rule_time_start": random.choice(list(range(0, 1441))), #[0, 1440] == [0, 1441)
                            "DST_rule_UTC_end": random.choice([0, 1, 2]),
                            "DST_rule_dateType_end": random.choice([1, 2, 3]),
                            "DST_rule_date_end": random.choice(list(range(1, 32))), #[1, 31] == [1, 32)
                            "DST_rule_month_end": random.choice(list(range(1, 13))), #[1, 12] == [1, 13)
                            "DST_rule_week_end": random.choice(list(range(1, 5))), #[1, 4] == [1, 5)
                            "DST_rule_weekday_end": random.choice(list(range(0, 7))), #[0, 6] == [0, 7)
                            "DST_rule_time_end": random.choice(list(range(0, 1441))), #[0, 1440] == [0, 1441)
                        },
                        # Pass on the time delay
                        timeDelay=timeDelayIn)

                        # Results
                        if dstSdoAutomator[1] == 0:
                            dstCorrectList.append(iterCount)
                        else:
                            dstErrorCounter += dstSdoAutomator[1]
                            dstErrorParamsList.append(dstSdoAutomator[2])
                            dstErrorList.append(iterCount)


                        # Iteration Manager
                        print("CC Anti-Canape: Beginning Iteration", iterCount)  

                    except KeyboardInterrupt:
                        print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")

        except KeyboardInterrupt:
            print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")
        except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
            if DEBUG_MODE:
                logging.debug("CC Anti-Canape: An Error was detected!")
                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                logging.error(exc)
                raise

        # Overall Result Display
        print("Test Completed!")
        print("DST Error Counter: ", dstErrorCounter)
        print("")

        # Error Result Display
        if dstErrorCounter != 0:
            print("DST Error List: ", dstErrorList)
            print("")
            print("DST Error Params: ", dstErrorParamsList)
            print("")

        # Correct Result Display
        print("DST Correct: ", dstCorrectList)

    except(AntiCanapeError) as exc: # For an Anti-Canape error deeper down
        if DEBUG_MODE:
            logging.debug("CC Anti-Canape: An Error was detected!")
            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
            logging.error(exc)
            raise

    #Return true for total script completion
    return True

script_writeVehicleConfigActiveTest(timeDelayIn=0.03, iterations=1, timeout=None)

This script is designed to implement the functionality of Anti-Canape objects in order to test writing the vehicle config active SDO messages. It will continually write the VehicleConfig_Active SDO messages from the DOT to the VCM for the specified amount of iterations.

This script tests the SDO messages by reading the initial value of a SDO parameter present on the VCM, writing a randomized value in the range for each SDO parameter to the VCM, then reading the final value for the SDO parameter present on the VCM. If the value was correctly updated, then the test for that parameter passes.

It will track all correct and error iterations and present them all to the user at the end of script execution.

Parameters:

Name Type Description Default
timeDelayIn float

The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

0.03
iterations int

The amount of times the test should be run. Defaults to 1.

1
timeout int

The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

None

Returns:

Type Description
list

[passes, fails] (list): A list containing the amount of passed tests and amount of failed tests

Source code in drivers\automator.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def script_writeVehicleConfigActiveTest(self, timeDelayIn:float=0.03, iterations:int=1, timeout:int=None) -> list:
    ''' This script is designed to implement the functionality of Anti-Canape objects in order to test writing the vehicle config active SDO messages. It will continually write the VehicleConfig_Active SDO messages from the DOT to the VCM for the specified amount of iterations. 

    This script tests the SDO messages by reading the initial value of a SDO parameter present on the VCM, writing a randomized value in the range for each SDO parameter to the VCM, then reading the final value for the SDO parameter present on the VCM. If the value was correctly updated, then the test for that parameter passes.

    It will track all correct and error iterations and present them all to the user at the end of script execution.

    Args:
        timeDelayIn (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
        iterations (int, optional): The amount of times the test should be run. Defaults to 1.
        timeout (int): The timeout in seconds for reading the message from the bus. Defaults to None. If None, thread continues indefinitely.

    Returns:
        [passes, fails] (list): A list containing the amount of passed tests and amount of failed tests
    '''        
    try:
        # Vehicle Config Result / Error Tracking
        vehicleConfigCorrectList = []
        vehicleConfigErrorCounter = 0
        vehicleConfigErrorList = []
        vehicleConfigErrorParamsList = []

        try:
            # Get the bus for the specified driver
            with self.getDefaultBus(self.driver) as bus:

                # Get the CAN message reader for the specified bus
                reader = ACReader(busIn=bus)
                # Get the CAN message writer for the specified bus
                writer = ACWriter(busIn=bus)
                # Get the DBC parser for lookup (CAN)
                dbcParser = dbcParse()
                # Get the DOT parser for lookup (SDO)
                dotParser = dotParse()
                #Load the DBC dictionaries into the parser
                dbcParser = dbcParse()
                # Load the DOT dictionaries into the parser
                dotParser.loadDictionaries()
                # Load the DOT parser into the reader
                reader.loadDotParser(dotParser=dotParser)
                # Load the DOT parser into the writer
                writer.loadDotParser(dotParser=dotParser)

                # Get an automator
                currentAutomator = sdoAutomator(readerIn=reader, writerIn=writer, dbcParseIn=dbcParser, dotParseIn=dotParser)

                # Repeat the test 100 times
                for iterCount in range(iterations):
                    # Get the operation error every 50 when bus is reinitialized
                    try:
                        # Iteration Manager
                        print("CC Anti-Canape: Beginning Iteration", iterCount)

                        # vehicleConfig SDO Automation Test
                        # If it takes longer than 60 seconds, quit
                        vehicleConfigSdoAutomator = currentAutomator.writeSdoDict(sdoDictionary={
                            "VehicleConfig_Active_PedalUp": random.choice([0, 1, 2]),
                            "VehicleConfig_Active_DCDCConverter": random.choice([0, 1]),
                            "VehicleConfig_Active_Horn": random.choice([0, 1]),
                            "VehicleConfig_Active_Headlights": random.choice([0, 1]),
                            "VehicleConfig_Active_TurnSignals": random.choice([0, 1]),
                            "VehicleConfig_Active_HazardSwitch": random.choice([0, 1]),
                            "VehicleConfig_Active_BrakeLights": random.choice([0, 1]),
                            "VehicleConfig_Active_WigWag": random.choice([0, 1]),
                            "VehicleConfig_Active_LogoLight": random.choice([0, 1]),
                            "VehicleConfig_Active_DRLEnable": random.choice([0, 1]),
                            "VehicleConfig_Active_VisageControl": random.choice([0, 1]),
                            "VehiceConfig_Active_BodyBuilderRelay": random.choice([0, 1]),
                            "VehicleConfig_Active_PerformanceProfile": random.choice([0, 1, 2]),
                            "VehicleConfig_Active_NumBeepsStartOfCharge": random.choice([0, 1, 2, 3]),
                            "VehicleConfig_Active_OffPeakCharging":random.choice([0, 1, 2]),
                            "VehicleConfig_Active_SOC_Mode": random.choice([0, 1, 10]),
                            "VehicleConfig_Active_DRL_Mode": random.choice([0, 1]),

                            # Timeout's can only be every 20, so scale the random choice
                            "VehicleConfig_Active_Acc_Timeout": (random.choice(list(range(0, 360000))) * 20),
                            "VehicleConfig_Active_Run_Timeout": (random.choice(list(range(0, 135000))) * 20),
                            "VehicleConfig_Active_KeyOffLight_Timeout": (random.choice(list(range(0, 30000))) * 20),

                            # Fails, some of the time
                            "VehicleConfig_Active_BSI": random.choice([0, 1]),
                            "VehicleConfig_Active_isAPPS": random.choice([0, 1]),
                            "VehicleConfig_Active_MaxSpeedRev": random.choice(list(range(0, 76))),

                            # Timeout's can only be every 20, so scale the random choice
                            "VehicleConfig_Active_KeyOffHorn_Timeout": (random.choice(list(range(0, 2160000))) * 20),

                            "VehicleConfig_Active_MotorType": random.choice([0, 1, 2, 3, 4]),
                            "VehicleConfig_Active_WindshieldWiper": random.choice([0, 1]),
                            "VehicleConfig_Active_WindshieldWasher":random.choice([0, 1]),
                            "VehicleConfig_Active_AudioAmplifier": random.choice([0, 1]),
                            "VehicleConfig_Active_Motion_Mode": random.choice([0, 1]),
                            }, 
                            # Pass on the time delay and timeout
                            timeDelay=timeDelayIn, timeout=timeout)

                        # Extra sleep to allow bus to chill
                        sleep(timeDelayIn)
                        # Flush the bus buffer
                        bus.flush_tx_buffer()

                        # Results
                        if vehicleConfigSdoAutomator[1] == 0:
                            vehicleConfigCorrectList.append(iterCount)
                        else:
                            vehicleConfigErrorCounter += vehicleConfigSdoAutomator[1]
                            vehicleConfigErrorParamsList.append(vehicleConfigSdoAutomator[2])
                            vehicleConfigErrorList.append(iterCount)  

                    # Get the operation error every 50 when bus is reinitialized
                    except (CanOperationError, PcanCanOperationError) as exc:
                        print("CC Anti-Canape: An Operation Error Has Been Detected!")
                        print(exc)
                        print("Error occured on line", cf.f_lineno, "of file", pythonfilename)



        except KeyboardInterrupt:
            print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")

        # Overall Result Display
        print("Test Completed!")
        print("VehicleConfig Error Counter: ", vehicleConfigErrorCounter)
        print("")

        # Error Result Display
        if vehicleConfigErrorCounter != 0:
            print("VehicleConfig Error List: ", vehicleConfigErrorList)
            print("")
            print("VehicleConfig Error Params: ", vehicleConfigErrorParamsList)
            print("")

        # Correct Result Display
        print("VehicleConfig Correct:", vehicleConfigCorrectList)

    except KeyboardInterrupt:
        print("CC Anti-Canape: Program Execution Halted by Keyboard Interrupt!")

    # Return amount of passed tests and failed tests
    return [len(vehicleConfigCorrectList), len(vehicleConfigErrorList)]

sdoAutomator

This class provides a thread-based automator for interacting with SDO messages. Specifically, it creates an object that contains two automation methods, being writeSdoDict and readSdoDict. It can be used to gather results from reading and writing sdo messages automatically without the need to continually reference lower-level support methods.

Attributes:

Name Type Description
reader

An ACReader object to use for reading to the CAN bus.

writer

An ACWriter object to use for writing to the CAN bus.

dbcParse

A dbcAutoParse object to use for looking up dbc information (CAN messages, signals, and attributes).

dotParse

A dotAutoParse object to use for looking up dot information (SDO ID's, payloads, and attributes).

Examples:

mySdoAutomator(myReader, myWriter, myDbcParse, myDotParse) mySdoAutomator.writeSdoDict({"VehicleConfig_Active_PedalUp":1}) mySdoAutomator.readSdoList({"VehicleConfig_Active_PedalUp"})

Raises:

Type Description
Exception

Raised if there is an error with SDO reading/writing automation.

Exception

Raised if there is an error with SDO reading/writing automation.

Source code in drivers\automator.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
class sdoAutomator():
    ''' This class provides a thread-based automator for interacting with SDO messages. Specifically, it creates an object that contains two automation methods, being writeSdoDict and readSdoDict. It can be used to gather results from reading and writing sdo messages automatically without the need to continually reference lower-level support methods.

    Attributes:
        reader: An ACReader object to use for reading to the CAN bus.
        writer: An ACWriter object to use for writing to the CAN bus.
        dbcParse: A dbcAutoParse object to use for looking up dbc information (CAN messages, signals, and attributes).
        dotParse: A dotAutoParse object to use for looking up dot information (SDO ID's, payloads, and attributes).

    Examples:
        mySdoAutomator(myReader, myWriter, myDbcParse, myDotParse)
        mySdoAutomator.writeSdoDict({"VehicleConfig_Active_PedalUp":1})
        mySdoAutomator.readSdoList({"VehicleConfig_Active_PedalUp"})

    Raises:
        Exception: Raised if there is an error with SDO reading/writing automation.
        Exception: Raised if there is an error with SDO reading/writing automation.
    '''    
    ########################
    # CLASS INITIALIZATION #
    ########################
    def __init__(self, readerIn:ACReader=None, writerIn:ACWriter=None, dbcParseIn:dbcParse=None, dotParseIn:dotParse=None) -> None:
        ''' This method will initialize an sdoAutomator object to provide an instance for automation. It will set the needed threads for the instance reader, writer, dbcParse, and dotParse.

        Args:
            readerIn (ACReader, optional): An ACReader object to use for reading to the CAN bus. Defaults to None.
            writerIn (ACWriter, optional): An ACWriter object to use for writing to the CAN bus. Defaults to None.
            dbcParseIn (dbcParse, optional): A dbcAutoParse object to use for looking up dbc information (CAN messages, signals, and attributes). Defaults to None.
            dotParseIn (dotParse, optional): A dotAutoParse object to use for looking up dot information (SDO ID's, payloads, and attributes). Defaults to None.
        '''        
        # Set thread objects
        self.reader = readerIn
        self.writer = writerIn
        self.dbcParse = dbcParseIn
        self.dotParse = dotParseIn


    ###################################
    # SDO AUTOMATIC DICTIONARY WRITER #
    ###################################
    def writeSdoDict(self, sdoDictionary:dict, timeDelay:float=0.03, doPrint:bool=True, timeout:int=None) -> list:
        ''' This method is designed to  write a given dictionary of SDO parameters in the form of {param:new_value}.

        Args:
            sdoDictionary (dict): Dictionary of param-value pairs.
            timeDelay (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
            doPrint (bool, optional): Whether to print results or not. Defaults to True.
            timeout (int): The timeout length (in seconds) for each read and write command.

        Raises:
            Exception: Raises an Exception if the sdoDictionary is empty.

        Returns:
            list: A list containing [sdoDataFrame, errorCounter, errorList].
        '''
        try:
            # If the dictionary is empty, raise an exception to alert and stop execution
            if (len(sdoDictionary) == 0):
                raise Exception(Back.MAGENTA + Fore.CYAN + "CC Anti-Canape Automator: You must provide a dictionary of param-value pairs to write!")

            # Create the return data frame
            sdoDataFrame = pd.DataFrame(columns=['Parameter Name', 'Initial Read Value', 'Written Value', 'Final Read Value'])

            # Create the printing data frame
            printDataFrame = pd.DataFrame(columns=[Fore.WHITE + Back.BLUE + Style.DIM + ' Parameter Name ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Initial Read Value ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Written Value ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Final Read Value ' + Style.RESET_ALL])

            # Initialize error counter
            errorCounter = 0
            errorList = []

            # For each parameter to send
            for paramToSend in sdoDictionary.keys():
                # Create the initial parameter list, starting with the parameter name
                paramReturnList = [paramToSend]
                paramPrintList = []

                try:
                    # Read the initial value and add it to the list
                    self.reader.singleSdoRead(mainId=paramToSend, writer=self.writer, timeoutIn=timeout)
                    pol = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                    paramReturnList.append(pol[-1])
                except CanError as exc:
                    print("CC Anti-Canape: An Error Has Occured When Communicating with the CAN Bus!")
                    print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
                    print(exc)

                sleep(timeDelay)

                try:
                    # Write the new value and add it to the list
                    self.writer.singleSdoWrite(mainId=paramToSend, payloadIn=sdoDictionary[paramToSend], timeoutIn=timeout)
                    paramReturnList.append(sdoDictionary[paramToSend])
                    olp = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                except CanError as exc:
                    print("CC Anti-Canape: An Error Has Occured When Communicating with the CAN Bus!")
                    print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
                    print(exc)

                sleep(timeDelay)

                try:
                    # Read the final value and add it to the list
                    self.reader.singleSdoRead(mainId=paramToSend, writer=self.writer, timeoutIn=timeout)
                    lop = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                    paramReturnList.append(lop[-1])
                except CanError as exc:
                    print("CC Anti-Canape: An Error Has Occured When Communicating with the CAN Bus!")
                    print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
                    print(exc)

                sleep(timeDelay)

                # Add the return row to the dataframe
                sdoDataFrame.loc[len(sdoDataFrame.index)] = paramReturnList

                # Check if the written value matched the final read value
                if (str(paramReturnList[-1]) == str(paramReturnList[-2])):
                    # If True, then the write was successful, so we color the param name green
                    paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + paramToSend)

                    # Add the rest of the values in colored text aswell
                    paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + str(paramReturnList[1]))
                    paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + str(paramReturnList[2]))
                    paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + str(paramReturnList[3]) + Style.RESET_ALL)

                else:
                    # If False, then the write was not successful, so we color the param name red
                    paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + paramToSend)

                    # Add the rest of the values in colored text aswell
                    paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + str(paramReturnList[1]))
                    paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + str(paramReturnList[2]))
                    paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + str(paramReturnList[3]) + Style.RESET_ALL)

                    # Increment Error Counter
                    errorCounter += 1
                    errorList.append(paramToSend)

                # Add the print row to the dataframe
                printDataFrame.loc[len(printDataFrame)] = paramPrintList

            if doPrint:
                # Print the print data frame
                print(tabulate(printDataFrame, showindex=False, headers=printDataFrame.columns))
                print(Style.RESET_ALL)

            # Return the return data frame
            return [sdoDataFrame, errorCounter, errorList]

        # Catch bus re-initializing
        except (CanOperationError, PcanCanOperationError):
            print("CC Anti-Canape: The CAN Channel is Currently Down!")
            print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)



    #############################
    # SDO AUTOMATIC LIST READER #
    #############################
    def readSdoList(self, sdoList:list, timeDelay:float=0.03, doPrint:bool=True, timeout:int=None) -> pd.DataFrame:
        ''' This method is designed to read a given list of SDO parameters.

        Args:
            sdoList (dict): List of paramameters to read.
            doPrint (bool, optional): Whether to print results or not. Defaults to True.
            timeout (int): The timeout length (in seconds) for each read and write command.

        Raises:
            Exception: Raises an Exception if the sdoList is empty.

        Returns:
            Dataframe: Pandas Dataframe containing parameters read and values read.
        '''        

        try:
            # If the dictionary is empty, raise an exception to alert and stop execution
            if (len(sdoList) == 0):
                raise Exception(Back.MAGENTA + Fore.CYAN + "CC Anti-Canape Automator: You must provide a list of parameters to read!")

            # Create the return data frame
            sdoDataFrame = pd.DataFrame(columns=['Parameter Name', 'Read Value'])

            # Create the printing data frame
            printDataFrame = pd.DataFrame(columns=[Fore.WHITE + Back.BLUE + Style.DIM + ' Parameter Name ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Read Value ' + Style.RESET_ALL])

            # For each parameter to read
            for paramToRead in sdoList:
                try:
                    # Check if the bus is overloaded
                    # [https://python-can.readthedocs.io/en/stable/bus.html#can.BusABC.state]
                    if self.writer.bus.state == 3:
                        raise BusOverloadError

                    # Create the initial parameter list, starting with the parameter name
                    paramReturnList = [paramToRead]
                    paramPrintList = [paramToRead]

                    sleep(timeDelay)

                    # Request a read of the SDO message
                    self.reader.singleSdoRead(mainId=paramToRead, writer=self.writer, timeoutIn=timeout)

                    sleep(timeDelay)

                    # Grab the VCM Response
                    sdoResponse = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                    paramReturnList.append(sdoResponse[-1])
                    paramPrintList.append(sdoResponse[-1])

                    # Add the return row to the dataframe
                    sdoDataFrame.loc[len(sdoDataFrame.index)] = paramReturnList

                    # Add the print row to the dataframe
                    printDataFrame.loc[len(printDataFrame)] = paramPrintList
                except BusOverloadError as exc:
                    if DEBUG_MODE:
                        logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                        logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                        logging.error(exc)
                        raise
                except (PcanError, PcanCanOperationError, PcanCanInitializationError) as exc:
                    if DEBUG_MODE:
                        logging.debug("CC Anti-Canape: A PCAN Error was detected!")
                        logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                        logging.error(exc)
                        raise
                except (CanError, CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, CanTimeoutError) as exc:
                    if DEBUG_MODE:
                        logging.debug("CC Anti-Canape: A CAN Error was detected!")
                        logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                        logging.error(exc)
                        raise

            if doPrint:
                # Print the print data frame
                print(tabulate(printDataFrame, showindex=False, headers=printDataFrame.columns))
                print(Style.RESET_ALL)

            # Return the return data frame
            return sdoDataFrame

        # Catch bus re-initializing
        except (CanOperationError, PcanCanOperationError) as exc:
            print("CC Anti-Canape: The CAN Channel is Currently Down!")
            print("CC ANti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
            raise

        except BusOverloadError:
                logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                logging.error(exc)
                raise

__init__(readerIn=None, writerIn=None, dbcParseIn=None, dotParseIn=None)

This method will initialize an sdoAutomator object to provide an instance for automation. It will set the needed threads for the instance reader, writer, dbcParse, and dotParse.

Parameters:

Name Type Description Default
readerIn ACReader

An ACReader object to use for reading to the CAN bus. Defaults to None.

None
writerIn ACWriter

An ACWriter object to use for writing to the CAN bus. Defaults to None.

None
dbcParseIn dbcParse

A dbcAutoParse object to use for looking up dbc information (CAN messages, signals, and attributes). Defaults to None.

None
dotParseIn dotParse

A dotAutoParse object to use for looking up dot information (SDO ID's, payloads, and attributes). Defaults to None.

None
Source code in drivers\automator.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def __init__(self, readerIn:ACReader=None, writerIn:ACWriter=None, dbcParseIn:dbcParse=None, dotParseIn:dotParse=None) -> None:
    ''' This method will initialize an sdoAutomator object to provide an instance for automation. It will set the needed threads for the instance reader, writer, dbcParse, and dotParse.

    Args:
        readerIn (ACReader, optional): An ACReader object to use for reading to the CAN bus. Defaults to None.
        writerIn (ACWriter, optional): An ACWriter object to use for writing to the CAN bus. Defaults to None.
        dbcParseIn (dbcParse, optional): A dbcAutoParse object to use for looking up dbc information (CAN messages, signals, and attributes). Defaults to None.
        dotParseIn (dotParse, optional): A dotAutoParse object to use for looking up dot information (SDO ID's, payloads, and attributes). Defaults to None.
    '''        
    # Set thread objects
    self.reader = readerIn
    self.writer = writerIn
    self.dbcParse = dbcParseIn
    self.dotParse = dotParseIn

readSdoList(sdoList, timeDelay=0.03, doPrint=True, timeout=None)

This method is designed to read a given list of SDO parameters.

Parameters:

Name Type Description Default
sdoList dict

List of paramameters to read.

required
doPrint bool

Whether to print results or not. Defaults to True.

True
timeout int

The timeout length (in seconds) for each read and write command.

None

Raises:

Type Description
Exception

Raises an Exception if the sdoList is empty.

Returns:

Name Type Description
Dataframe DataFrame

Pandas Dataframe containing parameters read and values read.

Source code in drivers\automator.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def readSdoList(self, sdoList:list, timeDelay:float=0.03, doPrint:bool=True, timeout:int=None) -> pd.DataFrame:
    ''' This method is designed to read a given list of SDO parameters.

    Args:
        sdoList (dict): List of paramameters to read.
        doPrint (bool, optional): Whether to print results or not. Defaults to True.
        timeout (int): The timeout length (in seconds) for each read and write command.

    Raises:
        Exception: Raises an Exception if the sdoList is empty.

    Returns:
        Dataframe: Pandas Dataframe containing parameters read and values read.
    '''        

    try:
        # If the dictionary is empty, raise an exception to alert and stop execution
        if (len(sdoList) == 0):
            raise Exception(Back.MAGENTA + Fore.CYAN + "CC Anti-Canape Automator: You must provide a list of parameters to read!")

        # Create the return data frame
        sdoDataFrame = pd.DataFrame(columns=['Parameter Name', 'Read Value'])

        # Create the printing data frame
        printDataFrame = pd.DataFrame(columns=[Fore.WHITE + Back.BLUE + Style.DIM + ' Parameter Name ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Read Value ' + Style.RESET_ALL])

        # For each parameter to read
        for paramToRead in sdoList:
            try:
                # Check if the bus is overloaded
                # [https://python-can.readthedocs.io/en/stable/bus.html#can.BusABC.state]
                if self.writer.bus.state == 3:
                    raise BusOverloadError

                # Create the initial parameter list, starting with the parameter name
                paramReturnList = [paramToRead]
                paramPrintList = [paramToRead]

                sleep(timeDelay)

                # Request a read of the SDO message
                self.reader.singleSdoRead(mainId=paramToRead, writer=self.writer, timeoutIn=timeout)

                sleep(timeDelay)

                # Grab the VCM Response
                sdoResponse = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                paramReturnList.append(sdoResponse[-1])
                paramPrintList.append(sdoResponse[-1])

                # Add the return row to the dataframe
                sdoDataFrame.loc[len(sdoDataFrame.index)] = paramReturnList

                # Add the print row to the dataframe
                printDataFrame.loc[len(printDataFrame)] = paramPrintList
            except BusOverloadError as exc:
                if DEBUG_MODE:
                    logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
                    logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                    logging.error(exc)
                    raise
            except (PcanError, PcanCanOperationError, PcanCanInitializationError) as exc:
                if DEBUG_MODE:
                    logging.debug("CC Anti-Canape: A PCAN Error was detected!")
                    logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                    logging.error(exc)
                    raise
            except (CanError, CanInitializationError, CanInterfaceNotImplementedError, CanOperationError, CanTimeoutError) as exc:
                if DEBUG_MODE:
                    logging.debug("CC Anti-Canape: A CAN Error was detected!")
                    logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
                    logging.error(exc)
                    raise

        if doPrint:
            # Print the print data frame
            print(tabulate(printDataFrame, showindex=False, headers=printDataFrame.columns))
            print(Style.RESET_ALL)

        # Return the return data frame
        return sdoDataFrame

    # Catch bus re-initializing
    except (CanOperationError, PcanCanOperationError) as exc:
        print("CC Anti-Canape: The CAN Channel is Currently Down!")
        print("CC ANti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
        raise

    except BusOverloadError:
            logging.debug("CC Anti-Canape: A heavy/overloaded bus was detected! Restarting script...")
            logging.debug("Error occured on line", cf.f_lineno, "of file", pythonfilename)
            logging.error(exc)
            raise

writeSdoDict(sdoDictionary, timeDelay=0.03, doPrint=True, timeout=None)

This method is designed to write a given dictionary of SDO parameters in the form of {param:new_value}.

Parameters:

Name Type Description Default
sdoDictionary dict

Dictionary of param-value pairs.

required
timeDelay float

The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.

0.03
doPrint bool

Whether to print results or not. Defaults to True.

True
timeout int

The timeout length (in seconds) for each read and write command.

None

Raises:

Type Description
Exception

Raises an Exception if the sdoDictionary is empty.

Returns:

Name Type Description
list list

A list containing [sdoDataFrame, errorCounter, errorList].

Source code in drivers\automator.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def writeSdoDict(self, sdoDictionary:dict, timeDelay:float=0.03, doPrint:bool=True, timeout:int=None) -> list:
    ''' This method is designed to  write a given dictionary of SDO parameters in the form of {param:new_value}.

    Args:
        sdoDictionary (dict): Dictionary of param-value pairs.
        timeDelay (float, optional): The delay (in seconds) that the program should wait before sending another SDO message. Defaults to 0.03.
        doPrint (bool, optional): Whether to print results or not. Defaults to True.
        timeout (int): The timeout length (in seconds) for each read and write command.

    Raises:
        Exception: Raises an Exception if the sdoDictionary is empty.

    Returns:
        list: A list containing [sdoDataFrame, errorCounter, errorList].
    '''
    try:
        # If the dictionary is empty, raise an exception to alert and stop execution
        if (len(sdoDictionary) == 0):
            raise Exception(Back.MAGENTA + Fore.CYAN + "CC Anti-Canape Automator: You must provide a dictionary of param-value pairs to write!")

        # Create the return data frame
        sdoDataFrame = pd.DataFrame(columns=['Parameter Name', 'Initial Read Value', 'Written Value', 'Final Read Value'])

        # Create the printing data frame
        printDataFrame = pd.DataFrame(columns=[Fore.WHITE + Back.BLUE + Style.DIM + ' Parameter Name ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Initial Read Value ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Written Value ' + Style.RESET_ALL, Fore.WHITE + Back.BLUE + Style.DIM + ' Final Read Value ' + Style.RESET_ALL])

        # Initialize error counter
        errorCounter = 0
        errorList = []

        # For each parameter to send
        for paramToSend in sdoDictionary.keys():
            # Create the initial parameter list, starting with the parameter name
            paramReturnList = [paramToSend]
            paramPrintList = []

            try:
                # Read the initial value and add it to the list
                self.reader.singleSdoRead(mainId=paramToSend, writer=self.writer, timeoutIn=timeout)
                pol = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                paramReturnList.append(pol[-1])
            except CanError as exc:
                print("CC Anti-Canape: An Error Has Occured When Communicating with the CAN Bus!")
                print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
                print(exc)

            sleep(timeDelay)

            try:
                # Write the new value and add it to the list
                self.writer.singleSdoWrite(mainId=paramToSend, payloadIn=sdoDictionary[paramToSend], timeoutIn=timeout)
                paramReturnList.append(sdoDictionary[paramToSend])
                olp = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
            except CanError as exc:
                print("CC Anti-Canape: An Error Has Occured When Communicating with the CAN Bus!")
                print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
                print(exc)

            sleep(timeDelay)

            try:
                # Read the final value and add it to the list
                self.reader.singleSdoRead(mainId=paramToSend, writer=self.writer, timeoutIn=timeout)
                lop = self.reader.getSdoResponse(doPrint=False, timeoutIn=timeout)
                paramReturnList.append(lop[-1])
            except CanError as exc:
                print("CC Anti-Canape: An Error Has Occured When Communicating with the CAN Bus!")
                print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)
                print(exc)

            sleep(timeDelay)

            # Add the return row to the dataframe
            sdoDataFrame.loc[len(sdoDataFrame.index)] = paramReturnList

            # Check if the written value matched the final read value
            if (str(paramReturnList[-1]) == str(paramReturnList[-2])):
                # If True, then the write was successful, so we color the param name green
                paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + paramToSend)

                # Add the rest of the values in colored text aswell
                paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + str(paramReturnList[1]))
                paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + str(paramReturnList[2]))
                paramPrintList.append(Back.GREEN + Fore.BLUE + Style.BRIGHT + str(paramReturnList[3]) + Style.RESET_ALL)

            else:
                # If False, then the write was not successful, so we color the param name red
                paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + paramToSend)

                # Add the rest of the values in colored text aswell
                paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + str(paramReturnList[1]))
                paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + str(paramReturnList[2]))
                paramPrintList.append(Back.RED + Fore.BLUE + Style.BRIGHT + str(paramReturnList[3]) + Style.RESET_ALL)

                # Increment Error Counter
                errorCounter += 1
                errorList.append(paramToSend)

            # Add the print row to the dataframe
            printDataFrame.loc[len(printDataFrame)] = paramPrintList

        if doPrint:
            # Print the print data frame
            print(tabulate(printDataFrame, showindex=False, headers=printDataFrame.columns))
            print(Style.RESET_ALL)

        # Return the return data frame
        return [sdoDataFrame, errorCounter, errorList]

    # Catch bus re-initializing
    except (CanOperationError, PcanCanOperationError):
        print("CC Anti-Canape: The CAN Channel is Currently Down!")
        print("CC Anti-Canape: This error got thrown on line", cf.f_lineno, "of file", pythonfilename)