Advanced Command Generator

Concurrent queries

Send multiple SNMP GET requests at once using the following options:

  • with SNMPv2c, community ‘public’
  • over IPv4/UDP
  • to an Agent at demo.snmplabs.com:161
  • for two MIB object instances: SNMPv2-MIB::sysDescr.0 and SNMPv2-MIB::sysLocation.0
  • based on Twisted I/O framework

Functionally similar to:

$ snmpget -v2c -c public demo.snmplabs.com SNMPv2-MIB::sysDescr.0
$ snmpget -v2c -c public demo.snmplabs.com SNMPv2-MIB::sysLocation.0
from twisted.internet.defer import DeferredList
from twisted.internet.task import react
from pysnmp.hlapi.v3arch.twisted import *


def success(args, hostname):
    """Print the outcome of a single SNMP GET response.

    :param args: ``(errorStatus, errorIndex, varBinds)`` triple delivered
        by the command deferred.
    :param hostname: label of the queried agent, used in error reports.
    """
    errorStatus, errorIndex, varBinds = args

    if errorStatus:
        # errorIndex is a 1-based position into varBinds. Zero means "no
        # particular variable"; an out-of-range value is reported as '?'
        # rather than raising IndexError inside the callback.
        position = int(errorIndex) if errorIndex else 0
        if 0 < position <= len(varBinds):
            culprit = varBinds[position - 1][0]
        else:
            culprit = '?'

        print('%s: %s at %s' % (hostname,
                                errorStatus.prettyPrint(),
                                culprit))
        return

    for varBind in varBinds:
        print(' = '.join([x.prettyPrint() for x in varBind]))


def failure(errorIndication, hostname):
    """Report a request to ``hostname`` that failed with ``errorIndication``."""
    report = '%s failure: %s' % (hostname, errorIndication)
    print(report)


# noinspection PyUnusedLocal
def getSystem(reactor, hostname):
    """Query sysDescr.0 and sysLocation.0 on ``hostname`` concurrently.

    Both GET requests share one SnmpEngine and are issued before either
    response arrives.

    :param reactor: Twisted reactor supplied by ``react`` (unused here).
    :param hostname: name or address of the agent, queried on UDP port 161.
    :returns: a ``DeferredList`` that fires once both requests have been
        handled by ``success`` or ``failure``.
    """
    snmpEngine = SnmpEngine()

    def getScalar(objectType):
        """Send one GET for ``objectType`` and attach the result printers."""
        deferred = getCmd(
            snmpEngine,
            # mpModel=1 selects SNMPv2c, as this example advertises
            # (mpModel=0 would send SNMPv1 messages instead).
            CommunityData('public', mpModel=1),
            UdpTransportTarget((hostname, 161)),
            ContextData(),
            objectType
        )

        deferred.addCallback(success, hostname).addErrback(failure, hostname)

        return deferred

    return DeferredList(
        [getScalar(ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0))),
         getScalar(ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))]
    )


# Script entry point: react() starts the reactor and calls getSystem with the
# reactor plus the list items (here the agent hostname) as arguments.
react(getSystem, ['demo.snmplabs.com'])

Download script.

Walk multiple Agents at once

  • with SNMPv3 with user ‘usr-md5-none’, MD5 auth and no privacy protocols
  • over IPv4/UDP
  • to Agents at demo.snmplabs.com:161 and demo.snmplabs.com:1161
  • for multiple MIB subtrees and tables
  • for whole MIB
  • based on Twisted I/O framework

Functionally similar to:

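$ snmpbulkwalk -v3 -l authNoPriv -u usr-md5-none -a MD5 -A authkey1 -Cn0 -Cr25 demo.snmplabs.com:161 SNMPv2-MIB::system
$ snmpbulkwalk -v3 -l authNoPriv -u usr-md5-none -a MD5 -A authkey1 -Cn0 -Cr25 demo.snmplabs.com:1161 SNMPv2-MIB::system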
from twisted.internet.defer import DeferredList
from twisted.internet.task import react
from pysnmp.hlapi.v3arch.twisted import *


def success(args, reactor, snmpEngine, hostname):
    """Print one GETBULK response and continue the walk if more remains.

    :param args: ``(errorStatus, errorIndex, varBindTable)`` triple
        delivered by the command deferred; ``varBindTable`` is a sequence
        of rows, each row a sequence of variable-bindings.
    :param reactor: Twisted reactor, passed through to ``getbulk``.
    :param snmpEngine: SNMP engine, passed through to ``getbulk``.
    :param hostname: target the response came from; used in error reports
        and for the follow-up request.
    :returns: the deferred of the follow-up ``getbulk`` request when the
        walk continues, otherwise ``None``.
    """
    errorStatus, errorIndex, varBindTable = args

    if errorStatus:
        # errorIndex is a 1-based position within a row. Fall back to '?'
        # when it is zero, out of range, or no rows came back, instead of
        # raising IndexError inside the callback.
        position = int(errorIndex) if errorIndex else 0
        firstRow = varBindTable[0] if varBindTable else ()
        if 0 < position <= len(firstRow):
            culprit = firstRow[position - 1][0]
        else:
            culprit = '?'

        print('%s: %s at %s' % (hostname,
                                errorStatus.prettyPrint(),
                                culprit))
        return None

    for varBindRow in varBindTable:
        for varBind in varBindRow:
            print(' = '.join([x.prettyPrint() for x in varBind]))

    # An empty table gives nothing to resume from, so the walk ends there;
    # otherwise resume from the last row unless it marks the end of the MIB.
    if varBindTable and not isEndOfMib(varBindTable[-1]):
        return getbulk(reactor, snmpEngine, hostname, *varBindTable[-1])

    return None


def failure(errorIndication):
    """Print the error that aborted a walk."""
    print(str(errorIndication))


def getbulk(reactor, snmpEngine, hostname, varBinds):
    """Issue one SNMPv3 GETBULK request and wire up the result handlers.

    ``hostname`` is handed to ``UdpTransportTarget`` as-is and ``varBinds``
    is the object to start from. The request uses 0 non-repeaters and
    25 max-repetitions. ``success`` receives the response (and may chain
    the next request); ``failure`` receives any error.

    Returns the deferred for this request.
    """
    credentials = UsmUserData('usr-md5-none', 'authkey1')
    target = UdpTransportTarget(hostname)
    nonRepeaters, maxRepetitions = 0, 25

    walk = bulkCmd(snmpEngine, credentials, target, ContextData(),
                   nonRepeaters, maxRepetitions, varBinds)

    # addCallback returns the same deferred, so chaining and separate calls
    # register the handlers identically.
    walk.addCallback(success, reactor, snmpEngine, hostname)
    walk.addErrback(failure)

    return walk


def getall(reactor, hostnames):
    """Start a walk from SNMPv2-MIB::system on every target in ``hostnames``.

    All walks share a single SnmpEngine; each gets its own starting
    ObjectType. Returns a ``DeferredList`` over the per-target deferreds.
    """
    engine = SnmpEngine()

    walks = []
    for hostname in hostnames:
        startFrom = ObjectType(ObjectIdentity('SNMPv2-MIB', 'system'))
        walks.append(getbulk(reactor, engine, hostname, startFrom))

    return DeferredList(walks)


# Script entry point: the single list item is a tuple of (host, port) targets,
# which react() passes to getall as its hostnames argument.
react(getall, [(('demo.snmplabs.com', 161), ('demo.snmplabs.com', 1161))])

Download script.

See also: library reference.