summaryrefslogtreecommitdiff
path: root/dpip/dpip.c
diff options
context:
space:
mode:
Diffstat (limited to 'dpip/dpip.c')
-rw-r--r--dpip/dpip.c86
1 files changed, 66 insertions, 20 deletions
diff --git a/dpip/dpip.c b/dpip/dpip.c
index 5228f8de..b7332e72 100644
--- a/dpip/dpip.c
+++ b/dpip/dpip.c
@@ -22,6 +22,9 @@
#include "dpip.h"
#include "d_size.h"
+//#define RBUF_SZ 16384
+#define RBUF_SZ 1
+
#define DPIP_TAG_END " '>"
#define DPIP_MODE_SWITCH_TAG "cmd='start_send_page' "
#define MSG_ERR(...) fprintf(stderr, "[dpip]: " __VA_ARGS__)
@@ -292,27 +295,24 @@ int a_Dpip_dsh_write_str(Dsh *dsh, int flush, const char *str)
}
/*
- * Read new data from the socket into our buffer.
- * Used by both blocking and non-blocking IO.
+ * Read raw data from the socket into our buffer without blocking.
*/
-static void Dpip_dsh_read(Dsh *dsh)
+static void Dpip_dsh_read_nb(Dsh *dsh)
{
-//#define LBUF_SZ 16384
-#define LBUF_SZ 1
-
ssize_t st;
int old_flags, blocking;
- char buf[LBUF_SZ];
+ char buf[RBUF_SZ];
blocking = !(dsh->mode & DPIP_NONBLOCK);
if (blocking) {
+ /* set NONBLOCKING temporarily... */
old_flags = fcntl(dsh->fd_in, F_GETFL);
+ fcntl(dsh->fd_in, F_SETFL, O_NONBLOCK | old_flags);
}
while (1) {
- /* can't use fread() */
do
- st = read(dsh->fd_in, buf, LBUF_SZ);
+ st = read(dsh->fd_in, buf, RBUF_SZ);
while (st < 0 && errno == EINTR);
if (st < 0) {
@@ -330,10 +330,6 @@ static void Dpip_dsh_read(Dsh *dsh)
} else {
/* append to buf */
dStr_append_l(dsh->rd_dbuf, buf, st);
- if (blocking) {
- /* set NONBLOCKING temporarily... */
- fcntl(dsh->fd_in, F_SETFL, O_NONBLOCK | old_flags);
- }
}
}
@@ -344,23 +340,73 @@ static void Dpip_dsh_read(Dsh *dsh)
}
/*
+ * Read raw data from the socket into our buffer in BLOCKING mode.
+ */
+static void Dpip_dsh_read(Dsh *dsh)
+{
+
+ ssize_t st;
+ int old_flags, non_blocking;
+ char buf[RBUF_SZ];
+
+ non_blocking = (dsh->mode & DPIP_NONBLOCK);
+ if (non_blocking) {
+ /* set blocking mode temporarily */
+ old_flags = fcntl(dsh->fd_in, F_GETFL);
+ fcntl(dsh->fd_in, F_SETFL, old_flags);
+ }
+
+ while (1) {
+ do
+ st = read(dsh->fd_in, buf, RBUF_SZ);
+ while (st < 0 && errno == EINTR);
+
+ if (st < 0) {
+ MSG_ERR("[Dpip_dsh_read] %s\n", dStrerror(errno));
+ dsh->status = DPIP_ERROR;
+ break;
+ } else if (st == 0) {
+ dsh->status = DPIP_EOF;
+ break;
+ } else {
+ /* append to buf */
+ dStr_append_l(dsh->rd_dbuf, buf, st);
+ break;
+ }
+ }
+
+ if (non_blocking) {
+ /* restore non blocking mode */
+ fcntl(dsh->fd_in, F_SETFL, old_flags);
+ }
+
+ /* assert there's no more data in the wire... */
+ Dpip_dsh_read_nb(dsh);
+}
+
+/*
* Return a newlly allocated string with the next dpip token in the socket.
* Return value: token string on success, NULL otherwise
*/
-char *a_Dpip_dsh_read_token(Dsh *dsh)
+char *a_Dpip_dsh_read_token(Dsh *dsh, int blocking)
{
char *p, *ret = NULL;
+ /* Read all available data without blocking */
+ Dpip_dsh_read_nb(dsh);
+
/* switch mode upon request */
if (dsh->mode & DPIP_LAST_TAG)
dsh->mode = DPIP_RAW;
- /* Only read from socket when there's no data in buffer or
- * the tag is incomplete */
- if (dsh->rd_dbuf->len == 0 ||
- (dsh->mode & DPIP_TAG &&
- !(p = strstr(dsh->rd_dbuf->str, DPIP_TAG_END)))) {
- Dpip_dsh_read(dsh);
+ if (blocking && dsh->mode & DPIP_TAG) {
+ /* Only wait for data when the tag is incomplete */
+ if (!strstr(dsh->rd_dbuf->str, DPIP_TAG_END)) {
+ do {
+ Dpip_dsh_read(dsh);
+ p = strstr(dsh->rd_dbuf->str, DPIP_TAG_END);
+ } while (!p && dsh->status == EAGAIN);
+ }
}
if (dsh->mode & DPIP_TAG) {