diff --git a/examples/example_banner.py b/examples/example_banner.py new file mode 100644 index 00000000..76f16343 --- /dev/null +++ b/examples/example_banner.py @@ -0,0 +1,23 @@ +from __future__ import print_function +import os +import socket + +from ssh2.session import Session +from ssh2.utils import version + +# Connection settings +host = 'localhost' +user = os.getlogin() + +# Make socket, connect +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect((host, 22)) + +# Initialise +session = Session() +session.banner_set("SSH-2.0-Python3") +session.handshake(sock) + +print(f"Remote Banner: {session.banner_get()}") + +session.disconnect() diff --git a/ssh2/session.pyx b/ssh2/session.pyx index 13f9c54d..e9b7dfce 100644 --- a/ssh2/session.pyx +++ b/ssh2/session.pyx @@ -106,6 +106,32 @@ cdef class Session: rc = c_ssh2.libssh2_session_startup(self._session, _sock) return handle_error_codes(rc) + def banner_set(self, banner not None): + """Set the session SSH protocol banner for the local client + :param banner: banner to set for the local client + :type banner: str + + :rtype: int""" + cdef bytes b_banner = to_bytes(banner) + cdef char *_banner = b_banner + cdef int rc + with nogil: + rc = c_ssh2.libssh2_session_banner_set(self._session, _banner) + return handle_error_codes(rc) + + def banner_get(self): + """Get the session SSH protocol banner for the remote server + + :rtype: str or None""" + cdef char *_banner + cdef str banner + with nogil: + _banner = c_ssh2.libssh2_session_banner_get(self._session) + if _banner is NULL: + return + return to_str(_banner) + + def set_blocking(self, bint blocking): """Set session blocking mode on/off. diff --git a/tests/test_session.py b/tests/test_session.py index b47893cc..a148c5ae 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -8,7 +8,7 @@ from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ssh2.exceptions import (AuthenticationError, AgentAuthenticationError, SCPProtocolError, RequestDeniedError, InvalidRequestError, SocketSendError, FileError, - PublickeyUnverifiedError) + PublickeyUnverifiedError, SSH2Error) from ssh2.utils import wait_socket @@ -59,6 +59,22 @@ def test_session_get_set(self): self.session.set_blocking(1) self.assertEqual(self.session.get_blocking(), 1) + def test_invalid_banner_set(self): + """ + Reset session as base_test.py calls handshake + """ + self.session.disconnect() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + + self.sock = sock + self.session = Session() + + self.session.banner_set("Python3") + + self.assertRaises(SSH2Error, self.session.handshake, self.sock) + def test_failed_agent_auth(self): self.assertRaises(AgentAuthenticationError, self.session.agent_auth, 'FAKE USER')