diff --git a/uvloop/handles/stream.pxd b/uvloop/handles/stream.pxd index 8ca87437..33e8a849 100644 --- a/uvloop/handles/stream.pxd +++ b/uvloop/handles/stream.pxd @@ -1,3 +1,9 @@ +cdef enum ProtocolType: + SIMPLE = 0 # User Protocol doesn't support asyncio.BufferedProtocol + BUFFERED = 1 # User Protocol supports asyncio.BufferedProtocol + SSL_PROTOCOL = 2 # Our own SSLProtocol + + cdef class UVStream(UVBaseTransport): cdef: uv.uv_shutdown_t _shutdown_req @@ -5,7 +11,7 @@ cdef class UVStream(UVBaseTransport): bint __reading bint __read_error_close - bint __buffered + ProtocolType __protocol_type object _protocol_get_buffer object _protocol_buffer_updated @@ -16,6 +22,8 @@ cdef class UVStream(UVBaseTransport): Py_buffer _read_pybuf bint _read_pybuf_acquired + cpdef write(self, object buf) + # All "inline" methods are final cdef inline _init(self, Loop loop, object protocol, Server server, diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index c585360f..da65cad9 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -1,4 +1,4 @@ -cdef enum: +cdef enum: __PREALLOCED_BUFS = 4 @@ -213,14 +213,15 @@ cdef class UVStream(UVBaseTransport): self.__shutting_down = 0 self.__reading = 0 self.__read_error_close = 0 - self.__buffered = 0 - self._eof = 0 - self._buffer = [] - self._buffer_size = 0 + self.__protocol_type = ProtocolType.SIMPLE self._protocol_get_buffer = None self._protocol_buffer_updated = None + self._eof = 0 + self._buffer = [] + self._buffer_size = 0 + self._read_pybuf_acquired = False cdef _set_protocol(self, object protocol): @@ -229,22 +230,24 @@ cdef class UVStream(UVBaseTransport): UVBaseTransport._set_protocol(self, protocol) - if (hasattr(protocol, 'get_buffer') and + if isinstance(protocol, SSLProtocol): + self.__protocol_type = ProtocolType.SSL_PROTOCOL + elif (hasattr(protocol, 'get_buffer') and not isinstance(protocol, aio_Protocol)): try: self._protocol_get_buffer = protocol.get_buffer self._protocol_buffer_updated = protocol.buffer_updated - self.__buffered = 1 + self.__protocol_type = ProtocolType.BUFFERED except AttributeError: pass else: - self.__buffered = 0 + self.__protocol_type = ProtocolType.SIMPLE cdef _clear_protocol(self): UVBaseTransport._clear_protocol(self) self._protocol_get_buffer = None self._protocol_buffer_updated = None - self.__buffered = 0 + self.__protocol_type = ProtocolType.SIMPLE cdef inline _shutdown(self): cdef int err @@ -294,14 +297,14 @@ cdef class UVStream(UVBaseTransport): if self.__reading: return - if self.__buffered: - err = uv.uv_read_start(self._handle, - __uv_stream_buffered_alloc, - __uv_stream_buffered_on_read) - else: + if self.__protocol_type == ProtocolType.SIMPLE: err = uv.uv_read_start(self._handle, __loop_alloc_buffer, __uv_stream_on_read) + else: + err = uv.uv_read_start( self._handle, + __uv_stream_buffered_alloc, + __uv_stream_buffered_on_read) if err < 0: exc = convert_error(err) self._fatal_error(exc, True) @@ -671,7 +674,7 @@ cdef class UVStream(UVBaseTransport): self.__reading, id(self)) - def write(self, object buf): + cpdef write(self, object buf): self._ensure_alive() if self._eof: @@ -921,9 +924,24 @@ cdef void __uv_stream_buffered_alloc( "UVStream alloc buffer callback") == 0: return + cdef UVStream sc = stream.data + + # Fast pass for our own SSLProtocol + # avoid python calls, memoryviews, context enter/exit, etc + if sc.__protocol_type == ProtocolType.SSL_PROTOCOL: + try: + (sc._protocol).get_buffer_impl( + suggested_size, &uvbuf.base, &uvbuf.len) + return + except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. + uvbuf.len = 0 + uvbuf.base = NULL + return + cdef: - UVStream sc = stream.data - Loop loop = sc._loop Py_buffer* pybuf = &sc._read_pybuf int got_buf = 0 @@ -984,7 +1002,12 @@ cdef void __uv_stream_buffered_on_read( return try: - if nread > 0 and not sc._read_pybuf_acquired: + # When our own SSLProtocol is used, we get buffer pointer directly, + # through SSLProtocol.get_buffer_impl, not through Py_Buffer interface. + # Therefore sc._read_pybuf_acquired is always False for SSLProtocol. + if (nread > 0 and + sc.__protocol_type == ProtocolType.BUFFERED and + not sc._read_pybuf_acquired): # From libuv docs: # nread is > 0 if there is data available or < 0 on error. When # we’ve reached EOF, nread will be set to UV_EOF. When @@ -1005,12 +1028,20 @@ cdef void __uv_stream_buffered_on_read( if UVLOOP_DEBUG: loop._debug_stream_read_cb_total += 1 - run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + if sc.__protocol_type == ProtocolType.SSL_PROTOCOL: + Context_Enter(sc.context) + try: + (sc._protocol).buffer_updated_impl(nread) + finally: + Context_Exit(sc.context) + else: + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) except BaseException as exc: if UVLOOP_DEBUG: loop._debug_stream_read_cb_errors_total += 1 sc._fatal_error(exc, False) finally: - sc._read_pybuf_acquired = 0 - PyBuffer_Release(pybuf) + if sc._read_pybuf_acquired: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 3da10f00..cf3205d1 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -59,7 +59,6 @@ cdef class SSLProtocol: object _outgoing_read char* _ssl_buffer size_t _ssl_buffer_len - object _ssl_buffer_view SSLProtocolState _state size_t _conn_lost AppProtocolState _app_state @@ -84,55 +83,61 @@ cdef class SSLProtocol: object _handshake_timeout_handle object _shutdown_timeout_handle - cdef _set_app_protocol(self, app_protocol) - cdef _wakeup_waiter(self, exc=*) - cdef _get_extra_info(self, name, default=*) - cdef _set_state(self, SSLProtocolState new_state) + # Instead of doing python calls, c methods *_impl are called directly + # from stream.pyx + + cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size) + cdef inline buffer_updated_impl(self, size_t nbytes) + + cdef inline _set_app_protocol(self, app_protocol) + cdef inline _wakeup_waiter(self, exc=*) + cdef inline _get_extra_info(self, name, default=*) + cdef inline _set_state(self, SSLProtocolState new_state) # Handshake flow - cdef _start_handshake(self) - cdef _check_handshake_timeout(self) - cdef _do_handshake(self) - cdef _on_handshake_complete(self, handshake_exc) + cdef inline _start_handshake(self) + cdef inline _check_handshake_timeout(self) + cdef inline _do_handshake(self) + cdef inline _on_handshake_complete(self, handshake_exc) # Shutdown flow - cdef _start_shutdown(self, object context=*) - cdef _check_shutdown_timeout(self) - cdef _do_read_into_void(self, object context) - cdef _do_flush(self, object context=*) - cdef _do_shutdown(self, object context=*) - cdef _on_shutdown_complete(self, shutdown_exc) - cdef _abort(self, exc) + cdef inline _start_shutdown(self, object context=*) + cdef inline _check_shutdown_timeout(self) + cdef inline _do_read_into_void(self, object context) + cdef inline _do_flush(self, object context=*) + cdef inline _do_shutdown(self, object context=*) + cdef inline _on_shutdown_complete(self, shutdown_exc) + cdef inline _abort(self, exc) # Outgoing flow - cdef _write_appdata(self, list_of_data, object context) - cdef _do_write(self) - cdef _process_outgoing(self) + cdef inline _write_appdata(self, list_of_data, object context) + cdef inline _do_write(self) + cdef inline _process_outgoing(self) # Incoming flow - cdef _do_read(self) - cdef _do_read__buffered(self) - cdef _do_read__copied(self) - cdef _call_eof_received(self, object context=*) + cdef inline _do_read(self) + cdef inline _do_read__buffered(self) + cdef inline _do_read__copied(self) + cdef inline _call_eof_received(self, object context=*) # Flow control for writes from APP socket - cdef _control_app_writing(self, object context=*) - cdef size_t _get_write_buffer_size(self) - cdef _set_write_buffer_limits(self, high=*, low=*) + cdef inline _control_app_writing(self, object context=*) + cdef inline size_t _get_write_buffer_size(self) + cdef inline _set_write_buffer_limits(self, high=*, low=*) # Flow control for reads to APP socket - cdef _pause_reading(self) - cdef _resume_reading(self, object context) + cdef inline _pause_reading(self) + cdef inline _resume_reading(self, object context) # Flow control for reads from SSL socket - cdef _control_ssl_reading(self) - cdef _set_read_buffer_limits(self, high=*, low=*) - cdef size_t _get_read_buffer_size(self) - cdef _fatal_error(self, exc, message=*) + cdef inline _control_ssl_reading(self) + cdef inline _set_read_buffer_limits(self, high=*, low=*) + cdef inline size_t _get_read_buffer_size(self) + cdef inline _fatal_error(self, exc, message=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 42bb7644..5442b5d0 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -204,11 +204,8 @@ cdef class SSLProtocol: self._ssl_buffer = PyMem_RawMalloc(self._ssl_buffer_len) if not self._ssl_buffer: raise MemoryError() - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE) def __dealloc__(self): - self._ssl_buffer_view = None PyMem_RawFree(self._ssl_buffer) self._ssl_buffer = NULL self._ssl_buffer_len = 0 @@ -358,7 +355,7 @@ cdef class SSLProtocol: self._handshake_timeout_handle.cancel() self._handshake_timeout_handle = None - def get_buffer(self, n): + cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size): cdef size_t want = n if want > SSL_READ_MAX_SIZE: want = SSL_READ_MAX_SIZE @@ -367,11 +364,11 @@ cdef class SSLProtocol: if not self._ssl_buffer: raise MemoryError() self._ssl_buffer_len = want - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, want, PyBUF_WRITE) - return self._ssl_buffer_view - def buffer_updated(self, nbytes): + buf[0] = self._ssl_buffer + buf_size[0] = self._ssl_buffer_len + + cdef buffer_updated_impl(self, size_t nbytes): self._incoming_write(PyMemoryView_FromMemory( self._ssl_buffer, nbytes, PyBUF_WRITE)) @@ -387,6 +384,18 @@ cdef class SSLProtocol: elif self._state == SHUTDOWN: self._do_shutdown() + def get_buffer(self, size_t n): + # This pure python call is still used by some very peculiar test cases + cdef: + char* buf + size_t buf_size + + self.get_buffer_impl(n, &buf, &buf_size) + return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) + + def buffer_updated(self, size_t nbytes): + self.buffer_updated_impl(nbytes) + def eof_received(self): """Called when the other end of the low-level stream is half-closed. @@ -696,7 +705,10 @@ cdef class SSLProtocol: if not self._ssl_writing_paused: data = self._outgoing_read() if len(data): - self._transport.write(data) + if isinstance(self._transport, UVStream): + (self._transport).write(data) + else: + self._transport.write(data) # Incoming flow