Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ CHECK_CONST_EXISTS(KERN_ARND sys/sysctl.h EVENT__HAVE_DECL_KERN_ARND)
CHECK_CONST_EXISTS(KERN_RANDOM sys/sysctl.h EVENT__HAVE_DECL_KERN_RANDOM)
CHECK_CONST_EXISTS(RANDOM_UUID sys/sysctl.h EVENT__HAVE_DECL_RANDOM_UUID)
CHECK_SYMBOL_EXISTS(F_SETFD fcntl.h EVENT__HAVE_SETFD)
CHECK_CONST_EXISTS(SO_TIMESTAMP sys/socket.h EVENT__HAVE_DECL_SO_TIMESTAMP)
CHECK_CONST_EXISTS(SO_TIMESTAMPNS sys/socket.h EVENT__HAVE_DECL_SO_TIMESTAMPNS)

CHECK_TYPE_SIZE(fd_mask EVENT__HAVE_FD_MASK)

Expand Down
247 changes: 222 additions & 25 deletions buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,14 @@ advance_last_with_data(struct evbuffer *buf)
/*
 * Timestamp-less variant of evbuffer_commit_space_with_timespec():
 * forwards buf, vec and n_vecs unchanged and passes NULL as the timespec.
 * The return value is whatever the forwarded call returns.
 */
int
evbuffer_commit_space(struct evbuffer *buf,
    struct evbuffer_iovec *vec, int n_vecs)
{
	const struct timespec *no_timestamp = NULL;

	return evbuffer_commit_space_with_timespec(buf, vec, n_vecs,
	    no_timestamp);
}

int
evbuffer_commit_space_with_timespec(struct evbuffer *buf,
struct evbuffer_iovec *vec, int n_vecs,
const struct timespec *ts)
{
struct evbuffer_chain *chain, **firstchainp, **chainp;
int result = -1;
Expand All @@ -744,6 +752,10 @@ evbuffer_commit_space(struct evbuffer *buf,
goto done;
buf->last->off += vec[0].iov_len;
added = vec[0].iov_len;
if (ts && added && buf->last->timestamp.valid == 0) {
buf->last->timestamp.ts = *ts;
buf->last->timestamp.valid = 1;
}
if (added)
advance_last_with_data(buf);
goto okay;
Expand Down Expand Up @@ -773,6 +785,10 @@ evbuffer_commit_space(struct evbuffer *buf,
for (i=0; i<n_vecs; ++i) {
(*chainp)->off += vec[i].iov_len;
added += vec[i].iov_len;
if (ts && vec[i].iov_len && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = *ts;
(*chainp)->timestamp.valid = 1;
}
if (vec[i].iov_len) {
buf->last_with_datap = chainp;
}
Expand Down Expand Up @@ -940,6 +956,7 @@ APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
tmp->off = chain->off;
tmp->flags |= EVBUFFER_MULTICAST|EVBUFFER_IMMUTABLE;
tmp->buffer = chain->buffer;
tmp->timestamp = chain->timestamp;
evbuffer_chain_insert(dst, tmp);
}
}
Expand Down Expand Up @@ -1145,6 +1162,7 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
EVUTIL_ASSERT(remaining == 0);
chain->misalign += chain->off;
chain->off = 0;
chain->timestamp.valid = 0;
break;
} else
evbuffer_chain_free(chain);
Expand Down Expand Up @@ -1386,6 +1404,11 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}

if (CHAIN_PINNED(chain)) {
/* Pinned chain case: expand in-place by appending data from
* subsequent chains. Timestamps from subsequent chains being
* consolidated are intentionally discarded; only this chain's
* timestamp is preserved as it contains the oldest data.
*/
size_t old_off = chain->off;
if (CHAIN_SPACE_LEN(chain) < size - chain->off) {
/* not enough room at end of chunk. */
Expand All @@ -1397,6 +1420,11 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
size -= old_off;
chain = chain->next;
} else if (chain->buffer_len - chain->misalign >= (size_t)size) {
/* Sufficient space case: expand in-place without reallocation
* by appending data from subsequent chains. Timestamps from
* subsequent chains being consolidated are intentionally discarded;
* only this chain's timestamp is preserved.
*/
/* already have enough space in the first chain */
size_t old_off = chain->off;
buffer = chain->buffer + chain->misalign + chain->off;
Expand All @@ -1411,16 +1439,25 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}
buffer = tmp->buffer;
tmp->off = size;
if (chain->timestamp.valid) {
tmp->timestamp = chain->timestamp;
}
buf->first = tmp;
}

/* TODO(niels): deal with buffers that point to NULL like sendfile */

/* Copy and free every chunk that will be entirely pulled into tmp */
/* Copy and free every chunk that will be entirely pulled into tmp.
* If the destination chain still has no timestamp, pick up the first
* valid one found among the chains being consumed. */
last_with_data = *buf->last_with_datap;
for (; chain != NULL && (size_t)size >= chain->off; chain = next) {
next = chain->next;

if (!tmp->timestamp.valid && chain->timestamp.valid) {
tmp->timestamp = chain->timestamp;
}

memcpy(buffer, chain->buffer + chain->misalign, chain->off);
size -= chain->off;
buffer += chain->off;
Expand All @@ -1433,6 +1470,9 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}

if (chain != NULL) {
if (!tmp->timestamp.valid && chain->timestamp.valid) {
tmp->timestamp = chain->timestamp;
}
memcpy(buffer, chain->buffer + chain->misalign, size);
chain->misalign += size;
chain->off -= size;
Expand Down Expand Up @@ -2258,8 +2298,18 @@ evbuffer_read_setup_vecs_(struct evbuffer *buf, ev_ssize_t howmuch,
}

static int
get_n_bytes_readable_on_socket(evutil_socket_t fd)
get_n_bytes_readable_on_socket(evutil_socket_t fd, int use_recvmsg)
{
#if !defined(_WIN32) && !defined(__APPLE__)
if (use_recvmsg) {
int r = (int)recv(fd, NULL, 0, MSG_PEEK | MSG_TRUNC);
if (r > 0) {
return r;
}
return EVBUFFER_MAX_READ;
}
#endif

#if defined(FIONREAD) && defined(_WIN32)
unsigned long lng = EVBUFFER_MAX_READ;
if (ioctlsocket(fd, FIONREAD, &lng) < 0)
Expand All @@ -2276,10 +2326,30 @@ get_n_bytes_readable_on_socket(evutil_socket_t fd)
#endif
}

/* TODO(niels): should this function return ev_ssize_t and take ev_ssize_t
* as howmuch? */
int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
/**
* Reads data from a socket optionally with kernel timestamp support.
*
* @param buf the evbuffer to populate
* @param fd the file descriptor to use
* @param howmuch the amount of data to read (this will be adjusted;
* see below)
* @param use_recvmsg try to use recvmsg to read the data (and try to
* read kernel timestamps)
*
* If howmuch is less than 0 or greater than EVBUFFER_MAX_READ, we'll try
* to read EVBUFFER_MAX_READ bytes (fewer may be available on the
* socket).
*
* If use_recvmsg is nonzero, it attempts to retrieve the SO_TIMESTAMPNS or
* SO_TIMESTAMP ancillary data from recvmsg() and associate it with the buffer
* data. Each recvmsg() call writes into a newly allocated chain, so every
* call gets its own chain and its timestamp is preserved independently.
*
* TODO(niels): should this function return ev_ssize_t and take ev_ssize_t
* as howmuch?
*/
static int
evbuffer_read_impl_(struct evbuffer *buf, evutil_socket_t fd, int howmuch, int use_recvmsg)
{
struct evbuffer_chain **chainp;
int n;
Expand All @@ -2291,6 +2361,9 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
struct evbuffer_chain *chain;
unsigned char *p;
#endif
struct timespec ts;
int ts_found = 0;
memset(&ts, 0, sizeof(ts));

EVBUFFER_LOCK(buf);

Expand All @@ -2299,33 +2372,57 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
goto done;
}

n = get_n_bytes_readable_on_socket(fd);
if (n <= 0 || n > EVBUFFER_MAX_READ)
n = get_n_bytes_readable_on_socket(fd, use_recvmsg);
if (n <= 0 || (n > EVBUFFER_MAX_READ && !use_recvmsg))
n = EVBUFFER_MAX_READ;
#if !defined(_WIN32) && !defined(__APPLE__)
if (use_recvmsg && n > howmuch) {
howmuch = n;
}
#endif
if (howmuch < 0 || howmuch > n)
howmuch = n;

#ifdef USE_IOVEC_IMPL
/* Since we can use iovecs, we're willing to use the last
* NUM_READ_IOVEC chains. */
if (evbuffer_expand_fast_(buf, howmuch, NUM_READ_IOVEC) == -1) {
result = -1;
goto done;
} else {
{
IOV_TYPE vecs[NUM_READ_IOVEC];
#ifndef _WIN32
if (use_recvmsg) {
/* Allocate a fresh chain for each recvmsg() call so that
* every call gets its own chain with an independent timestamp. */
struct evbuffer_chain *old_last = buf->last;
struct evbuffer_chain *new_chain = evbuffer_chain_new(howmuch);
if (!new_chain) {
result = -1;
goto done;
}
evbuffer_chain_insert(buf, new_chain);
chainp = old_last ? &old_last->next : &buf->first;
nvecs = 1;
vecs[0].iov_base = (void *)CHAIN_SPACE_PTR(new_chain);
vecs[0].iov_len = (size_t)howmuch;
} else
#endif
/* Since we can use iovecs, we're willing to use the last
* NUM_READ_IOVEC chains. */
if (evbuffer_expand_fast_(buf, howmuch, NUM_READ_IOVEC) == -1) {
result = -1;
goto done;
} else {
#ifdef EVBUFFER_IOVEC_IS_NATIVE_
nvecs = evbuffer_read_setup_vecs_(buf, howmuch, vecs,
NUM_READ_IOVEC, &chainp, 1);
nvecs = evbuffer_read_setup_vecs_(buf, howmuch, vecs,
NUM_READ_IOVEC, &chainp, 1);
#else
/* We aren't using the native struct iovec. Therefore,
we are on win32. */
struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
nvecs = evbuffer_read_setup_vecs_(buf, howmuch, ev_vecs, 2,
&chainp, 1);

for (i=0; i < nvecs; ++i)
WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
/* We aren't using the native struct iovec. Therefore,
we are on win32. */
struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
nvecs = evbuffer_read_setup_vecs_(buf, howmuch, ev_vecs, 2,
&chainp, 1);

for (i=0; i < nvecs; ++i)
WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif
}

#ifdef _WIN32
{
Expand All @@ -2342,7 +2439,68 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
n = bytesRead;
}
#else
n = readv(fd, vecs, nvecs);
if (use_recvmsg) {
struct msghdr msg;
/* Control message buffer for cmsg data.
* Sized to accommodate timestamp messages (SCM_TIMESTAMPNS,
* SCM_TIMESTAMP) plus additional space (256 bytes) for other
* possible cmsg entries (SCM_RIGHTS, SCM_CREDENTIALS, etc.)
* to prevent truncation via MSG_CTRUNC which would silently
* lose timestamp information. */
#define EVBUFFER_RECVMSG_CTRLFN_SZ \
(CMSG_SPACE(sizeof(struct timespec)) + \
CMSG_SPACE(sizeof(struct timeval)) + 256)
unsigned char control[EVBUFFER_RECVMSG_CTRLFN_SZ];
#undef EVBUFFER_RECVMSG_CTRLFN_SZ

/* Setup message header */
memset(&msg, 0, sizeof(msg));
msg.msg_iov = vecs;
msg.msg_iovlen = nvecs;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);

/* Receive with ancillary data */
n = recvmsg(fd, &msg, 0);

if (n > 0) {
/* Check if control data was truncated */
if (!(msg.msg_flags & MSG_CTRUNC)) {
struct cmsghdr *cmsg;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET) {
continue;
}
#if EVENT__HAVE_DECL_SO_TIMESTAMPNS
if (cmsg->cmsg_type == SCM_TIMESTAMPNS) {
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct timespec))) {
continue;
}
ts = *(struct timespec *)CMSG_DATA(cmsg);
ts_found = 1;
break;
}
#endif

#if EVENT__HAVE_DECL_SO_TIMESTAMP
if (cmsg->cmsg_type == SCM_TIMESTAMP) {
struct timeval *tv;
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct timeval))) {
continue;
}
tv = (struct timeval *)CMSG_DATA(cmsg);
ts.tv_sec = tv->tv_sec;
ts.tv_nsec = tv->tv_usec * 1000L;
ts_found = 1;
break;
}
#endif
}
}
}
} else {
n = readv(fd, vecs, nvecs);
}
#endif
}

Expand Down Expand Up @@ -2387,8 +2545,16 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
if ((ev_ssize_t)space < remaining) {
(*chainp)->off += space;
remaining -= (int)space;
if (ts_found && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = ts;
(*chainp)->timestamp.valid = 1;
}
} else {
(*chainp)->off += remaining;
if (ts_found && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = ts;
(*chainp)->timestamp.valid = 1;
}
buf->last_with_datap = chainp;
break;
}
Expand All @@ -2409,6 +2575,37 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
return result;
}

/*
 * Read from fd into buf without requesting the recvmsg()/timestamp path:
 * delegates to evbuffer_read_impl_() with use_recvmsg set to 0 and returns
 * its result unchanged.
 */
int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
	const int use_recvmsg = 0;

	return evbuffer_read_impl_(buf, fd, howmuch, use_recvmsg);
}

/*
 * Read from fd into buf, asking the shared implementation to use the
 * recvmsg() path: delegates to evbuffer_read_impl_() with use_recvmsg set
 * to 1 and returns its result unchanged.
 */
int
evbuffer_read_with_timestamp(struct evbuffer *buf, evutil_socket_t fd,
    int howmuch)
{
	const int use_recvmsg = 1;

	return evbuffer_read_impl_(buf, fd, howmuch, use_recvmsg);
}

/*
 * Copy out the timestamp stored on the first chain of buf.
 *
 * @param buf the evbuffer to inspect; it is locked for the duration of
 *    the lookup
 * @param timestamp destination for the timestamp; must not be NULL
 * @return 0 if the first chain exists and carries a valid timestamp (which
 *    is then written to *timestamp); -1 if timestamp is NULL, the buffer
 *    has no first chain, or that chain's timestamp is not marked valid.
 *    *timestamp is left untouched on failure.
 */
int
evbuffer_get_timestamp(struct evbuffer *buf, struct timespec *timestamp)
{
	int rc;

	/* Reject a missing output pointer before taking the lock. */
	if (timestamp == NULL) {
		return -1;
	}

	EVBUFFER_LOCK(buf);
	if (buf->first == NULL || !buf->first->timestamp.valid) {
		rc = -1;
	} else {
		*timestamp = buf->first->timestamp.ts;
		rc = 0;
	}
	EVBUFFER_UNLOCK(buf);

	return rc;
}

#ifdef USE_IOVEC_IMPL
static inline int
evbuffer_write_iovec(struct evbuffer *buffer, evutil_socket_t fd,
Expand Down
Loading