Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@
"FakeSSLContext",
)

__version__ = "3.13.9"
__version__ = "3.13.10"
82 changes: 82 additions & 0 deletions mocket/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,76 @@
self.io.truncate()
self.io.seek(0)

def sendmsg(
self,
buffers: list[ReadableBuffer],
ancdata: list[tuple[int, bytes]] | None = None,
flags: int = 0,
address: Address | None = None,
) -> int:
if not buffers:
return 0

data = b"".join(bytes(b) for b in buffers)
self.sendall(data)
return len(data)

def recvmsg(
self,
buffersize: int | None = None,
ancbufsize: int | None = None,
flags: int = 0,
) -> tuple[bytes, list[tuple[int, bytes]]]:
"""
Receive a message from the socket.
This is a mock implementation that reads from the MocketSocketIO.
"""
try:
data = self.recv(buffersize)
except BlockingIOError:
return b"", []

# Mocking the ancillary data and flags as empty
return data, []

def recvmsg_into(
self,
buffers: list[ReadableBuffer],
ancbufsize: int | None = None,
flags: int = 0,
address: Address | None = None,
):
"""
Receive a message into multiple buffers.
This is a mock implementation that reads from the MocketSocketIO.
"""
if not buffers:
return 0

try:
data = self.recv(len(buffers[0]))
except BlockingIOError:
return 0

for i, buffer in enumerate(buffers):
if i < len(data):
buffer[: len(data)] = data
else:
buffer[:] = b""

Check warning on line 249 in mocket/socket.py

View check run for this annotation

Codecov / codecov/patch

mocket/socket.py#L249

Added line #L249 was not covered by tests
return len(data)

def recvfrom_into(
self,
buffer: WriteableBuffer,
buffersize: int | None = None,
flags: int | None = None,
):
"""
Receive data into a buffer and return the number of bytes received.
This is a mock implementation that reads from the MocketSocketIO.
"""
return self.recv_into(buffer, buffersize, flags), self._address

def recv_into(
self,
buffer: WriteableBuffer,
Expand Down Expand Up @@ -286,6 +356,18 @@
self._entry = entry
return len(data)

def accept(self) -> tuple[MocketSocket, _RetAddress]:
"""Accept a connection and return a new MocketSocket object."""
new_socket = MocketSocket(
family=self._family,
type=self._type,
proto=self._proto,
)
new_socket._address = (self._host, self._port)
new_socket._host = self._host
new_socket._port = self._port
return new_socket, (self._host, self._port)

def close(self) -> None:
if self._true_socket and not self._true_socket._closed:
self._true_socket.close()
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ test = [
"mypy",
"types-decorator",
"types-requests",
"trio",
]
speedups = [
"xxhash;platform_python_implementation=='CPython'",
Expand Down
96 changes: 96 additions & 0 deletions tests/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,99 @@ def test_udp_socket():

assert data == response_data
assert address == (host, port)


def test_recvmsg():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
test_data = b"hello world"
sock._io = type("MockIO", (), {"read": lambda self, n: test_data})()
data, ancdata = sock.recvmsg(1024)
assert data == test_data
assert ancdata == []


def test_recvmsg_into():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
test_data = b"foobar"
sock._io = type("MockIO", (), {"read": lambda self, n: test_data})()
buf = bytearray(10)
buf2 = bytearray(10)
buffers = [buf, buf2]
nbytes = sock.recvmsg_into(buffers)
assert nbytes == len(test_data)
assert buf[: len(test_data)] == test_data


def test_recvmsg_into_empty_buffers():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.recvmsg_into([])
assert result == 0


def test_accept():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
sock._host = "127.0.0.1"
sock._port = 8080
new_sock, addr = sock.accept()
assert isinstance(new_sock, MocketSocket)
assert new_sock is not sock
assert addr == ("127.0.0.1", 8080)
assert new_sock._host == "127.0.0.1"
assert new_sock._port == 8080


@mocketize
def test_sendmsg():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
sock._host = "127.0.0.1"
sock._port = 8080
response_data = b"pong"

Mocket.register(MocketEntry((sock._host, sock._port), [response_data]))

msg = [b"foo", b"bar", b"foobaz"]
total_sent = sock.sendmsg(msg)
assert total_sent == sum(len(m) for m in msg)
assert Mocket.last_request() == b"".join(msg)


def test_sendmsg_empty_buffers():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.sendmsg([])
assert result == 0


def test_recvmsg_no_data():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
# Mock _io.read to return empty bytes
sock._io = type("MockIO", (), {"read": lambda self, n: b""})()
data, ancdata = sock.recvmsg(1024)
assert data == b""
assert ancdata == []


def test_recvmsg_into_no_data():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
# Mock _io.read to return empty bytes
sock._io = type("MockIO", (), {"read": lambda self, n: b""})()
buf = bytearray(10)
nbytes = sock.recvmsg_into([buf])
assert nbytes == 0
assert buf == bytearray(10)


def test_getsockopt():
# getsockopt is a static method, so we can call it directly
result = MocketSocket.getsockopt(0, 0)
assert result == socket.SOCK_STREAM


def test_recvfrom_into():
sock = MocketSocket(socket.AF_INET, socket.SOCK_STREAM)
test_data = b"abc123"
sock._io = type("MockIO", (), {"read": lambda self, n: test_data})()
buf = bytearray(10)
nbytes, addr = sock.recvfrom_into(buf)
assert nbytes == len(test_data)
assert buf[:nbytes] == test_data
assert addr == sock._address