Question

I am trying to capture SSH brute-force attempts using a simple SSH server. The code is below and works; however, I am unable to match username/password combinations with their originating IP address.

This is as simple as I can get it so far; it does not log yet, it just prints to stdout. I overrode buildProtocol to print the IP address of each new connection when it is made. However, I would like to get the IP address together with the username and password, so that I can track brute-force attempts from several clients at the same time. As it stands, I only see the IP address at the moment the connection is made, so with concurrent connections I cannot tell which credentials came from which client.

Just to clarify, when I refer to a connection or connections I am not referring to a successful login; that is impossible with my code and is not intended. I mean a single connection to the SSH server, which allows 3 password attempts before the client has to reconnect.

from zope.interface import implements
from twisted.conch.unix import UnixSSHRealm
from twisted.cred import portal
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.conch.ssh import factory, userauth, keys, session
from twisted.internet import reactor, defer

with open('id_rsa') as privateBlobFile:
    privateKey = privateBlobFile.read()

with open('id_rsa.pub') as publicBlobFile:
    publicKey = publicBlobFile.read()

class FailDB:
    credentialInterfaces = IUsernamePassword,
    implements(ICredentialsChecker)

    def requestAvatarId(self, credentials):
        print "%s:%s" % (credentials.username, credentials.password)
        return defer.fail(UnauthorizedLogin("invalid password"))

class UnixSSHdFactory(factory.SSHFactory):
    publicKeys = {
        'ssh-rsa': keys.Key.fromString(data=publicKey)
    }
    privateKeys = {
        'ssh-rsa': keys.Key.fromString(data=privateKey)
    }
    services = {
        'ssh-userauth': userauth.SSHUserAuthServer
    }

    def buildProtocol(self, addr):
        print addr
        return factory.SSHFactory.buildProtocol(self, addr)

if __name__ == '__main__':
    portal = portal.Portal(UnixSSHRealm())
    portal.registerChecker(FailDB())
    UnixSSHdFactory.portal = portal
    reactor.listenTCP(2022, UnixSSHdFactory())
    reactor.run()

Example output:

IPv4Address(TCP, '127.0.0.1', 42141)
root:password123
root:123456
root:letmein

No correct solution

OTHER TIPS

You can get the address from a failed login attempt by deriving from SSHUserAuthServer and overriding the _ebBadAuth method. Example:

class AuthServer(userauth.SSHUserAuthServer):
    def _ebBadAuth(self, reason):
        addr = self.transport.getPeer().address.host
        print("addr {} failed to log in as {} using {}"
              .format(addr, self.user, self.method))
        userauth.SSHUserAuthServer._ebBadAuth(self, reason)
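
To keep concurrent clients apart, the values available in _ebBadAuth could be grouped by peer host. The following is a minimal sketch using only the standard library. `attempts` and `record_attempt` are hypothetical names, not part of Twisted, and the sketch assumes `_ebBadAuth` would call `record_attempt(addr, self.user, self.method)`:

```python
from collections import defaultdict

# Hypothetical in-memory store: peer host -> list of (user, method) attempts.
attempts = defaultdict(list)

def record_attempt(host, user, method):
    """Store one failed attempt under its originating host; return that host's total."""
    attempts[host].append((user, method))
    return len(attempts[host])

# Simulated interleaved attempts from two clients (made-up addresses).
record_attempt('127.0.0.1', 'root', 'password')
record_attempt('192.0.2.7', 'admin', 'password')
count = record_attempt('127.0.0.1', 'root', 'password')
print(count)  # 2
```

Because each entry is keyed by host, interleaved attempts from different clients stay separate even when the connections overlap.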

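If you also want logins to be able to succeed, do not replace SSHFactory.services with a dict that contains only 'ssh-userauth', because the default mapping also holds the 'ssh-connection' service that is used after authentication. Instead, update the existing mapping with your derived class, like factory.services.update({'ssh-userauth': AuthServer}).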
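
The difference between replacing and updating the services mapping can be sketched without Twisted. All classes below are stand-ins that only mimic the shape of the real ones (an assumption made so the sketch runs on its own); `ReplacingFactory` and `UpdatingFactory` are hypothetical names:

```python
# Stand-ins for the Twisted classes so the sketch runs without Twisted installed.
class SSHUserAuthServer(object):
    pass

class SSHConnection(object):
    pass

class SSHFactory(object):
    # Mirrors the shape of the default mapping: auth plus connection service.
    services = {
        'ssh-userauth': SSHUserAuthServer,
        'ssh-connection': SSHConnection,
    }

class AuthServer(SSHUserAuthServer):
    pass

class ReplacingFactory(SSHFactory):
    # Replaces the whole mapping: 'ssh-connection' is no longer available.
    services = {'ssh-userauth': AuthServer}

class UpdatingFactory(SSHFactory):
    # Copies the inherited mapping, then swaps in the derived auth service.
    services = dict(SSHFactory.services)
    services['ssh-userauth'] = AuthServer

print(sorted(ReplacingFactory.services))  # ['ssh-userauth']
print(sorted(UpdatingFactory.services))   # ['ssh-connection', 'ssh-userauth']
```

Copying the dict before changing it is a design choice of this sketch: an in-place update on the inherited dict would also change the mapping seen by the base class and every other subclass that shares it.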

Licensed under: CC-BY-SA with attribution