00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <sys/types.h>
00023 #include <sys/socket.h>
00024 #include <netinet/tcp.h>
00025 #include <string.h>
00026 #include <errno.h>
00027 #include <unistd.h>
00028 #include <sys/poll.h>
00029 #include <signal.h>
00030 #include <time.h>
00031 #include <string.h>
00032 #include <sys/types.h>
00033 #include <sys/wait.h>
00034
00035 #include "../../ip_addr.h"
00036 #include "../../hash_func.h"
00037 #include "../../mem/mem.h"
00038 #include "../../mem/shm_mem.h"
00039 #include "../../dprint.h"
00040 #include "../../locking.h"
00041
00042 #include "seas.h"
00043 #include "ha.h"
00044 #include "cluster.h"
00045 #include "seas_action.h"
00046 #include "statistics.h"
00047 #include "event_dispatcher.h"
00048
00049 #define PING_OVER_FACTOR 2
00050 #define MAX_WRITE_TRIES 10
00051
00052 char *action_names[]={"NONE",
00053 "PROVISIONAL_REPLY",
00054 "FINAL_REPLY",
00055 "REPLY_FIN_DLG",
00056 "UAC_REQ",
00057 "AC_RES_FAIL",
00058 "STATELESS_MSG",
00059 "AC_CANCEL",
00060 "JAIN_PONG"};
00061
00062
00063 struct unc_as unc_as_t[2*MAX_UNC_AS_NR];
00064
00065
00066 struct as_entry *my_as;
00067 extern int process_no;
00068 extern int sig_flag;
00069
00070 static int process_event_reply(as_p as);
00071 static int handle_as_data(int fd);
00072 static inline int print_sock_info(char *buffer,int wheremax,int *idx,struct socket_info *s,enum sip_protos type);
00073 static inline int send_sockinfo(int fd);
00074 static inline int add_new_as(int event_idx,int action_idx,struct as_entry *as);
00075 static int dispatch_relay();
00076 static int new_as_connect(int fd,char which);
00077 static inline int read_name(int sock,char *dst,int dstlen);
00078 static int handle_unc_as_data(int fd);
00079 static int open_server_sockets(struct ip_addr *address,unsigned short port,int *fd);
00080
00081
00082
00083
00084
00085
00086 int dispatcher_main_loop(void)
00087 {
00088 struct pollfd poll_fds[3+MAX_AS_NR],*poll_tmp;
00089 int clean_index,i,j,k,fd,poll_events=0,socks[2],chld_status;
00090 int as_nr,unc_as_nr;
00091 pid_t chld;
00092 struct timeval last_ping,now;
00093 struct as_entry *as;
00094
00095 sig_flag=0;
00096 is_dispatcher=1;
00097 as_nr=0;
00098
00099 timerclear(&last_ping);
00100 timerclear(&now);
00101 signal(SIGCHLD,seas_sighandler);
00102 signal(SIGTERM,seas_sighandler);
00103 signal(SIGUSR1,seas_sighandler);
00104 signal(SIGINT, seas_sighandler);
00105 signal(SIGKILL,seas_sighandler);
00106
00107 strcpy(whoami,"Seas Event Dispatcher process");
00108
00109
00110 process_no = -1;
00111
00112 if(open_server_sockets(seas_listen_ip,seas_listen_port,socks)==-1){
00113 LM_ERR("unable to open server sockets on dispatcher\n");
00114 return -1;
00115 }
00116 for(i=0;i<2;i++){
00117 poll_fds[i].fd=socks[i];
00118 poll_fds[i].revents=0;
00119 poll_fds[i].events=POLLIN;
00120 }
00121 poll_fds[2].fd=read_pipe;
00122 poll_fds[2].revents=0;
00123 poll_fds[2].events=POLLIN;
00124
00125 poll_events=0;
00126 unc_as_nr=0;
00127
00128 if(use_ha)
00129 spawn_pinger();
00130
00131 while(1){
00132 if(sig_flag==SIGCHLD){
00133 while ((chld=waitpid( -1, &chld_status, WNOHANG ))>0) {
00134 if (WIFEXITED(chld_status)){
00135 LM_INFO("child process %d exited normally, status=%d\n",
00136 chld,WEXITSTATUS(chld_status));
00137 }else if (WIFSIGNALED(chld_status)) {
00138 LM_INFO("child process %d exited by a signal %d\n",
00139 chld,WTERMSIG(chld_status));
00140 }else if (WIFSTOPPED(chld_status))
00141 LM_INFO("child process %d stopped by a signal %d\n",
00142 chld,WSTOPSIG(chld_status));
00143 for (as=as_list;as;as=as->next) {
00144 if(as->type!=AS_TYPE)
00145 continue;
00146 if(as->u.as.action_pid==chld){
00147 for(i=0;i<as_nr && ((poll_fds[3+i].fd)!=(as->u.as.event_fd));i++)
00148 ;
00149 if(i==as_nr){
00150 LM_ERR("Either the pinger has died or BUG found..\n");
00151 continue;
00152 }
00153
00154 for(j=3+i;j<(as_nr+unc_as_nr+3-1);i++){
00155 poll_fds[j].fd=poll_fds[j+1].fd;
00156 poll_fds[j].events=poll_fds[j+1].events;
00157 poll_fds[j].revents=poll_fds[j+1].revents;
00158 }
00159 close(as->u.as.event_fd);
00160 if (as->u.as.ev_buffer.s) {
00161 pkg_free(as->u.as.ev_buffer.s);
00162 as->u.as.ev_buffer.s=(char *)0;
00163 as->u.as.ev_buffer.len=0;
00164 }
00165 as->u.as.event_fd=as->u.as.action_fd=-1;
00166 as->connected=0;
00167 destroy_pingtable(&as->u.as.jain_pings);
00168 destroy_pingtable(&as->u.as.servlet_pings);
00169 as_nr--;
00170 LM_WARN("client [%.*s] leaving (Action Dispatcher Process died !)\n",
00171 as->name.len,as->name.s);
00172 break;
00173 }
00174 }
00175 }
00176 }else if (sig_flag) {
00177 LM_WARN("received signal != sigchld(%d)\n",sig_flag);
00178 }
00179 sig_flag=0;
00180 clean_index=0;
00181 LM_INFO("polling [2 ServSock] [1 pipe] [%d App Servers]"
00182 " [%d Uncomplete AS]\n",as_nr,unc_as_nr);
00183 poll_events = poll(poll_fds,3+unc_as_nr+as_nr,-1);
00184 if (poll_events == -1) {
00185 if(errno==EINTR){
00186
00187
00188 continue;
00189 }
00190 if(errno==EBADF){
00191 LM_ERR("invalid file descriptor pased to poll (%s)\n",
00192 strerror(errno));
00193 return -1;
00194 }
00195
00196 LM_ERR("poll'ing:%s\n",strerror(errno));
00197 poll_events=0;
00198 continue;
00199 } else if (poll_events == 0) {
00200 continue;
00201 } else {
00202
00203 for(i=0;i<2;i++){
00204 if(poll_fds[i].revents)
00205 poll_events--;
00206 if(poll_fds[i].revents & POLLIN){
00207 poll_fds[i].revents &= (~POLLIN);
00208 if((fd=new_as_connect(socks[i],i==0?'e':'a'))>=0){
00209 poll_tmp=&poll_fds[3+as_nr+unc_as_nr];
00210 poll_tmp->fd=fd;
00211 poll_tmp->events=POLLIN|POLLHUP;
00212 unc_as_nr++;
00213 LM_DBG("Have new %s client\n",i==0?"event":"action");
00214 }else{
00215 LM_ERR("accepting connection from AS\n");
00216 }
00217 }
00218 }
00219
00220 if(poll_fds[2].revents & POLLIN){
00221 poll_fds[2].revents &= (~POLLIN);
00222 poll_events--;
00223 if(dispatch_relay()<0){
00224 LM_ERR("dispatch_relay returned -1"
00225 "should clean-up table\n");
00226 }
00227 }
00228
00229 clean_index=0;
00230 LM_DBG("Scanning data from %d AS\n",as_nr);
00231 for(i=0;(i<as_nr) && poll_events;i++){
00232 clean_index=0;
00233 poll_tmp=&poll_fds[3+i];
00234 if(poll_tmp->revents)
00235 poll_events--;
00236 if(poll_tmp->revents & POLLIN){
00237 LM_DBG("POLLIN found in AS #%i\n",i);
00238 poll_tmp->revents &= (~POLLIN);
00239 switch(handle_as_data(poll_tmp->fd)){
00240 case -2:
00241 clean_index=1;
00242 break;
00243 case -1:
00244 LM_ERR("reading from AS socket\n");
00245 break;
00246 case 0:
00247 break;
00248 default:
00249 LM_WARN("unknown return type from handle_as_data\n");
00250 }
00251 }
00252 if(clean_index || (poll_tmp->revents & POLLHUP)){
00253 LM_DBG("POLHUP or read==0 found in %i AS \n",i);
00254 clean_index=0;
00255 poll_tmp->revents = 0;
00256 for(as=as_list;as;as=as->next){
00257 if(as->type==CLUSTER_TYPE)
00258 continue;
00259 if(as->connected && (as->u.as.event_fd == poll_tmp->fd)){
00260 close(poll_tmp->fd);
00261
00262 as->connected=0;
00263 as_nr--;
00264
00265 for(k=i;k<(as_nr+unc_as_nr);k++){
00266 j=3+k;
00267 poll_fds[j].fd=poll_fds[j+1].fd;
00268 poll_fds[j].events=poll_fds[j+1].events;
00269 poll_fds[j].revents=poll_fds[j+1].revents;
00270 }
00271 --i;
00272 LM_WARN("client %.*s leaving !!!\n",as->name.len,as->name.s);
00273 break;
00274 }
00275 }
00276 if (!as) {
00277 LM_ERR("the leaving client was not found in the as_list\n");
00278 }
00279 }
00280 }
00281
00282 LM_DBG("Scanning data from %d uncomplete AS \n",unc_as_nr);
00283 clean_index=0;
00284 for(i=0;i<unc_as_nr && poll_events;i++){
00285 poll_tmp=&poll_fds[3+as_nr+i];
00286 if(poll_tmp->revents)
00287 poll_events--;
00288 if(poll_tmp->revents & POLLIN){
00289 LM_DBG("POLLIN found in %d uncomplete AS \n",i);
00290 poll_tmp->revents &= (~POLLIN);
00291 fd=handle_unc_as_data(poll_tmp->fd);
00292 if(fd>0){
00293
00294 for(k=i;k>0;k--){
00295 j=3+as_nr+k;
00296 poll_fds[j].fd=poll_fds[j-1].fd;
00297 poll_fds[j].events=poll_fds[j-1].events;
00298 poll_fds[j].revents=poll_fds[j-1].revents;
00299 }
00300 poll_fds[3+as_nr].fd=fd;
00301 poll_fds[3+as_nr].events=POLLIN|POLLHUP;
00302 poll_fds[3+as_nr].revents=0;
00303 as_nr++;
00304 unc_as_nr--;
00305 }else if(fd<=0){
00306 poll_tmp->revents=0;
00307 for(k=i;k<(unc_as_nr-1);k++){
00308 j=3+as_nr+k;
00309 poll_fds[j].fd=poll_fds[j+1].fd;
00310 poll_fds[j].events=poll_fds[j+1].events;
00311 poll_fds[j].revents=poll_fds[j+1].revents;
00312 }
00313 unc_as_nr--;
00314
00315
00316 i--;
00317 }
00318 }
00319 if(poll_tmp->revents & POLLHUP){
00320 LM_DBG("POLLHUP found in %d uncomplete AS \n",i);
00321 close(poll_tmp->fd);
00322 for(k=i;k<(unc_as_nr-1);k++){
00323 j=3+as_nr+k;
00324 poll_fds[j].fd=poll_fds[j+1].fd;
00325 poll_fds[j].events=poll_fds[j+1].events;
00326 poll_fds[j].revents=poll_fds[j+1].revents;
00327 }
00328 unc_as_nr--;
00329 i--;
00330 poll_tmp->revents = 0;
00331 }
00332 }
00333 }
00334 }
00335 }
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353 static int open_server_sockets(struct ip_addr *address,unsigned short port,int *fd)
00354 {
00355
00356
00357 union sockaddr_union su;
00358 int i,optval;
00359
00360 fd[0]=fd[1]=-1;
00361
00362 if(address->af!=AF_INET && address->af!=AF_INET6){
00363 LM_ERR("Only ip and ipv6 allowed socket types\n");
00364 return -1;
00365 }
00366
00367 for(i=0;i<2;i++){
00368 if(init_su(&su,address,port+i)<0){
00369 LM_ERR("unable to init sockaddr_union\n");
00370 return -1;
00371 }
00372 if((fd[i]=socket(AF2PF(su.s.sa_family), SOCK_STREAM, 0))==-1){
00373 LM_ERR("trying to open server %s socket (%s)\n",i==0?"event":"action",strerror(errno));
00374 goto error;
00375 }
00376 optval=1;
00377 if (setsockopt(fd[i], SOL_SOCKET, SO_REUSEADDR, (void*)&optval, sizeof(optval))==-1) {
00378 LM_ERR("setsockopt (%s)\n",strerror(errno));
00379 goto error;
00380 }
00381 if ((bind(fd[i], &su.s,sizeof(union sockaddr_union)))==-1){
00382 LM_ERR( "bind (%s)\n",strerror(errno));
00383 goto error;
00384 }
00385 if (listen(fd[i], 10)==-1){
00386 LM_ERR( "listen (%s)\n",strerror(errno));
00387 goto error;
00388 }
00389 }
00390 return 0;
00391
00392 error:
00393 for(i=0;i<2;i++)
00394 if(fd[i]!=-1){
00395 close(fd[i]);
00396 fd[i]=-1;
00397 }
00398 return -1;
00399 }
00400
00401
00402 union helper{
00403 as_msg_p ptr;
00404 char bytes[sizeof(as_msg_p)];
00405 };
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418 static int dispatch_relay(void)
00419 {
00420 int i,j,retval,tries;
00421 union helper thepointer;
00422
00423 i=j=0;
00424 retval=0;
00425 read_again:
00426 i=read(read_pipe,thepointer.bytes+j,sizeof(as_msg_p)-j);
00427 if(i<0){
00428 if(errno==EINTR){
00429 goto read_again;
00430 }else{
00431 LM_ERR("Dispatcher Process received unknown error"
00432 " reading from pipe (%s)\n",strerror(errno));
00433 retval=-1;
00434 goto error;
00435 }
00436 }else if(i==0){
00437 LM_ERR("Dispatcher Process "
00438 "received 0 while reading from pipe\n");
00439 goto error;
00440 }else{
00441 j+=i;
00442 if(j<sizeof(as_msg_p))
00443 goto read_again;
00444 }
00445
00446 if (!thepointer.ptr) {
00447 LM_ERR("Received Corrupted pointer to event !!\n");
00448 retval=0;
00449 goto error;
00450 }
00451
00452 if(use_stats && thepointer.ptr->transaction)
00453 event_stat(thepointer.ptr->transaction);
00454 if(thepointer.ptr->as == NULL || !thepointer.ptr->as->connected || thepointer.ptr->as->type==CLUSTER_TYPE){
00455 LM_WARN("tryied to send an event to an App Server"
00456 " that is scheduled to die!!\n");
00457 retval=-2;
00458 goto error;
00459 }
00460 j=0;
00461 tries=0;
00462 write_again:
00463 i=write(thepointer.ptr->as->u.as.event_fd,thepointer.ptr->msg+j,thepointer.ptr->len-j);
00464 if(i==-1){
00465 switch(errno){
00466 case EINTR:
00467 if(!thepointer.ptr->as->connected){
00468 LM_WARN("tryied to send an event to an App Server"
00469 " that is scheduled to die!!\n");
00470 retval=-2;
00471 goto error;
00472 }
00473 goto write_again;
00474 case EPIPE:
00475 LM_ERR("AS [%.*s] closed "
00476 "the socket !\n",thepointer.ptr->as->u.as.name.len,thepointer.ptr->as->u.as.name.s);
00477 retval=-2;
00478 goto error;
00479 default:
00480 LM_ERR("unknown error while trying to write to AS socket(%s)\n",
00481 strerror(errno));
00482 retval=-2;
00483 goto error;
00484 }
00485 }else if(i>0){
00486 j+=i;
00487 if(j<thepointer.ptr->len)
00488 goto write_again;
00489 }else if(i==0){
00490 if (tries++ > MAX_WRITE_TRIES) {
00491 LM_ERR("MAX WRITE TRIES !!!\n");
00492 goto error;
00493 }else
00494 goto write_again;
00495 }
00496 LM_DBG("Event relaied to %.*s AS\n",thepointer.ptr->as->u.as.name.len,
00497 thepointer.ptr->as->u.as.name.s);
00498 LM_DBG("Event type %s \n",action_names[thepointer.ptr->type]);
00499 retval=0;
00500 error:
00501 if(thepointer.ptr){
00502 if(thepointer.ptr->msg)
00503 shm_free(thepointer.ptr->msg);
00504 shm_free(thepointer.ptr);
00505 }
00506 return retval;
00507 }
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517 static inline int add_new_as(int event_idx,int action_idx,struct as_entry *as)
00518 {
00519 struct unc_as *ev,*ac;
00520 int j;
00521 as_p the_as=0;
00522 struct as_entry *tmp;
00523
00524 ev=&unc_as_t[event_idx];
00525 ac=&unc_as_t[action_idx];
00526
00527 the_as=&(as->u.as);
00528
00529 the_as->action_fd=ac->fd;
00530 the_as->event_fd=ev->fd;
00531 the_as->name.len = strlen(ev->name);
00532 if(use_ha){
00533 if(jain_ping_timeout){
00534 if (0>init_pingtable(&the_as->jain_pings,jain_ping_timeout,(jain_ping_timeout/jain_ping_period+1)*PING_OVER_FACTOR)){
00535 LM_ERR("Unable to init jain pinging table...\n");
00536 goto error;
00537 }
00538 }
00539 if(servlet_ping_timeout){
00540 if (0>init_pingtable(&the_as->servlet_pings,servlet_ping_timeout,(servlet_ping_timeout/servlet_ping_period+1)*PING_OVER_FACTOR)){
00541 LM_ERR("Unable to init servlet pinging table...\n");
00542 goto error;
00543 }
00544 }
00545 }
00546
00547
00548 if(!(the_as->ev_buffer.s = pkg_malloc(AS_BUF_SIZE))){
00549 LM_ERR("unable to alloc pkg mem for the event buffer\n");
00550 goto error;
00551 }
00552 the_as->ev_buffer.len=0;
00553 as->connected=1;
00554 the_as->action_pid=0;
00555 for(tmp=as_list;tmp;tmp=tmp->next){
00556 if(tmp->type==AS_TYPE)
00557 continue;
00558 for (j=0;j<tmp->u.cs.num;j++) {
00559 if (tmp->u.cs.as_names[j].len == the_as->name.len &&
00560 !memcmp(tmp->u.cs.as_names[j].s,the_as->name.s,the_as->name.len)) {
00561 if(tmp->u.cs.num==tmp->u.cs.registered){
00562 LM_ERR("AS %.*s belongs to cluster %.*s which is already completed\n",
00563 the_as->name.len,the_as->name.s,tmp->name.len,tmp->name.s);
00564 break;
00565 }
00566 tmp->u.cs.registered++;
00567 break;
00568 }
00569 }
00570 }
00571 if(0>spawn_action_dispatcher(as)){
00572 LM_ERR("Unable to spawn Action Dispatcher for as %s\n",ev->name);
00573 goto error;
00574 }
00575 if(send_sockinfo(the_as->event_fd)==-1){
00576 LM_ERR("Unable to send socket info to as %s\n",ev->name);
00577 goto error;
00578 }
00579 return 0;
00580 error:
00581 if(the_as->ev_buffer.s){
00582 pkg_free(the_as->ev_buffer.s);
00583 the_as->ev_buffer.s=(char*)0;
00584 }
00585 if(the_as->action_pid)
00586 kill(the_as->action_pid,SIGTERM);
00587 if(jain_ping_timeout)
00588 destroy_pingtable(&the_as->jain_pings);
00589 if(servlet_ping_timeout)
00590 destroy_pingtable(&the_as->servlet_pings);
00591 return -1;
00592 }
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612 static inline int send_sockinfo(int fd)
00613 {
00614 struct socket_info *s;
00615 unsigned char i;
00616 char buffer[300];
00617 int k=0,j;
00618 buffer[k++]=16;
00619 for(i=0,s=udp_listen;s;s=s->next,i++);
00620 #ifdef USE_TCP
00621 for(s=tcp_listen;s;s=s->next,i++);
00622 #endif
00623 #ifdef USE_TLS
00624 for(s=tls_listen;s;s=s->next,i++);
00625 #endif
00626 if(i==0){
00627 LM_ERR("no udp|tcp|tls sockets ?!!\n");
00628 return -1;
00629 }
00630 buffer[k++]=i;
00631 for(s=udp_listen;s;s=s->next){
00632 if(print_sock_info(buffer,300,&k,s,PROTO_UDP)==-1)
00633 return -1;
00634 }
00635 #ifdef USE_TCP
00636 for(s=tcp_listen;s;s=s->next){
00637 if(print_sock_info(buffer,300,&k,s,PROTO_TCP)==-1)
00638 return -1;
00639 }
00640 #endif
00641 #ifdef USE_TLS
00642 for(s=tls_listen;s;s=s->next){
00643 if(print_sock_info(buffer,300,&k,s,PROTO_TLS)==-1)
00644 return -1;
00645 }
00646 #endif
00647 write_again:
00648 j=write(fd,buffer,k);
00649 if(j==-1){
00650 if(errno==EINTR)
00651 goto write_again;
00652 else
00653 return -1;
00654 }
00655 return 0;
00656 }
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670 static inline int print_sock_info(char *buffer,int wheremax,int *idx,struct socket_info *s,enum sip_protos type)
00671 {
00672 int k;
00673 unsigned char i;
00674 unsigned short int j;
00675 if((wheremax-*idx)<49)
00676 return -1;
00677 k=*idx;
00678 buffer[k++]=(char)type;
00679 if((i=(unsigned char)s->name.len)>30){
00680 LM_ERR("name too long\n");
00681 return -1;
00682 }
00683 buffer[k++]=i;
00684 memcpy(&buffer[k],s->name.s,i);
00685 k+=i;
00686 i=(unsigned char)s->address_str.len;
00687 buffer[k++]=i;
00688 memcpy(&buffer[k],s->address_str.s,i);
00689 k+=i;
00690 j=htons(s->port_no);
00691 memcpy(&buffer[k],&j,2);
00692 k+=2;
00693 *idx=k;
00694 return 0;
00695 }
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708 static int handle_as_data(int fd)
00709 {
00710 int j,k;
00711 struct as_entry *as;
00712 for(as=as_list;as;as=as->next)
00713 if(as->type == AS_TYPE && as->connected && (as->u.as.event_fd==fd))
00714 break;
00715 if(!as){
00716 LM_ERR("AS not found\n");
00717 return -1;
00718 }
00719 k=AS_BUF_SIZE-(as->u.as.ev_buffer.len);
00720 again:
00721 if((j=read(fd,as->u.as.ev_buffer.s+as->u.as.ev_buffer.len,k))<0){
00722 LM_ERR("reading data for as %.*s\n",as->name.len,as->name.s);
00723 if(errno==EINTR)
00724 goto again;
00725 else
00726 return -1;
00727 }else if(j==0){
00728 LM_ERR("AS client leaving (%.*s)\n",as->name.len,as->name.s);
00729 return -2;
00730 }
00731 as->u.as.ev_buffer.len+=j;
00732 LM_DBG("read %d bytes from AS (total = %d)\n",j,as->u.as.ev_buffer.len);
00733 if(as->u.as.ev_buffer.len>5)
00734 process_event_reply(&as->u.as);
00735 return 0;
00736 }
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759 static int process_event_reply(as_p as)
00760 {
00761 unsigned int ev_len;
00762 ev_len=(as->ev_buffer.s[0]<<24)|(as->ev_buffer.s[1]<<16)|(as->ev_buffer.s[2]<<8)|(as->ev_buffer.s[3]);
00763
00764
00765 if(ev_len>AS_BUF_SIZE){
00766 LM_WARN("Packet too big (%d)!!! should be skipped"
00767 " and an error returned!\n",ev_len);
00768 return -1;
00769 }
00770 if((as->ev_buffer.len<ev_len) || as->ev_buffer.len<4)
00771 return 0;
00772 switch(as->ev_buffer.s[4]){
00773 case BIND_AC:
00774 LM_DBG("Processing a BIND action from AS (length=%d): %.*s\n",
00775 ev_len,as->name.len,as->name.s);
00776 process_bind_action(as,&as->ev_buffer.s[5],ev_len-5);
00777 break;
00778 case UNBIND_AC:
00779 LM_DBG("Processing a UNBIND action from AS (length=%d): %.*s\n",
00780 ev_len,as->name.len,as->name.s);
00781 process_unbind_action(as,&as->ev_buffer.s[5],ev_len-5);
00782 break;
00783 default:
00784 return 0;
00785 }
00786 memmove(as->ev_buffer.s,&(as->ev_buffer.s[ev_len]),(as->ev_buffer.len)-ev_len);
00787 (as->ev_buffer.len)-=ev_len;
00788 return 0;
00789 }
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799
00800
00801
00802
00803
00804 int process_bind_action(as_p as,char *payload,int len)
00805 {
00806 struct socket_info *si,*xxx_listen;
00807 struct ip_addr my_addr;
00808 int i,k,proto;
00809 unsigned int flags;
00810 unsigned short port;
00811 char processor_id,buffer[300],*proto_s;
00812 k=0;
00813 *buffer=0;
00814 proto_s="NONE";
00815 net2hostL(flags,payload,k);
00816 processor_id=payload[k++];
00817 for(i=0;i<MAX_BINDS;i++){
00818 if(as->bound_processor[i]==0)
00819 break;
00820 }
00821 if(i==MAX_BINDS){
00822 LM_ERR("No more bindings allowed. Ignoring bind request for processor %d\n",processor_id);
00823 return -1;
00824 }
00825 memset(&my_addr,0,sizeof(struct ip_addr));
00826 my_addr.af=payload[k++];
00827 my_addr.len=payload[k++];
00828 memcpy(my_addr.u.addr,payload+k,my_addr.len);
00829 k+=my_addr.len;
00830 proto=payload[k++];
00831 memcpy(&port,payload+k,2);
00832 k+=2;
00833 port=ntohs(port);
00834 print_ip_buf(&my_addr,buffer,300);
00835 switch(proto){
00836 case PROTO_UDP:
00837 proto_s="UDP";
00838 xxx_listen=udp_listen;
00839 break;
00840 #ifdef USE_TCP
00841 case PROTO_TCP:
00842 proto_s="TCP";
00843 xxx_listen=tcp_listen;
00844 break;
00845 #endif
00846 #ifdef USE_TLS
00847 case PROTO_TLS:
00848 proto_s="TLS";
00849 xxx_listen=tls_listen;
00850 break;
00851 #endif
00852 default:
00853 goto error;
00854 }
00855 for(si=xxx_listen;si;si=si->next){
00856 if(my_addr.af==si->address.af &&
00857 my_addr.len==si->address.len &&
00858 !memcmp(si->address.u.addr,my_addr.u.addr,my_addr.len) &&
00859 port == si->port_no){
00860 as->binds[i]=si;
00861 as->bound_processor[i]=processor_id;
00862 as->num_binds++;
00863 LM_DBG("AS processor with id: %d bound to %s %s %d\n",processor_id,proto_s,buffer,port);
00864 return 0;
00865 }
00866 }
00867 error:
00868 LM_ERR("Cannot bind to %s %s %d !!!\n",proto_s,buffer,port);
00869 return -1;
00870 }
00871
00872
00873
00874
00875
00876
00877
00878 int process_unbind_action(as_p as,char *payload,int len)
00879 {
00880 int i,k;
00881 unsigned int flags;
00882 char processor_id;
00883 k=0;
00884 net2hostL(flags,payload,k);
00885 processor_id=payload[k++];
00886 for(i=0;i<as->num_binds;i++){
00887 if(as->bound_processor[i] == processor_id)
00888 break;
00889 }
00890 if(i==MAX_BINDS){
00891 LM_ERR("tried to unbind a processor which is not registered (id=%d)!\n",processor_id);
00892 return 0;
00893 }
00894 as->bound_processor[i]=0;
00895 as->num_binds--;
00896 LM_DBG("AS processor un-bound with id: %d\n",processor_id);
00897 return 0;
00898 }
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908
00909 static int handle_unc_as_data(int fd)
00910 {
00911 int i,j,k,len;
00912 char *name1;
00913 struct as_entry *as;
00914
00915 for(i=0;i<2*MAX_UNC_AS_NR ;i++)
00916 if(unc_as_t[i].valid && unc_as_t[i].fd==fd)
00917 break;
00918 if(i==2*MAX_UNC_AS_NR){
00919 LM_ERR("has received an fd which is not in uncompleted AS array\n");
00920 return -1;
00921 }
00922 if(unc_as_t[i].flags & HAS_NAME){
00923 LM_WARN("this shouldn't happen\n");
00924 return 0;
00925 }
00926 LM_DBG("Reading client name\n");
00927
00928 if(-1==(len=read_name(fd,unc_as_t[i].name,MAX_AS_NAME))){
00929
00930 LM_ERR("Bad name passed from fd\n");
00931 unc_as_t[i].valid=0;
00932 unc_as_t[i].flags=0;
00933 return -2;
00934 }else if(len==-2){
00935 LM_WARN("client disconnected\n");
00936 return -2;
00937 }
00938 name1=unc_as_t[i].name;
00939
00940
00941 for(as=as_list;as;as=as->next){
00942 if(as->name.len==len && !memcmp(name1,as->name.s,len)){
00943 if(as->connected){
00944 LM_WARN("AppServer trying to connect with a name already taken (%.*s)\n",len,name1);
00945 unc_as_t[i].valid=0;
00946 unc_as_t[i].flags=0;
00947 return -2;
00948 }
00949 break;
00950 }
00951 }
00952 if (!as) {
00953 LM_ERR("a client tried to connect which is not declared in config. script(%.*s)\n",len,name1);
00954 unc_as_t[i].valid=0;
00955 unc_as_t[i].flags=0;
00956 return -2;
00957 }
00958 unc_as_t[i].flags |= HAS_NAME;
00959
00960
00961 k=(i>=MAX_UNC_AS_NR?MAX_UNC_AS_NR:2*MAX_UNC_AS_NR);
00962
00963 for(j=(i>=MAX_UNC_AS_NR?0:MAX_UNC_AS_NR);j<k;j++)
00964 if(unc_as_t[j].valid &&
00965 (unc_as_t[j].flags & HAS_NAME) &&
00966 !strcmp(unc_as_t[i].name,unc_as_t[j].name))
00967 break;
00968 LM_INFO("Fantastic, we have a new client: %s\n",unc_as_t[i].name);
00969 if(j==k)
00970 return 0;
00971 LM_INFO("EUREKA, we have a new completed AS: %s\n",unc_as_t[i].name);
00972
00973 if(add_new_as(i<j?i:j,i<j?j:i,as)==-1){
00974 close(unc_as_t[j].fd);
00975 close(unc_as_t[i].fd);
00976 unc_as_t[j].valid=unc_as_t[i].valid=0;
00977 unc_as_t[j].flags=unc_as_t[i].flags=0;
00978 return -1;
00979 }
00980 unc_as_t[j].valid=unc_as_t[i].valid=0;
00981 unc_as_t[j].flags=unc_as_t[i].flags=0;
00982 return unc_as_t[i<j?i:j].fd;
00983 }
00984
00985
00986
00987
00988
00989
00990 static inline int read_name(int sock,char *dst,int dstlen)
00991 {
00992 int n,namelen;
00993 namelen=0;
00994 try_again1:
00995 if((n=read(sock,&namelen,1))<0){
00996 if(errno==EINTR)
00997 goto try_again1;
00998 else{
00999 LM_ERR("trying to read length from fd=%d (%s)\n",sock,strerror(errno));
01000 return -1;
01001 }
01002 }else if(n==0){
01003 LM_WARN("uncomplete AS has disconnected before giving its name\n");
01004 return -2;
01005 }
01006 if(namelen>dstlen || namelen==0){
01007 LM_ERR("name too long to fit in dst (%d > %d)\n",namelen,dstlen);
01008 return -1;
01009 }
01010 try_again2:
01011 if((n=read(sock,dst,namelen))<0){
01012 if(errno==EINTR)
01013 goto try_again2;
01014 else{
01015 LM_ERR("trying to read %d chars into %p from fd=%d (%s)\n",namelen,dst,sock,strerror(errno));
01016 return -1;
01017 }
01018 }else if(n==0){
01019 LM_WARN("uncomplete AS has disconnected before giving its name\n");
01020 return -2;
01021 }
01022 dst[namelen]=0;
01023 return namelen;
01024 }
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039 static int new_as_connect(int fd,char which)
01040 {
01041 union sockaddr_union su;
01042 int sock,i,flags;
01043 socklen_t su_len;
01044
01045 su_len = sizeof(union sockaddr_union);
01046 sock=-1;
01047 again:
01048 sock=accept(fd, &su.s, &su_len);
01049 if(sock==-1){
01050 if(errno==EINTR){
01051 goto again;
01052 }else{
01053 LM_ERR("while accepting connection: %s\n", strerror(errno));
01054 return -1;
01055 }
01056 }
01057 switch(which){
01058 case 'e':
01059 for(i=0;i<MAX_UNC_AS_NR && unc_as_t[i].valid;i++);
01060 if(i==MAX_UNC_AS_NR){
01061 LM_WARN("no more uncomplete connections allowed\n");
01062 goto error;
01063 }
01064 unc_as_t[i].fd=sock;
01065 unc_as_t[i].valid=1;
01066 unc_as_t[i].flags=HAS_FD;
01067 memcpy(&unc_as_t[i].su,&su,su_len);
01068 break;
01069 case 'a':
01070 for(i=MAX_UNC_AS_NR;(i<(2*MAX_UNC_AS_NR)) && unc_as_t[i].valid;i++);
01071 if(i==2*MAX_UNC_AS_NR){
01072 LM_WARN("no more uncomplete connections allowed\n");
01073 goto error;
01074 }
01075 unc_as_t[i].fd=sock;
01076 unc_as_t[i].valid=1;
01077 unc_as_t[i].flags=HAS_FD;
01078 memcpy(&unc_as_t[i].su,&su,su_len);
01079 break;
01080 default:
01081 break;
01082 }
01083 flags=1;
01084 if ((setsockopt(sock, IPPROTO_TCP , TCP_NODELAY,&flags, sizeof(flags))<0) ){
01085 LM_WARN("could not disable Nagle: %s\n",
01086 strerror(errno));
01087 }
01088
01089 return sock;
01090 error:
01091 if(sock!=-1)
01092 close(sock);
01093 return -1;
01094 }
01095
01096 int spawn_action_dispatcher(struct as_entry *the_as)
01097 {
01098 pid_t pid;
01099 pid=fork();
01100 if(pid<0){
01101 LM_ERR("unable to fork an action dispatcher for %.*s\n",the_as->name.len,the_as->name.s);
01102 return -1;
01103 }
01104 if(pid==0){
01105 my_as = the_as;
01106 is_dispatcher=0;
01107 dispatch_actions();
01108 exit(0);
01109 }else{
01110 the_as->u.as.action_pid=pid;
01111 }
01112 return 0;
01113 }
01114