--- a/src/zeroconf/_protocol/incoming.py
+++ b/src/zeroconf/_protocol/incoming.py
@@ -254,12 +254,27 @@
         """Reads a character string from the packet"""
         length = self.view[self.offset]
         self.offset += 1
+        # Python slicing silently truncates when indices exceed the buffer,
+        # but self.offset still advances by the declared length below; without
+        # this check a record with an inflated character-string length would
+        # land in the cache carrying a payload shorter than the wire claimed
+        # and leave the parser pointed past _data_len for the next record.
+        if self.offset + length > self._data_len:
+            raise IncomingDecodeError(
+                f"Character string length {length} at offset {self.offset} overruns "
+                f"packet of {self._data_len} bytes from {self.source}"
+            )
         info = self.data[self.offset : self.offset + length].decode("utf-8", "replace")
         self.offset += length
         return info
 
     def _read_string(self, length: _int) -> bytes:
         """Reads a string of a given length from the packet"""
+        if self.offset + length > self._data_len:
+            raise IncomingDecodeError(
+                f"String length {length} at offset {self.offset} overruns "
+                f"packet of {self._data_len} bytes from {self.source}"
+            )
         info = self.data[self.offset : self.offset + length]
         self.offset += length
         return info
@@ -297,6 +312,19 @@
                     self.data,
                     exc_info=True,
                 )
+            if rec is not None and self.offset != end:
+                # The decoded record consumed a different number of bytes than
+                # rdlength advertised. The record is built from a slice that
+                # straddles its rdata boundary, so drop it and resync to the
+                # declared end so the next record header lands aligned.
+                log.debug(
+                    "Record for %s with type %s did not consume exactly rdlength=%d; dropping",
+                    domain,
+                    _TYPES.get(type_, type_),
+                    length,
+                )
+                self.offset = end
+                rec = None
             if rec is not None:
                 self._answers.append(rec)

--- a/tests/test_protocol.py
+++ b/tests/test_protocol.py
@@ -807,6 +807,89 @@
     assert nsec_record.next_name == "MyHome54 (2)._meshcop._udp.local."
 
 
+def test_txt_rdlength_overruns_packet_rejected():
+    """A TXT record with rdlength past the buffer must not enter the cache.
+
+    Python slicing silently truncates when the slice end exceeds the buffer,
+    so without a bounds check in ``_read_string`` a malformed wire frame
+    would land in the cache carrying a payload shorter than the rdlength
+    declared, leaving the parser desynchronized for downstream records.
+    """
+    packet = (
+        b"\x00\x00\x84\x00\x00\x00\x00\x01\x00\x00\x00\x00"
+        b"\x04test\x05local\x00"
+        b"\x00\x10\x80\x01"
+        b"\x00\x00\x11\x94"
+        b"\xff\xff"
+        b"\x05hello"
+    )
+    parsed = r.DNSIncoming(packet)
+    assert parsed.valid
+    assert parsed.answers() == []
+
+
+def test_hinfo_character_string_length_overruns_record_rejected():
+    """A HINFO character string declaring more bytes than remain must be rejected."""
+    packet = (
+        b"\x00\x00\x84\x00\x00\x00\x00\x01\x00\x00\x00\x00"
+        b"\x04test\x05local\x00"
+        b"\x00\x0d\x80\x01"
+        b"\x00\x00\x11\x94"
+        b"\x00\x07"
+        b"\x03cpu"
+        b"\xff\xff\xff"
+    )
+    parsed = r.DNSIncoming(packet)
+    assert parsed.valid
+    assert not any(isinstance(a, r.DNSHinfo) for a in parsed.answers())
+
+
+def test_a_record_rdlength_overruns_packet_rejected():
+    """An A record whose 4-byte address would walk past the buffer must be rejected."""
+    packet = (
+        b"\x00\x00\x84\x00\x00\x00\x00\x01\x00\x00\x00\x00"
+        b"\x04test\x05local\x00"
+        b"\x00\x01\x80\x01"
+        b"\x00\x00\x11\x94"
+        b"\x00\x04"
+        b"\xc0\xa8"
+    )
+    parsed = r.DNSIncoming(packet)
+    assert parsed.valid
+    assert not any(isinstance(a, r.DNSAddress) for a in parsed.answers())
+
+
+def test_record_consuming_more_than_rdlength_dropped_and_resyncs():
+    """A record whose decoded fields overrun its rdlength must drop and resync.
+
+    The first answer is a HINFO with ``rdlength=2`` and rdata ``\\x01x`` (one
+    char string ``"x"``). The second character string's length byte then comes
+    from the next record's name (``\\x00``, root domain), so the HINFO would
+    silently parse as ``cpu="x", os=""`` but leave the offset one byte past
+    the declared end, smearing the second record's framing. With the per-record
+    boundary check the bogus HINFO is dropped and the second record decodes.
+    """
+    packet = (
+        b"\x00\x00\x84\x00\x00\x00\x00\x02\x00\x00\x00\x00"
+        b"\x04test\x05local\x00"
+        b"\x00\x0d\x80\x01"
+        b"\x00\x00\x11\x94"
+        b"\x00\x02"
+        b"\x01x"
+        b"\x00"
+        b"\x00\x0c\x00\x01"
+        b"\x00\x00\x11\x94"
+        b"\x00\x02"
+        b"\xc0\x0c"
+    )
+    parsed = r.DNSIncoming(packet)
+    answers = parsed.answers()
+    assert not any(isinstance(a, r.DNSHinfo) for a in answers)
+    ptrs = [a for a in answers if isinstance(a, r.DNSPointer)]
+    assert len(ptrs) == 1
+    assert ptrs[0].alias == "test.local."
+
+
 def test_records_same_packet_share_fate():
     """Test records in the same packet all have the same created time."""
     out = r.DNSOutgoing(const._FLAGS_QR_QUERY | const._FLAGS_AA)
