int pos = cur_idx + length;
if (pos >= 1024) pos -= 1024;
int max_count = std::min(1024 - pos, 1024 - length);
- ssize_t r = read(fd, buf + cur_idx, max_count);
+ ssize_t r = read(fd, buf + pos, max_count);
if (r >= 0) {
length += r;
}
return stop ? "stop" : "start";
}
+// Wait for a reply packet, skipping over any information packets
+// that are received in the meantime.
+static void wait_for_reply(CPBuffer &rbuffer, int fd)
+{
+ fillBufferTo(&rbuffer, fd, 1);
+
+ while (rbuffer[0] >= 100) {
+ // Information packet; discard.
+ fillBufferTo(&rbuffer, fd, 1);
+ int pktlen = (unsigned char) rbuffer[1];
+ fillBufferTo(&rbuffer, fd, pktlen);
+ rbuffer.consume(pktlen);
+ }
+}
+
+
// Write *all* the requested buffer and re-try if necessary until
// the buffer is written or an unrecoverable error occurs.
static int write_all(int fd, const void *buf, size_t count)
try {
CPBuffer rbuffer;
- fillBufferTo(&rbuffer, socknum, 1);
+ wait_for_reply(rbuffer, socknum);
ServiceState state;
ServiceState target_state;
buf[0] = command;
buf[1] = 0; // don't pin
memcpy(buf + 2, &handle, sizeof(handle));
- r = write(socknum, buf, 2 + sizeof(handle));
+ r = write_all(socknum, buf, 2 + sizeof(handle));
delete buf;
+
+ if (r == -1) {
+ perror("write");
+ return 1;
+ }
+
+ wait_for_reply(rbuffer, socknum);
+ if (rbuffer[0] != DINIT_RP_ACK) {
+ cerr << "Protocol error." << endl;
+ return 1;
+ }
+ rbuffer.consume(1);
}
if (state == wanted_state) {
}
}
}
+
+ rbuffer.consume(pktlen);
+ r = rbuffer.fillTo(socknum, 2);
}
else {
// Not an information packet?