event_dispatcher.c

Go to the documentation of this file.
00001 /* $Id: event_dispatcher.c 4518 2008-07-28 15:39:28Z henningw $
00002  *
00003  * Copyright (C) 2006-2007 VozTelecom Sistemas S.L
00004  *
00005  * This file is part of Kamailio, a free SIP server.
00006  *
00007  * Kamailio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 2 of the License, or
00010  * (at your option) any later version
00011  *
00012  * Kamailio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License 
00018  * along with this program; if not, write to the Free Software 
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  */
00021 
00022 #include <sys/types.h>/*setsockopt,bind,accept,fork,pid_t*/
00023 #include <sys/socket.h>/*setsockopt,bind,accept,listen*/
00024 #include <netinet/tcp.h>/*TCP_NODELAY*/
00025 #include <string.h>/*strcmp,memset*/
00026 #include <errno.h>/*errno*/
00027 #include <unistd.h>/*close(),read(),pipe,fork,pid_t*/
00028 #include <sys/poll.h>/*poll*/
00029 #include <signal.h>/*signal*/
00030 #include <time.h>/*time*/
00031 #include <string.h>/*memcmp*/
00032 #include <sys/types.h>/*waitpid*/
00033 #include <sys/wait.h>/*waitpid*/
00034 
00035 #include "../../ip_addr.h" /*sockaddr_union, ip_addr*/
00036 #include "../../hash_func.h" /*T_TABLE_POWER*/
00037 #include "../../mem/mem.h" /*pkg_malloc*/
00038 #include "../../mem/shm_mem.h" /*shm_malloc*/
00039 #include "../../dprint.h" /*LM_**/
00040 #include "../../locking.h"
00041 
00042 #include "seas.h"
00043 #include "ha.h"
00044 #include "cluster.h"
00045 #include "seas_action.h"
00046 #include "statistics.h"
00047 #include "event_dispatcher.h"
00048 
00049 #define PING_OVER_FACTOR 2
00050 #define MAX_WRITE_TRIES 10
00051 
/* Printable names used when logging; the table is indexed by a numeric
 * type code (dispatch_relay() looks entries up that way). */
char *action_names[]={
   [0]="NONE",
   [1]="PROVISIONAL_REPLY",
   [2]="FINAL_REPLY",
   [3]="REPLY_FIN_DLG",
   [4]="UAC_REQ",
   [5]="AC_RES_FAIL",
   [6]="STATELESS_MSG",
   [7]="AC_CANCEL",
   [8]="JAIN_PONG"
};
00061 
00062 
/* Table of "uncompleted" application servers: handle_unc_as_data() searches
 * it by file descriptor for entries flagged as valid. */
struct unc_as unc_as_t[2*MAX_UNC_AS_NR];

/*this is for the Action Dispatcher Process */
struct as_entry *my_as;
/* Defined outside this file; dispatcher_main_loop() sets process_no to -1
 * and polls sig_flag for pending signals. */
extern int process_no;
extern int sig_flag;

/* Forward declarations of the file-local helpers defined below. */
static int process_event_reply(as_p as);
static int handle_as_data(int fd);
static inline int print_sock_info(char *buffer,int wheremax,int *idx,struct socket_info *s,enum sip_protos type);
static inline int send_sockinfo(int fd);
static inline int add_new_as(int event_idx,int action_idx,struct as_entry *as);
static int dispatch_relay();
static int new_as_connect(int fd,char which);
static inline int read_name(int sock,char *dst,int dstlen);
static int handle_unc_as_data(int fd);
static int open_server_sockets(struct ip_addr *address,unsigned short port,int *fd);
00080 
00081 
00082 
00083 /** Main loop for the Event Dispatcher process.
00084  * 
00085  */
00086 int dispatcher_main_loop(void)
00087 {
00088    struct pollfd poll_fds[3+MAX_AS_NR],*poll_tmp;
00089    int clean_index,i,j,k,fd,poll_events=0,socks[2],chld_status;
00090    int as_nr,unc_as_nr;
00091    pid_t chld;
00092    struct timeval last_ping,now;
00093    struct as_entry *as;
00094 
00095    sig_flag=0;
00096    is_dispatcher=1;
00097    as_nr=0;
00098 
00099    timerclear(&last_ping);
00100    timerclear(&now);
00101    signal(SIGCHLD,seas_sighandler);
00102    signal(SIGTERM,seas_sighandler);
00103    signal(SIGUSR1,seas_sighandler);
00104    signal(SIGINT, seas_sighandler);
00105    signal(SIGKILL,seas_sighandler);
00106 
00107    strcpy(whoami,"Seas Event Dispatcher process");
00108    /*I set process_no to -1 because otherwise, the logging process confuses this process with another from SER
00109     * (see LM_*() and dprint() and my_pid())*/
00110    process_no = -1;
00111 
00112    if(open_server_sockets(seas_listen_ip,seas_listen_port,socks)==-1){
00113       LM_ERR("unable to open server sockets on dispatcher\n");
00114       return -1;
00115    }
00116    for(i=0;i<2;i++){
00117       poll_fds[i].fd=socks[i];
00118       poll_fds[i].revents=0;
00119       poll_fds[i].events=POLLIN;
00120    }
00121    poll_fds[2].fd=read_pipe;
00122    poll_fds[2].revents=0;
00123    poll_fds[2].events=POLLIN;/*pollhup ?*/
00124 
00125    poll_events=0;
00126    unc_as_nr=0;
00127 
00128    if(use_ha)
00129       spawn_pinger();
00130 
00131    while(1){
00132       if(sig_flag==SIGCHLD){
00133     while ((chld=waitpid( -1, &chld_status, WNOHANG ))>0) {
00134        if (WIFEXITED(chld_status)){
00135           LM_INFO("child process %d exited normally, status=%d\n",
00136                chld,WEXITSTATUS(chld_status));
00137        }else if (WIFSIGNALED(chld_status)) {
00138           LM_INFO("child process %d exited by a signal %d\n",
00139                chld,WTERMSIG(chld_status));
00140        }else if (WIFSTOPPED(chld_status)) 
00141           LM_INFO("child process %d stopped by a signal %d\n",
00142                chld,WSTOPSIG(chld_status));
00143        for (as=as_list;as;as=as->next) {
00144           if(as->type!=AS_TYPE)
00145         continue;
00146           if(as->u.as.action_pid==chld){
00147         for(i=0;i<as_nr && ((poll_fds[3+i].fd)!=(as->u.as.event_fd));i++)
00148            ;
00149         if(i==as_nr){
00150            LM_ERR("Either the pinger has died or BUG found..\n");
00151            continue;
00152         }
00153         /*overwrite the obsolete 'i' position with the next position*/
00154         for(j=3+i;j<(as_nr+unc_as_nr+3-1);i++){
00155            poll_fds[j].fd=poll_fds[j+1].fd;
00156            poll_fds[j].events=poll_fds[j+1].events;
00157            poll_fds[j].revents=poll_fds[j+1].revents;
00158         }
00159         close(as->u.as.event_fd);/*close the socket fd*/
00160         if (as->u.as.ev_buffer.s) {
00161            pkg_free(as->u.as.ev_buffer.s);
00162            as->u.as.ev_buffer.s=(char *)0;
00163            as->u.as.ev_buffer.len=0;
00164         }
00165         as->u.as.event_fd=as->u.as.action_fd=-1;
00166         as->connected=0;
00167         destroy_pingtable(&as->u.as.jain_pings);
00168         destroy_pingtable(&as->u.as.servlet_pings);
00169         as_nr--;
00170         LM_WARN("client [%.*s] leaving (Action Dispatcher Process died !)\n",
00171               as->name.len,as->name.s);
00172         break;
00173           }/*if(action_pid==chld)*/
00174        }/*for(as=as_list;as;as=as->next)*/
00175     }/*while(waitpid(-1)>0)*/
00176       }else if (sig_flag) {
00177     LM_WARN("received signal != sigchld(%d)\n",sig_flag);
00178      }
00179       sig_flag=0;
00180       clean_index=0;
00181       LM_INFO("polling [2 ServSock] [1 pipe] [%d App Servers]"
00182            " [%d Uncomplete AS]\n",as_nr,unc_as_nr);
00183       poll_events = poll(poll_fds,3+unc_as_nr+as_nr,-1);
00184       if (poll_events == -1) {
00185     if(errno==EINTR){
00186        /*handle the case a child has died.
00187         * It will be done in the next iteration in if(seas_sigchld_received)*/
00188        continue;
00189     }
00190     if(errno==EBADF){
00191        LM_ERR("invalid file descriptor pased to poll (%s)\n",
00192             strerror(errno));
00193        return -1;/*??*/
00194     }
00195     /* errors */
00196     LM_ERR("poll'ing:%s\n",strerror(errno));
00197     poll_events=0;
00198     continue;
00199       } else if (poll_events == 0) {/*timeout*/
00200     continue;
00201       } else {/*there are events !*/
00202     /*handle connections from server sockets*/
00203     for(i=0;i<2;i++){
00204        if(poll_fds[i].revents)
00205           poll_events--;
00206        if(poll_fds[i].revents & POLLIN){
00207           poll_fds[i].revents &= (~POLLIN);
00208           if((fd=new_as_connect(socks[i],i==0?'e':'a'))>=0){
00209         poll_tmp=&poll_fds[3+as_nr+unc_as_nr];
00210         poll_tmp->fd=fd;
00211         poll_tmp->events=POLLIN|POLLHUP;
00212         unc_as_nr++;
00213         LM_DBG("Have new %s client\n",i==0?"event":"action");
00214           }else{
00215         LM_ERR("accepting connection from AS\n");
00216           }
00217        }
00218     }
00219     /*handle data from pipe*/
00220     if(poll_fds[2].revents & POLLIN){
00221        poll_fds[2].revents &= (~POLLIN);
00222        poll_events--;
00223        if(dispatch_relay()<0){
00224           LM_ERR("dispatch_relay returned -1"
00225            "should clean-up table\n");
00226        }
00227     }
00228     /*now handle receive data from completed AS*/
00229     clean_index=0;
00230     LM_DBG("Scanning data from %d AS\n",as_nr);
00231     for(i=0;(i<as_nr) && poll_events;i++){
00232        clean_index=0;
00233        poll_tmp=&poll_fds[3+i];
00234        if(poll_tmp->revents)
00235           poll_events--;
00236        if(poll_tmp->revents & POLLIN){
00237           LM_DBG("POLLIN found in AS #%i\n",i);
00238           poll_tmp->revents &= (~POLLIN);
00239           switch(handle_as_data(poll_tmp->fd)){
00240         case -2:/*read returned 0 bytes, an AS client is leaving*/
00241            clean_index=1;
00242            break;
00243         case -1:/*shouldnt happen*/
00244            LM_ERR("reading from AS socket\n");
00245            break;
00246         case 0:/* event_response received and processed*/
00247            break;
00248         default:
00249            LM_WARN("unknown return type from handle_as_data\n");
00250           }
00251        }
00252        if(clean_index || (poll_tmp->revents & POLLHUP)){
00253           LM_DBG("POLHUP or read==0 found in %i AS \n",i);
00254           clean_index=0;
00255           poll_tmp->revents = 0;
00256           for(as=as_list;as;as=as->next){
00257         if(as->type==CLUSTER_TYPE)
00258            continue;
00259         if(as->connected && (as->u.as.event_fd == poll_tmp->fd)){
00260            close(poll_tmp->fd);/*close the socket fd*/
00261            /*TODO  we should send a signal to the Action Dispatcher !!!*/
00262            as->connected=0;
00263            as_nr--;
00264            /*overwrite the obsolete 'i' position with the next position*/
00265            for(k=i;k<(as_nr+unc_as_nr);k++){
00266          j=3+k;
00267          poll_fds[j].fd=poll_fds[j+1].fd;
00268          poll_fds[j].events=poll_fds[j+1].events;
00269          poll_fds[j].revents=poll_fds[j+1].revents;
00270            }
00271            --i;
00272            LM_WARN("client %.*s leaving !!!\n",as->name.len,as->name.s);
00273            break;
00274         }
00275           }
00276           if (!as) {
00277         LM_ERR("the leaving client was not found in the as_list\n");
00278          }
00279        }
00280     }
00281     /*now handle data sent from uncompleted AS*/
00282     LM_DBG("Scanning data from %d uncomplete AS \n",unc_as_nr);
00283     clean_index=0;
00284     for(i=0;i<unc_as_nr && poll_events;i++){
00285        poll_tmp=&poll_fds[3+as_nr+i];
00286        if(poll_tmp->revents)
00287           poll_events--;
00288        if(poll_tmp->revents & POLLIN){
00289           LM_DBG("POLLIN found in %d uncomplete AS \n",i);
00290           poll_tmp->revents &= (~POLLIN);
00291           fd=handle_unc_as_data(poll_tmp->fd);
00292           if(fd>0){
00293         /* there's a new AS, push the uncomplete poll_fds up and set the AS */
00294         for(k=i;k>0;k--){
00295            j=3+as_nr+k;
00296            poll_fds[j].fd=poll_fds[j-1].fd;
00297            poll_fds[j].events=poll_fds[j-1].events;
00298            poll_fds[j].revents=poll_fds[j-1].revents;
00299         }
00300         poll_fds[3+as_nr].fd=fd;
00301         poll_fds[3+as_nr].events=POLLIN|POLLHUP;
00302         poll_fds[3+as_nr].revents=0;
00303         as_nr++;/*not very sure if this is thread-safe*/
00304         unc_as_nr--;
00305           }else if(fd<=0){/* pull the upper set of uncomplete AS down and take this one out*/
00306         poll_tmp->revents=0;
00307         for(k=i;k<(unc_as_nr-1);k++){
00308            j=3+as_nr+k;
00309            poll_fds[j].fd=poll_fds[j+1].fd;
00310            poll_fds[j].events=poll_fds[j+1].events;
00311            poll_fds[j].revents=poll_fds[j+1].revents;
00312         }
00313         unc_as_nr--;
00314         /** we decrement i so that pulling down the upper part of the unc_as array so that
00315          * it doesn't affect our for loop */
00316         i--;
00317           }
00318        }
00319        if(poll_tmp->revents & POLLHUP){
00320           LM_DBG("POLLHUP found in %d uncomplete AS \n",i);
00321           close(poll_tmp->fd);
00322           for(k=i;k<(unc_as_nr-1);k++){
00323         j=3+as_nr+k;
00324         poll_fds[j].fd=poll_fds[j+1].fd;
00325         poll_fds[j].events=poll_fds[j+1].events;
00326         poll_fds[j].revents=poll_fds[j+1].revents;
00327           }
00328           unc_as_nr--;
00329           i--;
00330           poll_tmp->revents = 0;
00331        }
00332     }/*for*/
00333       }/*else ...(poll_events>0)*/
00334    }/*while(1)*/
00335 }
00336 
00337 
00338 /**
00339  * opens the server socket, which attends (accepts) the clients, that is: 
00340  * params:
00341  * address:
00342  *    address to which to listen
00343  * port:
00344  *    base port to which to listen. then port+1 will be the socket
00345  *    for action's delivery.
00346  * fds:
00347  *    in fd[0] the action socket will be put.
00348  *    in fd[1] the event socket will be put.
00349  *
00350  * returns 0 on exit, <0 on fail
00351  *
00352  */
00353 static int open_server_sockets(struct ip_addr *address,unsigned short port,int *fd)
00354 {
00355 
00356    /*using sockaddr_union enables ipv6..*/
00357    union sockaddr_union su;
00358    int i,optval;
00359 
00360    fd[0]=fd[1]=-1;
00361 
00362    if(address->af!=AF_INET && address->af!=AF_INET6){
00363       LM_ERR("Only ip and ipv6 allowed socket types\n");
00364       return -1;
00365    }
00366 
00367    for(i=0;i<2;i++){
00368       if(init_su(&su,address,port+i)<0){
00369     LM_ERR("unable to init sockaddr_union\n");
00370     return -1;
00371       }
00372       if((fd[i]=socket(AF2PF(su.s.sa_family), SOCK_STREAM, 0))==-1){
00373     LM_ERR("trying to open server %s socket (%s)\n",i==0?"event":"action",strerror(errno));
00374     goto error;
00375       }
00376       optval=1;
00377       if (setsockopt(fd[i], SOL_SOCKET, SO_REUSEADDR, (void*)&optval, sizeof(optval))==-1) {
00378     LM_ERR("setsockopt (%s)\n",strerror(errno));
00379     goto error;
00380       }
00381       if ((bind(fd[i], &su.s,sizeof(union sockaddr_union)))==-1){
00382     LM_ERR( "bind (%s)\n",strerror(errno));
00383     goto error;
00384       }
00385       if (listen(fd[i], 10)==-1){
00386     LM_ERR( "listen (%s)\n",strerror(errno));
00387     goto error;
00388       }
00389    }
00390    return 0;
00391 
00392 error:
00393    for(i=0;i<2;i++)
00394       if(fd[i]!=-1){
00395     close(fd[i]);
00396     fd[i]=-1;
00397       }
00398    return -1;
00399 }
00400 
00401 
/* Lets dispatch_relay() read an as_msg_p pointer value from the pipe one
 * byte range at a time (through bytes) and then use it as a pointer (ptr). */
union helper{
   as_msg_p ptr;
   char bytes[sizeof(as_msg_p)];
};
00406 
00407 /**
00408  * Sends event 
00409  *
00410  * returns
00411  *     0  OK
00412  *    -1 couldn't read the event from the pipe
00413  *    -2 couldn't send the event
00414  * TODO this should be FAR more generic... for example, there might be events
00415  * which are not related to any transaction (finish event, or error event...)
00416  * we should separate event-specific handling in different functions...
00417  */
00418 static int dispatch_relay(void)
00419 {
00420    int i,j,retval,tries;
00421    union helper thepointer;
00422 
00423    i=j=0;
00424    retval=0;
00425 read_again:
00426    i=read(read_pipe,thepointer.bytes+j,sizeof(as_msg_p)-j);
00427    if(i<0){
00428       if(errno==EINTR){
00429     goto read_again;
00430       }else{
00431     LM_ERR("Dispatcher Process received unknown error"
00432           " reading from pipe (%s)\n",strerror(errno));
00433     retval=-1;
00434     goto error;
00435       }
00436    }else if(i==0){
00437     LM_ERR("Dispatcher Process "
00438           "received 0 while reading from pipe\n");
00439     goto error;
00440    }else{
00441       j+=i;
00442       if(j<sizeof(as_msg_p))
00443     goto read_again;
00444    }
00445 
00446    if (!thepointer.ptr) {
00447       LM_ERR("Received Corrupted pointer to event !!\n");
00448       retval=0;
00449       goto error;
00450    }
00451    /*the message*/
00452    if(use_stats && thepointer.ptr->transaction)
00453       event_stat(thepointer.ptr->transaction);
00454    if(thepointer.ptr->as == NULL || !thepointer.ptr->as->connected || thepointer.ptr->as->type==CLUSTER_TYPE){
00455       LM_WARN("tryied to send an event to an App Server"
00456            " that is scheduled to die!!\n");
00457       retval=-2;
00458       goto error;
00459    }
00460    j=0;
00461    tries=0;
00462 write_again:
00463    i=write(thepointer.ptr->as->u.as.event_fd,thepointer.ptr->msg+j,thepointer.ptr->len-j);
00464    if(i==-1){
00465       switch(errno){
00466     case EINTR:
00467        if(!thepointer.ptr->as->connected){
00468           LM_WARN("tryied to send an event to an App Server"
00469                " that is scheduled to die!!\n");
00470           retval=-2;
00471           goto error;
00472        }
00473        goto write_again;
00474     case EPIPE:
00475        LM_ERR("AS [%.*s] closed "
00476         "the socket !\n",thepointer.ptr->as->u.as.name.len,thepointer.ptr->as->u.as.name.s);
00477        retval=-2;
00478        goto error;
00479     default:
00480        LM_ERR("unknown error while trying to write to AS socket(%s)\n",
00481             strerror(errno));
00482        retval=-2;
00483        goto error;
00484       }
00485    }else if(i>0){
00486       j+=i;
00487       if(j<thepointer.ptr->len)
00488     goto write_again;
00489    }else if(i==0){
00490       if (tries++ > MAX_WRITE_TRIES) { 
00491     LM_ERR("MAX WRITE TRIES !!!\n");
00492     goto error;
00493       }else
00494     goto write_again;
00495    }
00496    LM_DBG("Event relaied to %.*s AS\n",thepointer.ptr->as->u.as.name.len,
00497          thepointer.ptr->as->u.as.name.s);
00498    LM_DBG("Event type %s \n",action_names[thepointer.ptr->type]);
00499    retval=0;
00500 error:
00501    if(thepointer.ptr){
00502       if(thepointer.ptr->msg)
00503     shm_free(thepointer.ptr->msg);
00504       shm_free(thepointer.ptr);
00505    }
00506    return retval;
00507 }
00508 
00509 /**
00510  * receives 2 indexes in unc_as_t which correspond one to
00511  * the events socket and the other to the actions socket
00512  *
00513  * returns
00514  *    0 on success
00515  *    -1 on error
00516  */
00517 static inline int add_new_as(int event_idx,int action_idx,struct as_entry *as)
00518 {
00519    struct unc_as *ev,*ac;
00520    int j;
00521    as_p the_as=0;
00522    struct as_entry *tmp;
00523 
00524    ev=&unc_as_t[event_idx];
00525    ac=&unc_as_t[action_idx];
00526 
00527    the_as=&(as->u.as);
00528 
00529    the_as->action_fd=ac->fd;
00530    the_as->event_fd=ev->fd;
00531    the_as->name.len = strlen(ev->name);
00532    if(use_ha){
00533       if(jain_ping_timeout){
00534     if (0>init_pingtable(&the_as->jain_pings,jain_ping_timeout,(jain_ping_timeout/jain_ping_period+1)*PING_OVER_FACTOR)){
00535        LM_ERR("Unable to init jain pinging table...\n");
00536        goto error;
00537     }
00538       }
00539       if(servlet_ping_timeout){
00540     if (0>init_pingtable(&the_as->servlet_pings,servlet_ping_timeout,(servlet_ping_timeout/servlet_ping_period+1)*PING_OVER_FACTOR)){
00541        LM_ERR("Unable to init servlet pinging table...\n");
00542        goto error;
00543     }
00544       }
00545    }
00546    /*TODO attention, this is pkg_malloc because only the Event_Dispatcher process 
00547     * has to use it !!*/
00548    if(!(the_as->ev_buffer.s = pkg_malloc(AS_BUF_SIZE))){
00549       LM_ERR("unable to alloc pkg mem for the event buffer\n");
00550       goto error;
00551    }
00552    the_as->ev_buffer.len=0;
00553    as->connected=1;
00554    the_as->action_pid=0;
00555    for(tmp=as_list;tmp;tmp=tmp->next){
00556       if(tmp->type==AS_TYPE)
00557     continue;
00558       for (j=0;j<tmp->u.cs.num;j++) {
00559     if (tmp->u.cs.as_names[j].len == the_as->name.len && 
00560           !memcmp(tmp->u.cs.as_names[j].s,the_as->name.s,the_as->name.len)) { 
00561        if(tmp->u.cs.num==tmp->u.cs.registered){
00562           LM_ERR("AS %.*s belongs to cluster %.*s which is already completed\n",
00563            the_as->name.len,the_as->name.s,tmp->name.len,tmp->name.s);
00564           break;
00565        }
00566        tmp->u.cs.registered++;
00567        break;
00568     }
00569       }
00570    }
00571    if(0>spawn_action_dispatcher(as)){
00572       LM_ERR("Unable to spawn Action Dispatcher for as %s\n",ev->name);
00573       goto error;
00574    }
00575    if(send_sockinfo(the_as->event_fd)==-1){
00576       LM_ERR("Unable to send socket info to as %s\n",ev->name);
00577       goto error;
00578    }
00579    return 0;
00580 error:
00581    if(the_as->ev_buffer.s){
00582       pkg_free(the_as->ev_buffer.s);
00583       the_as->ev_buffer.s=(char*)0;
00584    }
00585    if(the_as->action_pid)
00586       kill(the_as->action_pid,SIGTERM);
00587    if(jain_ping_timeout)
00588       destroy_pingtable(&the_as->jain_pings);
00589    if(servlet_ping_timeout)
00590       destroy_pingtable(&the_as->servlet_pings);
00591    return -1;
00592 }
00593 
00594 
00595 
00596 
00597 
00598 
00599 /**prints available sockets in SER to the App Server.
00600  * format is:
00601  * 1: transport identifier (u for UDP, t for TCP, s for TLS)
00602  * 1: length of socket name (sip.voztele.com or whatever)
00603  * N: name
00604  * 1: length of IP address (192.168.1.2)
00605  * N: ip address in ascii
00606  * 2: port nubmer in NBO
00607  *
00608  * returns
00609  *    -1 on error
00610  *    0  on success
00611  */
00612 static inline int send_sockinfo(int fd)
00613 {
00614    struct socket_info *s;
00615    unsigned char i;
00616    char buffer[300];
00617    int k=0,j;
00618    buffer[k++]=16;/*This used to be T_TABLE_POWER in openser 1.0.1, now its hardcoded in config.h*/
00619    for(i=0,s=udp_listen;s;s=s->next,i++);
00620 #ifdef USE_TCP
00621    for(s=tcp_listen;s;s=s->next,i++);
00622 #endif
00623 #ifdef USE_TLS
00624    for(s=tls_listen;s;s=s->next,i++);
00625 #endif
00626    if(i==0){
00627       LM_ERR("no udp|tcp|tls sockets ?!!\n");
00628       return -1;
00629    }
00630    buffer[k++]=i;
00631    for(s=udp_listen;s;s=s->next){
00632       if(print_sock_info(buffer,300,&k,s,PROTO_UDP)==-1)
00633     return -1;
00634    }
00635 #ifdef USE_TCP
00636    for(s=tcp_listen;s;s=s->next){
00637       if(print_sock_info(buffer,300,&k,s,PROTO_TCP)==-1)
00638     return -1;
00639    }
00640 #endif
00641 #ifdef USE_TLS
00642    for(s=tls_listen;s;s=s->next){
00643       if(print_sock_info(buffer,300,&k,s,PROTO_TLS)==-1)
00644     return -1;
00645    }
00646 #endif
00647 write_again:
00648    j=write(fd,buffer,k);
00649    if(j==-1){
00650       if(errno==EINTR)
00651     goto write_again;
00652       else
00653     return -1;
00654    }
00655    return 0;
00656 }
00657 
00658 /* prints sock info into the byte array where
00659  * returns 0 on success, -1 on err
00660  * the message sent is as follows:
00661  * 1: protocol type (0=NONE,1=UDP, 2=TCP, 3=TLS)
00662  * 1: name length
00663  * N: name
00664  * 1: address string length
00665  * N: address
00666  * 2: NBO unsigned shor int port number
00667  *
00668  * TODO buffer overflow risk
00669  */
00670 static inline int print_sock_info(char *buffer,int wheremax,int *idx,struct socket_info *s,enum sip_protos type)
00671 {
00672    int k;
00673    unsigned char i;
00674    unsigned short int j;
00675    if((wheremax-*idx)<49)/*31*name+17*ipv6+2*port+1*type*/
00676       return -1;
00677    k=*idx;
00678    buffer[k++]=(char)type;
00679    if((i=(unsigned char)s->name.len)>30){
00680       LM_ERR("name too long\n");
00681       return -1;
00682    }
00683    buffer[k++]=i;
00684    memcpy(&buffer[k],s->name.s,i);
00685    k+=i;
00686    i=(unsigned char)s->address_str.len;
00687    buffer[k++]=i;
00688    memcpy(&buffer[k],s->address_str.s,i);
00689    k+=i;
00690    j=htons(s->port_no);
00691    memcpy(&buffer[k],&j,2);
00692    k+=2;
00693    *idx=k;
00694    return 0;
00695 }
00696 
00697 /**
00698  * Handles data from an AppServer. First searches in the AS table which was the AS
00699  * that sent the data (we dont already know it because this comes from a poll_fd
00700  * struct). When the one is found, it calls process_event_reply, which in turn
00701  * looks if there's a complete event in the buffer, and if there is, processes it.
00702  *
00703  * returns
00704  *    -1 on error
00705  *    -2 on read()==0 (the socket has been closed by the other end)
00706  *    0 on success
00707  */
00708 static int handle_as_data(int fd)
00709 {
00710    int j,k;
00711    struct as_entry *as;
00712    for(as=as_list;as;as=as->next)
00713       if(as->type == AS_TYPE && as->connected && (as->u.as.event_fd==fd))
00714     break;
00715    if(!as){
00716       LM_ERR("AS not found\n");
00717       return -1;
00718    }
00719    k=AS_BUF_SIZE-(as->u.as.ev_buffer.len);
00720 again:
00721    if((j=read(fd,as->u.as.ev_buffer.s+as->u.as.ev_buffer.len,k))<0){
00722       LM_ERR("reading data for as %.*s\n",as->name.len,as->name.s);
00723       if(errno==EINTR)
00724     goto again;
00725       else
00726     return -1;
00727    }else if(j==0){
00728       LM_ERR("AS client leaving (%.*s)\n",as->name.len,as->name.s);
00729       return -2;
00730    }
00731    as->u.as.ev_buffer.len+=j;
00732    LM_DBG("read %d bytes from AS (total = %d)\n",j,as->u.as.ev_buffer.len);
00733    if(as->u.as.ev_buffer.len>5)
00734       process_event_reply(&as->u.as);
00735    return 0;
00736 }
00737 
00738 /**
00739  * This function processess the Application Server buffer. We do buffered
00740  * processing because it increases performance quite a bit. Any message
00741  * sent from the AS comes with the first 2 bytes as an NBO unsigned short int 
00742  * which says the length of the following message (header and payload).
00743  * This way, we avoid multiple small reads() to the socket, which (as we know), consumes
00744  * far more processor because of the kernel read(2) system call. The drawback
00745  * is the added complexity of mantaining a buffer, the bytes read, and looking
00746  * if there is a complete message already prepared.
00747  *
00748  * Actions are supposed to be small, that's why BUF_SIZE is 2000 bytes length. 
00749  * Most of the actions will be that size or less. That is why the 4 bytes telling the
00750  * length of the Action payload are included in its size. This way you can use a fixed size
00751  * buffer to receive the Actions and not need to be pkb_malloc'ing for each new event.
00752  * If there is a particular bigger packet, for example one carrying a picture (a JPG can
00753  * easily surpass the 2000 byte limit) then a pkg_malloc will be required. This is left TODO
00754  *
00755  * returns
00756  *    -1 on error (packet too big)
00757  *    0  on success
00758  */
00759 static int process_event_reply(as_p as)
00760 {
00761    unsigned int ev_len;
00762    ev_len=(as->ev_buffer.s[0]<<24)|(as->ev_buffer.s[1]<<16)|(as->ev_buffer.s[2]<<8)|(as->ev_buffer.s[3]);/*yeah, it comes in network byte order*/
00763    /*if ev_len > BUF_SIZE then a flag should be put on the AS so that the whole length
00764     * of the action is skipped, until a mechanism for handling big packets is implemented*/
00765    if(ev_len>AS_BUF_SIZE){
00766       LM_WARN("Packet too big (%d)!!! should be skipped"
00767            " and an error returned!\n",ev_len);
00768       return -1;
00769    }
00770    if((as->ev_buffer.len<ev_len) || as->ev_buffer.len<4)
00771       return 0;
00772    switch(as->ev_buffer.s[4]){
00773       case BIND_AC:
00774     LM_DBG("Processing a BIND action from AS (length=%d): %.*s\n",
00775     ev_len,as->name.len,as->name.s);
00776     process_bind_action(as,&as->ev_buffer.s[5],ev_len-5);
00777     break;
00778       case UNBIND_AC:
00779     LM_DBG("Processing a UNBIND action from AS (length=%d): %.*s\n",
00780           ev_len,as->name.len,as->name.s);
00781     process_unbind_action(as,&as->ev_buffer.s[5],ev_len-5);
00782     break;
00783       default:
00784     return 0;
00785    }
00786    memmove(as->ev_buffer.s,&(as->ev_buffer.s[ev_len]),(as->ev_buffer.len)-ev_len);
00787    (as->ev_buffer.len)-=ev_len;
00788    return 0;
00789 }
00790 
00791 
00792 /**
00793  * processes a BIND event type from the AS.
00794  * Bind events follow this form:
00795  * 4:flags
00796  * 1:processor_id
00797  * 1:Address Family
00798  * 1:address length in bytes (16 for ipv6, 4 for ipv4) in NETWORK BYTE ORDER (fortunately, ip_addr struct stores it in NBO)
00799  * [16|4]:the IP address
00800  * 1:protocol used (UDP,TCP or TLS);
00801  * 2:NBO port
00802  *
00803  */
00804 int process_bind_action(as_p as,char *payload,int len)
00805 {
00806    struct socket_info *si,*xxx_listen;
00807    struct ip_addr my_addr;
00808    int i,k,proto;
00809    unsigned int flags;
00810    unsigned short port;
00811    char processor_id,buffer[300],*proto_s;
00812    k=0;
00813    *buffer=0;
00814    proto_s="NONE";
00815    net2hostL(flags,payload,k);
00816    processor_id=payload[k++];
00817    for(i=0;i<MAX_BINDS;i++){
00818       if(as->bound_processor[i]==0)
00819     break;
00820    }
00821    if(i==MAX_BINDS){
00822       LM_ERR("No more bindings allowed. Ignoring bind request for processor %d\n",processor_id);
00823       return -1;
00824    }
00825    memset(&my_addr,0,sizeof(struct ip_addr));
00826    my_addr.af=payload[k++];
00827    my_addr.len=payload[k++];
00828    memcpy(my_addr.u.addr,payload+k,my_addr.len);
00829    k+=my_addr.len;
00830    proto=payload[k++];
00831    memcpy(&port,payload+k,2);
00832    k+=2;
00833    port=ntohs(port);
00834    print_ip_buf(&my_addr,buffer,300);
00835    switch(proto){
00836       case PROTO_UDP:
00837     proto_s="UDP";
00838     xxx_listen=udp_listen;
00839     break;
00840 #ifdef USE_TCP
00841       case PROTO_TCP:
00842     proto_s="TCP";
00843     xxx_listen=tcp_listen;
00844     break;
00845 #endif
00846 #ifdef USE_TLS
00847       case PROTO_TLS:
00848     proto_s="TLS";
00849     xxx_listen=tls_listen;
00850     break;
00851 #endif
00852       default:
00853     goto error;
00854    }
00855    for(si=xxx_listen;si;si=si->next){
00856       if(my_addr.af==si->address.af && 
00857        my_addr.len==si->address.len && 
00858        !memcmp(si->address.u.addr,my_addr.u.addr,my_addr.len) && 
00859        port == si->port_no){
00860     as->binds[i]=si;
00861     as->bound_processor[i]=processor_id;
00862     as->num_binds++;
00863     LM_DBG("AS processor with id: %d bound to %s %s %d\n",processor_id,proto_s,buffer,port);
00864     return 0;
00865       }
00866    }
00867 error:
00868    LM_ERR("Cannot bind to %s %s %d !!!\n",proto_s,buffer,port);
00869    return -1;
00870 }
00871 
00872 /**
00873  * processes a UNBIND event type from the AS.
00874  * Bind events follow this form:
00875  * 1:processor_id
00876  *
00877  */
00878 int process_unbind_action(as_p as,char *payload,int len)
00879 {
00880    int i,k;
00881    unsigned int flags;
00882    char processor_id;
00883    k=0;
00884    net2hostL(flags,payload,k);
00885    processor_id=payload[k++];
00886    for(i=0;i<as->num_binds;i++){
00887       if(as->bound_processor[i] == processor_id)
00888     break;
00889    }
00890    if(i==MAX_BINDS){
00891       LM_ERR("tried to unbind a processor which is not registered (id=%d)!\n",processor_id);
00892       return 0;
00893    }
00894    as->bound_processor[i]=0;
00895    as->num_binds--;
00896    LM_DBG("AS processor un-bound with id: %d\n",processor_id);
00897    return 0;
00898 }
00899 
00900 /**
00901  * params:
00902  *   the filedes where the data was received.
00903  * returns:
00904  *   0 if this fd should be taken out of the poll_fd array bcause it already has AS name.
00905  *   fd if an AS was completed (returns the fd of the events socket)
00906  *   -1 if there was an error
00907  *   -2 if client disconnected and should be closed and taken outside the poll_fd array
00908  */
00909 static int handle_unc_as_data(int fd)
00910 {
00911    int i,j,k,len;
00912    char *name1;
00913    struct as_entry *as;
00914    /*first, we see if the data to read is from any of the uncompleted as's*/
00915    for(i=0;i<2*MAX_UNC_AS_NR ;i++)
00916       if(unc_as_t[i].valid && unc_as_t[i].fd==fd)
00917     break;
00918    if(i==2*MAX_UNC_AS_NR){
00919       LM_ERR("has received an fd which is not in uncompleted AS array\n");
00920       return -1;
00921    }
00922    if(unc_as_t[i].flags & HAS_NAME){/*shouldn't happen, if it has a name, it shouldnt be in fdset[]*/
00923       LM_WARN("this shouldn't happen\n");
00924       return 0;/*already have a name, please take me out the uncompleted AS array*/
00925    }
00926    LM_DBG("Reading client name\n");
00927 
00928    if(-1==(len=read_name(fd,unc_as_t[i].name,MAX_AS_NAME))){
00929       /*this guy should be disconnected, it sent an AS_NAME too long*/
00930       LM_ERR("Bad name passed from fd\n");
00931       unc_as_t[i].valid=0;
00932       unc_as_t[i].flags=0;
00933       return -2;
00934    }else if(len==-2){
00935       LM_WARN("client disconnected\n");
00936       return -2;
00937    }
00938    name1=unc_as_t[i].name;
00939 
00940    /* Check the name isn't already taken */
00941    for(as=as_list;as;as=as->next){
00942       if(as->name.len==len && !memcmp(name1,as->name.s,len)){
00943     if(as->connected){
00944        LM_WARN("AppServer trying to connect with a name already taken (%.*s)\n",len,name1);
00945        unc_as_t[i].valid=0;
00946        unc_as_t[i].flags=0;
00947        return -2;
00948     }
00949     break;
00950       }
00951    }
00952    if (!as) {
00953       LM_ERR("a client tried to connect which is not declared in config. script(%.*s)\n",len,name1);
00954       unc_as_t[i].valid=0;
00955       unc_as_t[i].flags=0;
00956       return -2;
00957    }
00958    unc_as_t[i].flags |= HAS_NAME;
00959    /* the loop's upper bound, 
00960     * if 'i' is in the lower part, then look for an unc_as in the upper part*/
00961    k=(i>=MAX_UNC_AS_NR?MAX_UNC_AS_NR:2*MAX_UNC_AS_NR);
00962    /* the loop's lower bound */
00963    for(j=(i>=MAX_UNC_AS_NR?0:MAX_UNC_AS_NR);j<k;j++)
00964       if(unc_as_t[j].valid && 
00965        (unc_as_t[j].flags & HAS_NAME) && 
00966        !strcmp(unc_as_t[i].name,unc_as_t[j].name))
00967     break;
00968    LM_INFO("Fantastic, we have a new client: %s\n",unc_as_t[i].name);
00969    if(j==k)/* the unc_as peer's socket hasn't been found, just take this one out of fdset because it already has its name */
00970       return 0;/*take me out from fdset[]*/
00971    LM_INFO("EUREKA, we have a new completed AS: %s\n",unc_as_t[i].name);
00972    /* EUREKA ! we have a sweet pair of AS sockets, with the same name !!*/
00973    if(add_new_as(i<j?i:j,i<j?j:i,as)==-1){
00974       close(unc_as_t[j].fd);
00975       close(unc_as_t[i].fd);
00976       unc_as_t[j].valid=unc_as_t[i].valid=0;
00977       unc_as_t[j].flags=unc_as_t[i].flags=0;
00978       return -1;
00979    }
00980    unc_as_t[j].valid=unc_as_t[i].valid=0;
00981    unc_as_t[j].flags=unc_as_t[i].flags=0;
00982    return unc_as_t[i<j?i:j].fd;
00983 }
00984 
00985 
/* Reads the name of the AS from the socket into dst.
 *
 * Wire format: one length byte followed by that many name bytes.
 *
 * params:
 *   sock:   socket to read from.
 *   dst:    destination buffer; on success it holds the NUL-terminated name.
 *   dstlen: size of dst in bytes, including room for the terminating NUL.
 * returns:
 *   the name length (strlen(dst)) on success,
 *   -1 on read error, or if the name is empty or does not fit in dst,
 *   -2 if the peer closed the connection before sending the whole name.
 */
static inline int read_name(int sock,char *dst,int dstlen)
{
   unsigned char lenbyte;
   ssize_t n;
   int namelen,got;

   /* read the length into a single byte so the value does not depend on
    * the host's endianness */
   do{
      n=read(sock,&lenbyte,1);
   }while(n<0 && errno==EINTR);
   if(n<0){
      LM_ERR("trying to read length from fd=%d (%s)\n",sock,strerror(errno));
      return -1;
   }
   if(n==0){
      LM_WARN("uncomplete AS has disconnected before giving its name\n");
      return -2;
   }
   namelen=lenbyte;
   if(namelen==0){
      LM_ERR("empty name received from fd=%d\n",sock);
      return -1;
   }
   /* '>=' and not '>': dst[namelen] must still be inside the buffer */
   if(namelen>=dstlen){
      LM_ERR("name too long to fit in dst (%d > %d)\n",namelen,dstlen-1);
      return -1;
   }
   /* a stream socket may deliver the name in several pieces */
   got=0;
   while(got<namelen){
      n=read(sock,dst+got,namelen-got);
      if(n<0){
         if(errno==EINTR)
            continue;
         LM_ERR("trying to read %d chars into %p from fd=%d (%s)\n",namelen,(void*)dst,sock,strerror(errno));
         return -1;
      }
      if(n==0){
         LM_WARN("uncomplete AS has disconnected before giving its name\n");
         return -2;
      }
      got+=(int)n;
   }
   dst[namelen]=0;
   return namelen;
}
01025 
01026 /* handle new App Server connect. 
01027  * params:
01028  * fd:
01029  *    fd on which to accept.
01030  * which:
01031  *    if the fd is the event one, which='e', if is action, which='a'
01032  *
01033  * TODO: not very reliable, because if someone connects() to one of the serversockets 
01034  * but not to the other one, then synchronization would be lost, and any subsequent connect
01035  * attempts would fail (remember, we receive a connect in event[] and wait for a connect in action)
01036  * the point is, the connects must allways come in pairs, if one comes alone, we lost sync.
01037  * we should put kind of timeout in connects or something...
01038  */
01039 static int new_as_connect(int fd,char which)
01040 {
01041    union sockaddr_union su;
01042    int sock,i,flags;
01043    socklen_t su_len;
01044 
01045    su_len = sizeof(union sockaddr_union);
01046    sock=-1;
01047 again:
01048    sock=accept(fd, &su.s, &su_len);
01049    if(sock==-1){
01050       if(errno==EINTR){
01051     goto again;
01052       }else{
01053        LM_ERR("while accepting connection: %s\n", strerror(errno));
01054     return -1;
01055       }
01056    }
01057    switch(which){
01058       case 'e':
01059     for(i=0;i<MAX_UNC_AS_NR && unc_as_t[i].valid;i++);
01060     if(i==MAX_UNC_AS_NR){
01061        LM_WARN("no more uncomplete connections allowed\n");
01062        goto error;
01063     }
01064     unc_as_t[i].fd=sock;
01065     unc_as_t[i].valid=1;
01066     unc_as_t[i].flags=HAS_FD;
01067     memcpy(&unc_as_t[i].su,&su,su_len);
01068     break;
01069       case 'a':
01070     for(i=MAX_UNC_AS_NR;(i<(2*MAX_UNC_AS_NR)) && unc_as_t[i].valid;i++);
01071     if(i==2*MAX_UNC_AS_NR){
01072        LM_WARN("no more uncomplete connections allowed\n");
01073        goto error;
01074     }
01075     unc_as_t[i].fd=sock;
01076     unc_as_t[i].valid=1;
01077     unc_as_t[i].flags=HAS_FD;
01078     memcpy(&unc_as_t[i].su,&su,su_len);
01079     break;
01080       default:
01081     break;
01082    }
01083    flags=1;
01084    if ((setsockopt(sock, IPPROTO_TCP , TCP_NODELAY,&flags, sizeof(flags))<0) ){
01085       LM_WARN("could not disable Nagle: %s\n",
01086        strerror(errno));
01087    }
01088 
01089    return sock;
01090 error:
01091    if(sock!=-1)
01092       close(sock);
01093    return -1;
01094 }
01095 
01096 int spawn_action_dispatcher(struct as_entry *the_as)
01097 {
01098    pid_t pid;
01099    pid=fork();
01100    if(pid<0){
01101       LM_ERR("unable to fork an action dispatcher for %.*s\n",the_as->name.len,the_as->name.s);
01102       return -1;
01103    }
01104    if(pid==0){/*child*/
01105       my_as = the_as;
01106       is_dispatcher=0;
01107       dispatch_actions();
01108       exit(0);
01109    }else{
01110       the_as->u.as.action_pid=pid;
01111    }
01112    return 0;
01113 }
01114 

Generated on Wed May 23 06:00:45 2012 for Kamailio - The Open Source SIP Server by  doxygen 1.5.6