00001
00002
00003 #include <time.h>
00004 #include <string.h>
00005 #include <stdlib.h>
00006 #include <stdio.h>
00007 #include <ctype.h>
00008 #include <assert.h>
00009
00010 #include "abyss_mallocvar.h"
00011 #include "abyss_xmlrpc_int.h"
00012 #include <xmlrpc-c/abyss.h>
00013 #include "abyss_socket.h"
00014 #include "abyss_server.h"
00015 #include "abyss_thread.h"
00016
00017 #include "abyss_conn.h"
00018
00019
00020
00021
00022
00023 static TThreadProc connJob;
00024
00025 static void
00026 connJob(void * const userHandle) {
00027
00028
00029
00030
00031 TConn * const connectionP = userHandle;
00032
00033 (connectionP->job)(connectionP);
00034
00035 connectionP->finished = TRUE;
00036
00037
00038
00039
00040
00041
00042
00043 ThreadExit(0);
00044 }
00045
00046
00047
00048 static void
00049 connDone(TConn * const connectionP) {
00050
00051
00052
00053
00054 connectionP->finished = TRUE;
00055
00056 if (connectionP->done)
00057 connectionP->done(connectionP);
00058 }
00059
00060
00061
00062 static TThreadDoneFn threadDone;
00063
00064 static void
00065 threadDone(void * const userHandle) {
00066
00067 TConn * const connectionP = userHandle;
00068
00069 connDone(connectionP);
00070 }
00071
00072
00073
00074 static void
00075 makeThread(TConn * const connectionP,
00076 enum abyss_foreback const foregroundBackground,
00077 abyss_bool const useSigchld,
00078 const char ** const errorP) {
00079
00080 switch (foregroundBackground) {
00081 case ABYSS_FOREGROUND:
00082 connectionP->hasOwnThread = FALSE;
00083 *errorP = NULL;
00084 break;
00085 case ABYSS_BACKGROUND: {
00086 const char * error;
00087 connectionP->hasOwnThread = TRUE;
00088 ThreadCreate(&connectionP->threadP, connectionP,
00089 &connJob, &threadDone, useSigchld,
00090 &error);
00091 if (error) {
00092 xmlrpc_asprintf(errorP, "Unable to create thread to "
00093 "process connection. %s", error);
00094 xmlrpc_strfree(error);
00095 } else
00096 *errorP = NULL;
00097 } break;
00098 }
00099 }
00100
00101
00102
00103 void
00104 ConnCreate(TConn ** const connectionPP,
00105 TServer * const serverP,
00106 TSocket * const connectedSocketP,
00107 TThreadProc * const job,
00108 TThreadDoneFn * const done,
00109 enum abyss_foreback const foregroundBackground,
00110 abyss_bool const useSigchld,
00111 const char ** const errorP) {
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 TConn * connectionP;
00136
00137 MALLOCVAR(connectionP);
00138
00139 if (connectionP == NULL)
00140 xmlrpc_asprintf(errorP, "Unable to allocate memory for a connection "
00141 "descriptor.");
00142 else {
00143 abyss_bool success;
00144 uint16_t peerPortNumber;
00145
00146 connectionP->server = serverP;
00147 connectionP->socketP = connectedSocketP;
00148 connectionP->buffersize = 0;
00149 connectionP->bufferpos = 0;
00150 connectionP->finished = FALSE;
00151 connectionP->job = job;
00152 connectionP->done = done;
00153 connectionP->inbytes = 0;
00154 connectionP->outbytes = 0;
00155 connectionP->trace = getenv("ABYSS_TRACE_CONN");
00156
00157 SocketGetPeerName(connectedSocketP,
00158 &connectionP->peerip, &peerPortNumber, &success);
00159
00160 if (success)
00161 makeThread(connectionP, foregroundBackground, useSigchld, errorP);
00162 else
00163 xmlrpc_asprintf(errorP, "Failed to get peer name from socket.");
00164 }
00165 *connectionPP = connectionP;
00166 }
00167
00168
00169
00170 abyss_bool
00171 ConnProcess(TConn * const connectionP) {
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182 abyss_bool retval;
00183
00184 if (connectionP->hasOwnThread) {
00185
00186
00187
00188 retval = ThreadRun(connectionP->threadP);
00189 } else {
00190
00191 (connectionP->job)(connectionP);
00192 connDone(connectionP);
00193 retval = TRUE;
00194 }
00195 return retval;
00196 }
00197
00198
00199
00200 void
00201 ConnWaitAndRelease(TConn * const connectionP) {
00202 if (connectionP->hasOwnThread)
00203 ThreadWaitAndRelease(connectionP->threadP);
00204
00205 free(connectionP);
00206 }
00207
00208
00209
00210 abyss_bool
00211 ConnKill(TConn * connectionP) {
00212 connectionP->finished = TRUE;
00213 return ThreadKill(connectionP->threadP);
00214 }
00215
00216
00217
00218 void
00219 ConnReadInit(TConn * const connectionP) {
00220 if (connectionP->buffersize>connectionP->bufferpos) {
00221 connectionP->buffersize -= connectionP->bufferpos;
00222 memmove(connectionP->buffer,
00223 connectionP->buffer+connectionP->bufferpos,
00224 connectionP->buffersize);
00225 connectionP->bufferpos = 0;
00226 } else
00227 connectionP->buffersize=connectionP->bufferpos = 0;
00228
00229 connectionP->inbytes=connectionP->outbytes = 0;
00230 }
00231
00232
00233
00234 static void
00235 traceBuffer(const char * const label,
00236 const char * const buffer,
00237 unsigned int const size) {
00238
00239 unsigned int nonPrintableCount;
00240 unsigned int i;
00241
00242 nonPrintableCount = 0;
00243
00244 for (i = 0; i < size; ++i) {
00245 if (!isprint(buffer[i]) && buffer[i] != '\n' && buffer[i] != '\r')
00246 ++nonPrintableCount;
00247 }
00248 if (nonPrintableCount > 0)
00249 fprintf(stderr, "%s contains %u nonprintable characters.\n",
00250 label, nonPrintableCount);
00251
00252 fprintf(stderr, "%s:\n", label);
00253 fprintf(stderr, "%.*s\n", (int)size, buffer);
00254 }
00255
00256
00257
00258 static void
00259 traceSocketRead(TConn * const connectionP,
00260 unsigned int const size) {
00261
00262 if (connectionP->trace)
00263 traceBuffer("READ FROM SOCKET:",
00264 connectionP->buffer + connectionP->buffersize, size);
00265 }
00266
00267
00268
00269 static void
00270 traceSocketWrite(TConn * const connectionP,
00271 const char * const buffer,
00272 unsigned int const size,
00273 abyss_bool const failed) {
00274
00275 if (connectionP->trace) {
00276 const char * const label =
00277 failed ? "FAILED TO WRITE TO SOCKET:" : "WROTE TO SOCKET";
00278 traceBuffer(label, buffer, size);
00279 }
00280 }
00281
00282
00283
00284 static uint32_t
00285 bufferSpace(TConn * const connectionP) {
00286
00287 return BUFFER_SIZE - connectionP->buffersize;
00288 }
00289
00290
00291
00292 abyss_bool
00293 ConnRead(TConn * const connectionP,
00294 uint32_t const timeout) {
00295
00296
00297
00298
00299
00300
00301 time_t const deadline = time(NULL) + timeout;
00302
00303 abyss_bool cantGetData;
00304 abyss_bool gotData;
00305
00306 cantGetData = FALSE;
00307 gotData = FALSE;
00308
00309 while (!gotData && !cantGetData) {
00310 int const timeLeft = deadline - time(NULL);
00311
00312 if (timeLeft <= 0)
00313 cantGetData = TRUE;
00314 else {
00315 int rc;
00316
00317 rc = SocketWait(connectionP->socketP, TRUE, FALSE,
00318 timeLeft * 1000);
00319
00320 if (rc != 1)
00321 cantGetData = TRUE;
00322 else {
00323 uint32_t bytesAvail;
00324
00325 bytesAvail = SocketAvailableReadBytes(connectionP->socketP);
00326
00327 if (bytesAvail <= 0)
00328 cantGetData = TRUE;
00329 else {
00330 uint32_t const bytesToRead =
00331 MIN(bytesAvail, bufferSpace(connectionP)-1);
00332
00333 uint32_t bytesRead;
00334
00335 bytesRead = SocketRead(
00336 connectionP->socketP,
00337 (unsigned char*)connectionP->buffer + connectionP->buffersize,
00338 bytesToRead);
00339 if (bytesRead > 0) {
00340 traceSocketRead(connectionP, bytesRead);
00341 connectionP->inbytes += bytesRead;
00342 connectionP->buffersize += bytesRead;
00343 connectionP->buffer[connectionP->buffersize] = '\0';
00344 gotData = TRUE;
00345 }
00346 }
00347 }
00348 }
00349 }
00350 if (gotData)
00351 return TRUE;
00352 else
00353 return FALSE;
00354 }
00355
00356
00357
00358 abyss_bool
00359 ConnWrite(TConn * const connectionP,
00360 const void * const buffer,
00361 uint32_t const size) {
00362
00363 abyss_bool failed;
00364
00365 SocketWrite(connectionP->socketP, buffer, size, &failed);
00366
00367 traceSocketWrite(connectionP, buffer, size, failed);
00368
00369 if (!failed)
00370 connectionP->outbytes += size;
00371
00372 return !failed;
00373 }
00374
00375
00376
00377 abyss_bool
00378 ConnWriteFromFile(TConn * const connectionP,
00379 TFile * const fileP,
00380 uint64_t const start,
00381 uint64_t const last,
00382 void * const buffer,
00383 uint32_t const buffersize,
00384 uint32_t const rate) {
00385
00386
00387
00388
00389
00390
00391
00392
00393 abyss_bool retval;
00394 uint32_t waittime;
00395 abyss_bool success;
00396 uint32_t readChunkSize;
00397
00398 if (rate > 0) {
00399 readChunkSize = MIN(buffersize, rate);
00400 waittime = (1000 * buffersize) / rate;
00401 } else {
00402 readChunkSize = buffersize;
00403 waittime = 0;
00404 }
00405
00406 success = FileSeek(fileP, start, SEEK_SET);
00407 if (!success)
00408 retval = FALSE;
00409 else {
00410 uint64_t const totalBytesToRead = last - start + 1;
00411 uint64_t bytesread;
00412
00413 bytesread = 0;
00414
00415 while (bytesread < totalBytesToRead) {
00416 uint64_t const bytesLeft = totalBytesToRead - bytesread;
00417 uint64_t const bytesToRead = MIN(readChunkSize, bytesLeft);
00418
00419 uint64_t bytesReadThisTime;
00420
00421 bytesReadThisTime = FileRead(fileP, buffer, bytesToRead);
00422 bytesread += bytesReadThisTime;
00423
00424 if (bytesReadThisTime > 0)
00425 ConnWrite(connectionP, buffer, bytesReadThisTime);
00426 else
00427 break;
00428
00429 if (waittime > 0)
00430 xmlrpc_millisecond_sleep(waittime);
00431 }
00432 retval = (bytesread >= totalBytesToRead);
00433 }
00434 return retval;
00435 }
00436
00437
00438
00439 static void
00440 processHeaderLine(char * const start,
00441 const char * const headerStart,
00442 TConn * const connectionP,
00443 time_t const deadline,
00444 abyss_bool * const gotHeaderP,
00445 char ** const nextP,
00446 abyss_bool * const errorP) {
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457 abyss_bool gotHeader;
00458 char * lfPos;
00459 char * p;
00460
00461 p = start;
00462
00463 gotHeader = FALSE;
00464
00465 lfPos = strchr(p, LF);
00466 if (lfPos) {
00467 if ((*p != LF) && (*p != CR)) {
00468
00469 if (*(lfPos+1) == '\0') {
00470
00471
00472
00473
00474 int const timeLeft = deadline - time(NULL);
00475
00476 *errorP = !ConnRead(connectionP, timeLeft);
00477 }
00478 if (!*errorP) {
00479 p = lfPos;
00480
00481
00482
00483
00484
00485
00486 if ((*(p+1) == ' ') || (*(p+1) == '\t')) {
00487 if (p > headerStart && *(p-1) == CR)
00488 *(p-1) = ' ';
00489 *p++ = ' ';
00490 } else
00491 gotHeader = TRUE;
00492 }
00493 } else {
00494
00495
00496
00497 p = lfPos;
00498 gotHeader = TRUE;
00499 }
00500 }
00501
00502 if (gotHeader) {
00503
00504
00505
00506
00507
00508 if (p > headerStart && *(p-1) == CR)
00509 *(p-1) = '\0';
00510 else
00511 *p = '\0';
00512
00513 ++p;
00514 }
00515 *gotHeaderP = gotHeader;
00516 *nextP = p;
00517 }
00518
00519
00520
00521 abyss_bool
00522 ConnReadHeader(TConn * const connectionP,
00523 char ** const headerP) {
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543 uint32_t const deadline = time(NULL) + connectionP->server->srvP->timeout;
00544
00545 abyss_bool retval;
00546 char * p;
00547 char * headerStart;
00548 abyss_bool error;
00549 abyss_bool gotHeader;
00550
00551 p = connectionP->buffer + connectionP->bufferpos;
00552 headerStart = p;
00553
00554 gotHeader = FALSE;
00555 error = FALSE;
00556
00557 while (!gotHeader && !error) {
00558 int const timeLeft = deadline - time(NULL);
00559
00560 if (timeLeft <= 0)
00561 error = TRUE;
00562 else {
00563 if (p >= connectionP->buffer + connectionP->buffersize)
00564
00565 error = !ConnRead(connectionP, timeLeft);
00566
00567 if (!error) {
00568 assert(connectionP->buffer + connectionP->buffersize > p);
00569 processHeaderLine(p, headerStart, connectionP, deadline,
00570 &gotHeader, &p, &error);
00571 }
00572 }
00573 }
00574 if (gotHeader) {
00575
00576
00577
00578
00579 connectionP->bufferpos += p - headerStart;
00580 *headerP = headerStart;
00581 retval = TRUE;
00582 } else
00583 retval = FALSE;
00584
00585 return retval;
00586 }
00587
00588
00589
00590 TServer *
00591 ConnServer(TConn * const connectionP) {
00592 return connectionP->server;
00593 }
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629