Agent-side MIB implementations

Implementing scalar MIB objects

Listen and respond to SNMP GET/GETNEXT queries with the following options:

  • SNMPv1 or SNMPv2c
  • with SNMP community “public”
  • over IPv4/UDP, listening at 127.0.0.1:161
  • over IPv6/UDP, listening at [::1]:161
  • serving two Managed Objects Instances (sysDescr.0 and sysUptime.0)

Either of the following Net-SNMP commands will walk this Agent:

$ snmpwalk -v2c -c public 127.0.0.1 .1.3.6
$ snmpwalk -v2c -c public udp6:[::1] .1.3.6

The Command Receiver below uses two distinct transports for communication with Command Generators - UDP over IPv4 and UDP over IPv6.

from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher
from pysnmp.carrier.asyncore.dgram import udp, udp6
from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
import time, bisect


class SysDescr(object):
    name = (1, 3, 6, 1, 2, 1, 1, 1, 0)

    def __eq__(self, other): return self.name == other

    def __ne__(self, other): return self.name != other

    def __lt__(self, other): return self.name < other

    def __le__(self, other): return self.name <= other

    def __gt__(self, other): return self.name > other

    def __ge__(self, other): return self.name >= other

    def __call__(self, protoVer):
        return api.PROTOCOL_MODULES[protoVer].OctetString(
            'PySNMP example command responder'
        )


class Uptime(object):
    name = (1, 3, 6, 1, 2, 1, 1, 3, 0)
    birthday = time.time()

    def __eq__(self, other): return self.name == other

    def __ne__(self, other): return self.name != other

    def __lt__(self, other): return self.name < other

    def __le__(self, other): return self.name <= other

    def __gt__(self, other): return self.name > other

    def __ge__(self, other): return self.name >= other

    def __call__(self, protoVer):
        return api.PROTOCOL_MODULES[protoVer].TimeTicks(
            (time.time() - self.birthday) * 100
        )


mibInstr = (
    SysDescr(), Uptime()  # sorted by object name
)

mibInstrIdx = {}
for mibVar in mibInstr:
    mibInstrIdx[mibVar.name] = mibVar


def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
    while wholeMsg:

        msgVer = api.decodeMessageVersion(wholeMsg)

        if msgVer in api.PROTOCOL_MODULES:
            pMod = api.PROTOCOL_MODULES[msgVer]

        else:
            print('Unsupported SNMP version %s' % msgVer)
            return

        reqMsg, wholeMsg = decoder.decode(
            wholeMsg, asn1Spec=pMod.Message(),
        )

        rspMsg = pMod.apiMessage.getResponse(reqMsg)
        rspPDU = pMod.apiMessage.getPDU(rspMsg)
        reqPDU = pMod.apiMessage.getPDU(reqMsg)

        varBinds = []
        pendingErrors = []
        errorIndex = 0

        # GETNEXT PDU
        if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()):

            # Produce response var-binds
            for oid, val in pMod.apiPDU.getVarBinds(reqPDU):
                errorIndex = errorIndex + 1

                # Search next OID to report
                nextIdx = bisect.bisect(mibInstr, oid)
                if nextIdx == len(mibInstr):
                    # Out of MIB
                    varBinds.append((oid, val))
                    pendingErrors.append(
                        (pMod.apiPDU.setEndOfMibError, errorIndex)
                    )

                else:
                    # Report value if OID is found
                    varBinds.append(
                        (mibInstr[nextIdx].name, mibInstr[nextIdx](msgVer))
                    )

        elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()):

            for oid, val in pMod.apiPDU.getVarBinds(reqPDU):

                if oid in mibInstrIdx:
                    varBinds.append((oid, mibInstrIdx[oid](msgVer)))

                else:
                    # No such instance
                    varBinds.append((oid, val))
                    pendingErrors.append(
                        (pMod.apiPDU.setNoSuchInstanceError, errorIndex)
                    )
                    break

        else:
            # Report unsupported request type
            pMod.apiPDU.setErrorStatus(rspPDU, 'genErr')

        pMod.apiPDU.setVarBinds(rspPDU, varBinds)

        # Commit possible error indices to response PDU
        for f, i in pendingErrors:
            f(rspPDU, i)

        transportDispatcher.sendMessage(
            encoder.encode(rspMsg), transportDomain, transportAddress
        )

    return wholeMsg


transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbFun)

# UDP/IPv4
transportDispatcher.registerTransport(
    udp.DOMAIN_NAME, udp.UdpSocketTransport().openServerMode(('localhost', 161))
)

# UDP/IPv6
transportDispatcher.registerTransport(
    udp6.DOMAIN_NAME, udp6.Udp6SocketTransport().openServerMode(('::1', 161))
)

transportDispatcher.jobStarted(1)

try:
    # Dispatcher will never finish as job#1 never reaches zero
    transportDispatcher.runDispatcher()

except Exception:
    transportDispatcher.closeDispatcher()
    raise

Download script.

See also: library reference.