aboutsummaryrefslogtreecommitdiff
path: root/src/IO
diff options
context:
space:
mode:
Diffstat (limited to 'src/IO')
-rw-r--r--src/IO/dpi.c162
1 files changed, 96 insertions, 66 deletions
diff --git a/src/IO/dpi.c b/src/IO/dpi.c
index 6f1c0a97..615e67bb 100644
--- a/src/IO/dpi.c
+++ b/src/IO/dpi.c
@@ -90,6 +90,7 @@ static void Dpi_close_fd(int fd)
{
int st;
+ dReturn_if (fd < 0);
do
st = close(fd);
while (st < 0 && errno == EINTR);
@@ -258,6 +259,66 @@ static void Dpi_parse_token(dpi_conn_t *conn)
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/*
+ * Write data into a file descriptor taking care of EINTR
+ * and possible data splits.
+ * Return value: 1 on success, -1 on error.
+ */
+static int Dpi_blocking_write(int fd, const char *msg, int msg_len)
+{
+ int st, sent = 0;
+
+ while (sent < msg_len) {
+ st = write(fd, msg + sent, msg_len - sent);
+ if (st < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ MSG_ERR("[Dpi_blocking_write] %s\n", dStrerror(errno));
+ break;
+ }
+ }
+ sent += st;
+ }
+
+ return (sent == msg_len) ? 1 : -1;
+}
+
+/*
+ * Read all the available data from a filedescriptor.
+ * This is intended for short answers, i.e. when we know the server
+ * will write it all before being preempted. For answers that may come
+ * as an stream with delays, non-blocking is better.
+ * Return value: read data, or NULL on error and no data.
+ */
+static char *Dpi_blocking_read(int fd)
+{
+ int st;
+ const int buf_sz = 8*1024;
+ char buf[buf_sz], *msg = NULL;
+ Dstr *dstr = dStr_sized_new(buf_sz);
+
+ do {
+ st = read(fd, buf, buf_sz);
+ if (st < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ MSG_ERR("[Dpi_blocking_read] %s\n", dStrerror(errno));
+ break;
+ }
+ } else if (st > 0) {
+ dStr_append_l(dstr, buf, st);
+ }
+ } while (st == buf_sz);
+
+ msg = (dstr->len > 0) ? dstr->str : NULL;
+ dStr_free(dstr, (dstr->len > 0) ? FALSE : TRUE);
+ return msg;
+}
+
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
+
+/*
* Get a new data buffer (within a 'dbuf'), save it into local data,
* split in tokens and parse the contents.
*/
@@ -288,8 +349,8 @@ static void Dpi_process_dbuf(int Op, void *Data1, dpi_conn_t *conn)
static int Dpi_start_dpid(void)
{
pid_t pid;
- int st_pipe[2], n, ret = 1;
- char buf[16];
+ int st_pipe[2], ret = 1;
+ char *answer;
/* create a pipe to track our child's status */
if (pipe(st_pipe))
@@ -304,9 +365,9 @@ static int Dpi_start_dpid(void)
dFree(path1);
if (execlp("dpid", "dpid", (char*)NULL) == -1) {
MSG("Dpi_start_dpid (child): %s\n", dStrerror(errno));
- do
- n = write(st_pipe[1], "ERROR", 5);
- while (n == -1 && errno == EINTR);
+ if (Dpi_blocking_write(st_pipe[1], "ERROR", 5) == -1) {
+ MSG("Dpi_start_dpid (child): can't write to pipe.\n");
+ }
Dpi_close_fd(st_pipe[1]);
_exit (EXIT_FAILURE);
}
@@ -321,14 +382,11 @@ static int Dpi_start_dpid(void)
} else {
/* This is the parent process, check our child status... */
Dpi_close_fd(st_pipe[1]);
- do
- n = read(st_pipe[0], buf, 16);
- while (n == -1 && errno == EINTR);
- _MSG("Dpi_start_dpid: n = %d\n", n);
- if (n != 5) {
- ret = 0;
+ if ((answer = Dpi_blocking_read(st_pipe[0])) != NULL) {
+ MSG("Dpi_start_dpid: can't start dpid\n");
+ dFree(answer);
} else {
- MSG("Dpi_start_dpid: %s\n", dStrerror(errno));
+ ret = 0;
}
}
@@ -455,11 +513,10 @@ static int Dpi_blocking_start_dpid(void)
*/
int Dpi_get_server_port(const char *server_name)
{
- int sock_fd, req_sz, rdlen, dpi_port;
- int st, dpid_port, ret = -1, ok = 0;
+ int sock_fd, dpi_port;
+ int dpid_port, ret = -1, ok = 0;
struct sockaddr_in sin;
- char buf[128], *cmd, *request, *rply = NULL, *port_str;
- size_t buflen;
+ char *cmd, *request, *rply = NULL, *port_str;
socklen_t sin_sz;
dReturn_val_if_fail (server_name != NULL, ret);
@@ -471,6 +528,7 @@ int Dpi_get_server_port(const char *server_name)
}
if (ok) {
/* Connect a socket with dpid */
+ ok = 0;
sin_sz = sizeof(sin);
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
@@ -479,49 +537,37 @@ int Dpi_get_server_port(const char *server_name)
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ||
connect(sock_fd, (struct sockaddr *)&sin, sin_sz) == -1) {
MSG("Dpi_get_server_port: %s\n", dStrerror(errno));
- ok = 0;
+ } else {
+ ok = 1;
}
}
if (ok) {
/* ask dpid to check the dpi and send its port number back */
+ ok = 0;
request = a_Dpip_build_cmd("cmd=%s msg=%s", "check_server", server_name);
_MSG("[%s]\n", request);
- do {
- st = write(sock_fd, request, strlen(request));
- } while (st < 0 && errno == EINTR);
- if (st < 0 && errno != EINTR) {
+
+ if (Dpi_blocking_write(sock_fd, request, strlen(request)) == -1) {
MSG("Dpi_get_server_port: %s\n", dStrerror(errno));
- ok = 0;
+ } else {
+ ok = 1;
}
dFree(request);
shutdown(sock_fd, 1); /* signals no more writes to dpid */
}
if (ok) {
/* Get the reply */
- buf[0] = '\0';
- buflen = sizeof(buf)/sizeof(buf[0]);
- for (req_sz = 0; (rdlen = read(sock_fd, buf, buflen)) != 0;
- req_sz += rdlen) {
- if (rdlen == -1 && errno == EINTR)
- continue;
- if (rdlen == -1) {
- MSG("Dpi_get_server_port: %s\n", dStrerror(errno));
- ok = 0;
- break;
- }
- rply = dRealloc(rply, (uint_t)(req_sz + rdlen + 1));
- if (req_sz == 0)
- rply[0] = '\0';
- strncat(rply, buf, (size_t)rdlen);
- }
- Dpi_close_fd(sock_fd);
- if (rdlen == 0 && rply) {
- _MSG("rply = [%s]\n", rply);
+ ok = 0;
+ if ((rply = Dpi_blocking_read(sock_fd)) == NULL) {
+ MSG("Dpi_get_server_port: can't read server port from dpid.\n");
+ } else {
ok = 1;
}
+ Dpi_close_fd(sock_fd);
}
if (ok) {
/* Parse reply */
+ ok = 0;
cmd = a_Dpip_get_attr(rply, "cmd");
if (strcmp(cmd, "send_data") == 0) {
port_str = a_Dpip_get_attr(rply, "msg");
@@ -705,44 +751,28 @@ void a_Dpi_dillo_exit()
}
-
/*
* Send a command to a dpi server, and block until the answer is got.
* Return value: the dpip tag answer as an string, NULL on error.
*/
char *a_Dpi_send_blocking_cmd(const char *server_name, const char *cmd)
{
- int cst, SockFD;
- ssize_t st;
- char buf[16384], *retval = NULL;
+ int cst, sock_fd;
+ char *retval = NULL;
/* test the dpid, and wait a bit for it to start if necessary */
if ((cst = Dpi_blocking_start_dpid()) != 0) {
return retval;
}
- SockFD = Dpi_connect_socket(server_name, TRUE);
- if (SockFD != -1) {
- /* TODO: handle the case of (st < strlen(cmd)) */
- do
- st = write(SockFD, cmd, strlen(cmd));
- while (st == -1 && errno == EINTR);
-
- /* TODO: if the answer is too long... */
- do
- st = read(SockFD, buf, 16384);
- while (st < 0 && errno == EINTR);
-
- if (st == -1)
- perror("[a_Dpi_send_blocking_cmd]");
- else if (st > 0)
- retval = dStrndup(buf, (size_t)st);
-
- Dpi_close_fd(SockFD);
-
- } else {
- perror("[a_Dpi_send_blocking_cmd]");
+ if ((sock_fd = Dpi_connect_socket(server_name, TRUE)) == -1) {
+ MSG_ERR("[a_Dpi_send_blocking_cmd] Can't connect to server.\n");
+ } else if (Dpi_blocking_write(sock_fd, cmd, strlen(cmd)) == -1) {
+ MSG_ERR("[a_Dpi_send_blocking_cmd] Can't send message.\n");
+ } if ((retval = Dpi_blocking_read(sock_fd)) == NULL) {
+ MSG_ERR("[a_Dpi_send_blocking_cmd] Can't read message.\n");
}
+ Dpi_close_fd(sock_fd);
return retval;
}