seas.c

Go to the documentation of this file.
00001 /* $Id: seas.c 5350 2008-12-14 22:09:26Z klaus_darilion $
00002  *
00003  * Copyright (C) 2006-2007 VozTelecom Sistemas S.L
00004  *
00005  * This file is part of Kamailio, a free SIP server.
00006  *
00007  * Kamailio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 2 of the License, or
00010  * (at your option) any later version
00011  *
00012  * Kamailio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License 
00018  * along with this program; if not, write to the Free Software 
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  */
00021 
00022 #include <string.h>/*memset*/
00023 #include <errno.h>/*errno*/
00024 #include <unistd.h>/*close(),pipe,fork,pid_t*/
00025 #include <sys/wait.h>/*wait*/
00026 #include <signal.h>/*SIGINT,etc*/
00027 
00028 #include "../../sr_module.h"
00029 #include "../../ip_addr.h" /*ip_addr,hostent2ip_addr*/
00030 #include "../../tags.h" /*init_tags*/
00031 #include "../../socket_info.h" /*get_first_socket()*/
00032 #include "../../resolve.h" /*resolvehost*/
00033 #include "../../mem/mem.h" /*pkg_malloc*/
00034 #include "../../mem/shm_mem.h" /*shm_malloc*/
00035 #include "../../dprint.h" /*LM_**/
00036 #include "../../error.h" /*ser_error*/
00037 #include "../tm/tm_load.h" /*load_tm_api*/
00038 #include "../tm/h_table.h" /*cell*/
00039 #include "../tm/t_lookup.h" /*T_UNDEFINED*/
00040 
00041 #include "encode_msg.h" /*encode_msg*/
00042 
00043 #include "seas.h"
00044 #include "seas_action.h"
00045 #include "event_dispatcher.h"
00046 #include "statistics.h"/*pstart_stats_server*/
00047 #include "ha.h"
00048 #include "cluster.h"
00049 
00050 MODULE_VERSION
00051 
00052 
/* Exported script functions (registered in cmds[]). */
static int w_as_relay_t(struct sip_msg *msg, char *as_name, char *foo);
static int w_as_relay_sl(struct sip_msg *msg, char *as_name, char *foo);

/* Module life-cycle callbacks and the script-parameter fixup.
 * All of them are declared with full prototypes, "(void)" included, so the
 * declarations agree with the definitions further down in this file. */
static int seas_init(void);
static int seas_child_init(int rank);
static int seas_exit(void);
static int fixup_as_relay(void** param, int param_no);

/* Utility functions */
static void seas_init_tags(void);
static inline int is_e2e_ack(struct cell *t,struct sip_msg *msg);
00066 
00067 char seas_tags[TOTAG_VALUE_LEN+1];
00068 char *seas_tag_suffix;
00069 char whoami[MAX_WHOAMI_LEN];
00070 int is_dispatcher=0;
00071 extern int sig_flag;
00072 
00073 static char *seas_listen_socket=0;
00074 static char *seas_stats_socket=0;
00075 
00076 struct ip_addr *seas_listen_ip=0;
00077 unsigned short seas_listen_port=0;
00078 
00079 struct as_entry *as_table=0;
00080 
00081 struct as_entry *as_list=0;
00082 
00083 int write_pipe=0;
00084 int read_pipe=0;
00085 
00086 struct seas_functions seas_f;
00087 
00088 static cmd_export_t cmds[]=
00089 {
00090    {"as_relay_t",   (cmd_function)w_as_relay_t,  1,  fixup_as_relay,
00091          0, REQUEST_ROUTE},
00092    {"as_relay_sl",  (cmd_function)w_as_relay_sl, 1,  fixup_as_relay,
00093          0, REQUEST_ROUTE},
00094    {0,0,0,0,0,0}
00095 };
00096 
00097 static param_export_t params[]=
00098 {
00099    {"listen_sockets",STR_PARAM, &seas_listen_socket},
00100    {"stats_socket",  STR_PARAM, &seas_stats_socket},
00101    {"jain_ping",     STR_PARAM, &jain_ping_config},
00102    {"servlet_ping",  STR_PARAM, &servlet_ping_config},
00103    {"clusters",      STR_PARAM, &cluster_cfg},
00104    {0,0,0}
00105 };
00106 
00107 struct module_exports exports= 
00108 {
00109    "seas",
00110    DEFAULT_DLFLAGS,
00111    cmds,
00112    params,
00113    0,
00114    0,
00115    0,
00116    0,           /* extra processes */
00117    seas_init,   /* module initialization function */
00118    0,
00119    (destroy_function) seas_exit,   /* module exit function */
00120    (child_init_function) seas_child_init  /* per-child init function */
00121 };
00122 
00123 static int fixup_as_relay(void** param, int param_no)
00124 {
00125    int len;
00126    char *parameter;
00127    struct as_entry **entry,*tmp;
00128 
00129    parameter=(char *)(*param);
00130 
00131    if (param_no!=1)
00132       return 0;
00133    len=strlen(parameter);
00134    
00135    for (entry=&as_list;*entry;entry=&((*entry)->next)) {
00136       if (len== (*entry)->name.len && 
00137        !memcmp((*entry)->name.s,parameter,len)) {
00138     pkg_free(*param);
00139     *param=*entry;
00140     return 1;
00141       }
00142    }
00143    if (!(*entry)) {
00144       if (!(*entry=(struct as_entry *)shm_malloc(sizeof(struct as_entry)))) {
00145     LM_ERR("no more shm_mem\n");
00146     goto error;
00147       }
00148       memset(*entry,0,sizeof(struct as_entry));
00149       if(!((*entry)->name.s=shm_malloc(len))){
00150     LM_ERR("no more share mem\n");
00151     goto error;
00152       }
00153       (*entry)->name.len=len;
00154       memcpy((*entry)->name.s,parameter,len);
00155       (*entry)->u.as.name=(*entry)->name;
00156       (*entry)->u.as.event_fd=(*entry)->u.as.action_fd=-1;
00157       (*entry)->type=AS_TYPE;
00158       pkg_free(*param);
00159       *param=*entry;
00160    }
00161    for (tmp=as_list;tmp;tmp=tmp->next)
00162       LM_DBG("%.*s\n",tmp->name.len,tmp->name.s);
00163    return 1;
00164 error:
00165    return -1;
00166 }
00167 
00168 /**
00169  * Sets up signal handlers
00170  */
00171 void seas_sighandler(int signo)
00172 {
00173    struct as_entry *as;
00174    if(is_dispatcher)
00175       sig_flag=signo;
00176    switch(signo){
00177       case SIGPIPE:
00178     if(is_dispatcher)
00179        return;
00180     LM_INFO("%s exiting\n",whoami);
00181     if(my_as->u.as.ac_buffer.s){
00182        pkg_free(my_as->u.as.ac_buffer.s);
00183        my_as->u.as.ac_buffer.s=0;
00184     }
00185     if(my_as->u.as.action_fd!=-1){
00186        close(my_as->u.as.action_fd);
00187        my_as->u.as.action_fd=-1;
00188     }
00189     exit(0);
00190     break;
00191       case SIGCHLD:
00192     LM_INFO("Child stopped or terminated\n");
00193     break;
00194       case SIGUSR1:
00195       case SIGUSR2:
00196     LM_DBG("Memory status (pkg):\n");
00197 #ifdef PKG_MALLOC
00198     pkg_status();
00199 #endif
00200     break;
00201       case SIGINT:
00202       case SIGTERM:
00203     LM_INFO("INFO: signal %d received\n",signo);
00204 #ifdef PKG_MALLOC
00205     pkg_status();
00206 #endif
00207     if(is_dispatcher){
00208        for (as=as_list;as;as=as->next) {
00209           if(as->type==AS_TYPE && as->connected)
00210         kill(as->u.as.action_pid,signo);
00211        }
00212        while(wait(0) > 0);
00213        exit(0);
00214     }else{
00215        LM_INFO("%s exiting\n",whoami);
00216        if(my_as && my_as->u.as.ac_buffer.s)
00217           pkg_free(my_as->u.as.ac_buffer.s);
00218        if(my_as && my_as->u.as.action_fd!=-1)
00219           close(my_as->u.as.action_fd);
00220        exit(0);
00221     }
00222     break;
00223    }
00224 }
00225 
00226 /**
00227  * wrapper for the AS transaction-stateful relay script function.
00228  *
00229  */
00230 static int w_as_relay_t(struct sip_msg *msg, char *entry, char *foo)
00231 {
00232    as_msg_p my_as_ev;
00233    int new_tran,ret=0,len;
00234    char *buffer,processor_id;
00235    struct cell *mycel;
00236    struct as_entry *as;
00237    static str msg100={"Your call is important to us",sizeof("Your call is important to us")-1};
00238    static str msg500={"Server Internal Error!",sizeof("Server Internal Error!")-1};
00239 
00240    buffer=(char*)0;
00241    my_as_ev=(as_msg_p)0;
00242 
00243    /**
00244     * returns <0 on error
00245     * 1 if (new transaction was created) or if (ACK for locally replied 200 with totag) or if (ACK for code>=300)
00246     * 0 if it was a retransmission 
00247     */
00248    new_tran = seas_f.tmb.t_newtran(msg);
00249    if(new_tran<0) {
00250       ret = (ser_error==E_BAD_VIA && reply_to_via) ? 0 : new_tran;
00251       goto done;
00252    }
00253    /*retransmission: script processing should be stopped*/
00254    if (new_tran==0 && !(msg->REQ_METHOD==METHOD_ACK)){
00255       ret = 0;
00256       goto done;
00257    }
00258    /*new transaction created, let's pass it to an APP SERVER*/
00259    if (msg->REQ_METHOD==METHOD_INVITE )
00260    {
00261       LM_DBG("new INVITE\n");
00262       if(!seas_f.tmb.t_reply(msg,100,&msg100)){
00263     LM_DBG("t_reply (100)\n");
00264     goto error;
00265       }
00266    }
00267    as=(struct as_entry *)entry;
00268    if(!as->connected){
00269       LM_ERR("app server %.*s not connected\n",as->name.len,as->name.s);
00270       goto error;
00271    }
00272    if(as->type==AS_TYPE){
00273       if((processor_id=get_processor_id(&msg->rcv,&(as->u.as)))<0){
00274     LM_ERR("no processor found for packet with dst port:%d\n",msg->rcv.dst_port);
00275     goto error;
00276       }
00277    }else if(as->type==CLUSTER_TYPE){
00278       LM_ERR("clustering not fully implemented\n");
00279       return 0;
00280    }else{
00281       LM_ERR("unknown type of as (neither cluster nor as)\n");
00282       return -1;
00283    }
00284    LM_DBG("as found ! (%.*s) processor id = %d\n",as->name.len,as->name.s,processor_id);
00285    if(new_tran==1 && msg->REQ_METHOD==METHOD_ACK){
00286       /* core should forward statelessly (says t_newtran)*/
00287       LM_DBG("forwarding statelessly !!!\n");
00288       if(!(buffer=create_as_event_sl(msg,processor_id,&len,0))){
00289     LM_ERR("create_as_event_sl() unable to create event code\n");
00290     goto error;
00291       }
00292    }else if(!(buffer=create_as_event_t(seas_f.tmb.t_gett(),msg,processor_id,&len,0))){
00293       LM_ERR("unable to create event code\n");
00294       goto error;
00295    }
00296    if(!(my_as_ev=shm_malloc(sizeof(as_msg_t)))){
00297       LM_ERR("Out of shared mem!\n");
00298       goto error;
00299    }
00300    my_as_ev->msg = buffer;
00301    my_as_ev->as = as;
00302    my_as_ev->type = T_REQ_IN;
00303    my_as_ev->len = len;
00304    my_as_ev->transaction=seas_f.tmb.t_gett(); /*does not refcount*/
00305    if(use_stats && new_tran>0)
00306       as_relay_stat(seas_f.tmb.t_gett());
00307 again:
00308    ret=write(write_pipe,&my_as_ev,sizeof(as_msg_p));
00309    if(ret==-1){
00310       if(errno==EINTR)
00311     goto again;
00312       else if(errno==EPIPE){
00313     LM_ERR("SEAS Event Dispatcher has closed the pipe. Invalidating it !\n");
00314     goto error;
00315     /** TODO handle this correctly !!!*/
00316       }
00317    }
00318    seas_f.tmb.t_setkr(REQ_FWDED);
00319    ret=0;
00320 done:
00321    return ret;
00322 error:
00323    mycel=seas_f.tmb.t_gett();
00324    if(mycel && mycel!=T_UNDEFINED){
00325       if(!seas_f.tmb.t_reply(msg,500,&msg500)){
00326     LM_ERR("t_reply (500)\n");
00327       }
00328    }
00329    if(my_as_ev)
00330       shm_free(my_as_ev);
00331    if(buffer)
00332       shm_free(buffer);
00333    return ret;
00334 }
00335 
00336 
00337 /**
00338  * wrapper for the AS stateless relay script function.
00339  *
00340  */
00341 static int w_as_relay_sl(struct sip_msg *msg, char *as_name, char *foo)
00342 {
00343    as_msg_p my_as_ev=0;
00344    int ret=0,len;
00345    char *buffer=0,processor_id;
00346    struct as_entry *as;
00347 
00348    as=(struct as_entry *)as_name;
00349 
00350    if(as->type==AS_TYPE){
00351       if((processor_id=get_processor_id(&msg->rcv,&(as->u.as)))<0){
00352     LM_ERR("no processor found for packet with dst port:%d\n",msg->rcv.dst_port);
00353     goto error;
00354       }
00355    }else if (as->type==CLUSTER_TYPE) {
00356       LM_ERR("clustering not fully implemented\n");
00357       goto error;
00358    }else{
00359       LM_ERR("unknown type of as\n");
00360       goto error;
00361    }
00362 
00363    LM_DBG("as found ! (%.*s) processor id = %d\n",as->name.len,as->name.s,processor_id);
00364    if(!(buffer=create_as_event_sl(msg,processor_id,&len,0))){
00365       LM_ERR("unable to create event code\n");
00366       goto error;
00367    }
00368    if(!(my_as_ev=shm_malloc(sizeof(as_msg_t))))
00369       goto error;
00370    my_as_ev->msg = buffer;
00371    my_as_ev->as = as;
00372    my_as_ev->type = SL_REQ_IN;
00373    my_as_ev->len = len;
00374    my_as_ev->transaction=seas_f.tmb.t_gett(); /*does not refcount*/
00375    if(use_stats)
00376       as_relay_stat(seas_f.tmb.t_gett());
00377 again:
00378    ret=write(write_pipe,&my_as_ev,sizeof(as_msg_p));
00379    if(ret==-1){
00380       if(errno==EINTR)
00381     goto again;
00382       else if(errno==EPIPE){
00383     LM_ERR("SEAS Event Dispatcher has closed the pipe. Invalidating it !\n");
00384     return -2;
00385     /** TODO handle this correctly !!!*/
00386       }
00387    }
00388    //this shouln't be here, because it will remove the transaction from memory, but
00389    //if transaction isn't unref'ed iw will be released anyway at t_unref if kr (killreason)==0
00390    // a wait timer will be put to run with WT_TIME_OUT (5 seconds, within which the AS should respond)      
00391    // this is a bug !!! I think this is why we lose calls at high load !!
00392    //t_release(msg, 0, 0);
00393    seas_f.tmb.t_setkr(REQ_FWDED);
00394 
00395    ret=0;
00396    return ret;
00397 error:
00398    if(my_as_ev)
00399       shm_free(my_as_ev);
00400    if(buffer)
00401       shm_free(buffer);
00402    return ret;
00403 }
00404 
00405 /**
00406  * creates an as_event in shared memory and returns its address or NULL if error.
00407  * event_length(4) UNSIGNED INT includes the length 4 bytes itself
00408  * type(1), 
00409  * flags(4),
00410  * transport(1).
00411  * src_ip_len(1), 
00412  * src_ip(4 or 16), 
00413  * dst_ip_len(1), 
00414  * dst_ip(4 or 16), 
00415  * src_port(2), 
00416  * dst_port(2), 
00417  * hash index(4), 
00418  * label(4), 
00419  * [cancelled hash_index,label]
00420  *
00421  */
00422 char * create_as_event_t(struct cell *t,struct sip_msg *msg,char processor_id,int *evt_len,int flags)
00423 {
00424    unsigned int i,hash_index,label;
00425    unsigned short int port;
00426    unsigned int k,len;
00427    char *buffer=NULL;
00428    struct cell *originalT;
00429 
00430    originalT=0;
00431 
00432    if(!(buffer=shm_malloc(ENCODED_MSG_SIZE))){
00433       LM_ERR("Out Of Memory !!\n");
00434       return 0;
00435    }
00436    *evt_len=0;
00437    if(t){
00438       hash_index=t->hash_index;
00439       label=t->label;
00440    }else{
00441       /**seas_f.tmb.t_get_trans_ident(msg,&hash_index,&label); this is bad, because it ref-counts !!!*/
00442       LM_ERR("no transaction provided...\n");
00443       goto error;
00444    }
00445 
00446    k=4;
00447    /*type*/
00448    buffer[k++]=(unsigned char)T_REQ_IN;
00449    /*processor_id*/
00450    buffer[k++]=(unsigned char)processor_id;
00451    /*flags*/
00452    if(is_e2e_ack(t,msg)){
00453       flags|=E2E_ACK;
00454    }else if(msg->REQ_METHOD==METHOD_CANCEL){
00455       LM_DBG("new CANCEL\n");
00456       originalT=seas_f.tmb.t_lookup_original_t(msg);
00457       if(!originalT || originalT==T_UNDEFINED){
00458     /** we dont even pass the unknown CANCEL to JAIN*/
00459     LM_WARN("CANCEL does not match any existing transaction!!\n");
00460     goto error;
00461       }else{
00462     flags|=CANCEL_FOUND;
00463     //seas_f.tmb.unref_cell(originalT);
00464       }
00465       LM_DBG("Cancelling transaction !!\n");
00466    }
00467    flags=htonl(flags);
00468    memcpy(buffer+k,&flags,4);
00469    k+=4;
00470    /*protocol should be UDP,TCP,TLS or whatever*/
00471    buffer[k++]=(unsigned char)msg->rcv.proto;
00472    /*src ip len + src ip*/
00473    len=msg->rcv.src_ip.len;
00474    buffer[k++]=(unsigned char)len;
00475    memcpy(buffer+k,&(msg->rcv.src_ip.u),len);
00476    k+=len;
00477    /*dst ip len + dst ip*/
00478    len=msg->rcv.dst_ip.len;
00479    buffer[k++]=(unsigned char)len;
00480    memcpy(buffer+k,&(msg->rcv.dst_ip.u),len);
00481    k+=len;
00482    /*src port */
00483    port=htons(msg->rcv.src_port);
00484    memcpy(buffer+k,&port,2);
00485    k+=2;
00486    /*dst port */
00487    port=htons(msg->rcv.dst_port);
00488    memcpy(buffer+k,&port,2);
00489    k+=2;
00490    /*hash_index*/
00491    i=htonl(hash_index);
00492    memcpy(buffer+k,&i,4);
00493    k+=4;
00494    /*label (is the collision slot in the hash-table)*/
00495    i=htonl(label);
00496    memcpy(buffer+k,&i,4);
00497    k+=4;
00498    if(msg->REQ_METHOD==METHOD_CANCEL && originalT){
00499       LM_DBG("Cancelled transaction: Hash_Index=%d, Label=%d\n",originalT->hash_index,originalT->label);
00500       /*hash_index*/
00501       i=htonl(originalT->hash_index);
00502       memcpy(buffer+k,&i,4);
00503       k+=4;
00504       /*label (is the collision slot in the hash-table)*/
00505       i=htonl(originalT->label);
00506       memcpy(buffer+k,&i,4);
00507       k+=4;
00508    }
00509 
00510    /*length of event (hdr+payload-4), copied at the beginning*/
00511    if(encode_msg(msg,buffer+k,ENCODED_MSG_SIZE-k)<0){
00512       LM_ERR("Unable to encode msg\n");
00513       goto error;
00514    }
00515    i = GET_PAY_SIZE(buffer+k);
00516    k+=i;
00517    *evt_len=k;
00518    k=htonl(k);
00519    memcpy(buffer,&k,4);
00520    return buffer;
00521 error:
00522    if(buffer)
00523       shm_free(buffer);
00524    return 0;
00525 }
00526 
00527 
00528 /**
00529  * creates an as_event in shared memory and returns its address or NULL if error.
00530  * event_length(4) UNSIGNED INT includes the length 4 bytes itself
00531  * type(1), 
00532  * processor_id(4), 
00533  * flags(4),
00534  * transport(1).
00535  * src_ip_len(1), 
00536  * src_ip(4 or 16), 
00537  * dst_ip_len(1), 
00538  * dst_ip(4 or 16), 
00539  * src_port(2), 
00540  * dst_port(2), 
00541  *
00542  */
00543 char * create_as_event_sl(struct sip_msg *msg,char processor_id,int *evt_len,int flags)
00544 {
00545    unsigned int i;
00546    unsigned short int port;
00547    unsigned int k,len;
00548    char *buffer=NULL;
00549 
00550    if(!(buffer=shm_malloc(ENCODED_MSG_SIZE))){
00551       LM_ERR("create_as_event_t Out Of Memory !!\n");
00552       return 0;
00553    }
00554    *evt_len=0;
00555 
00556    /*leave 4 bytes for event length*/
00557    k=4;
00558    /*type*/
00559    buffer[k++]=(unsigned char)SL_REQ_IN;
00560    /*processor_id*/
00561    buffer[k++]=(unsigned char)processor_id;
00562    /*flags*/
00563    flags=htonl(flags);
00564    memcpy(buffer+k,&flags,4);
00565    k+=4;
00566    /*protocol should be UDP,TCP,TLS or whatever*/
00567    buffer[k++]=(unsigned char)msg->rcv.proto;
00568    /*src ip len + src ip*/
00569    len=msg->rcv.src_ip.len;
00570    buffer[k++]=(unsigned char)len;
00571    memcpy(buffer+k,&(msg->rcv.src_ip.u),len);
00572    k+=len;
00573    /*dst ip len + dst ip*/
00574    len=msg->rcv.dst_ip.len;
00575    buffer[k++]=(unsigned char)len;
00576    memcpy(buffer+k,&(msg->rcv.dst_ip.u),len);
00577    k+=len;
00578    /*src port */
00579    port=htons(msg->rcv.src_port);
00580    memcpy(buffer+k,&port,2);
00581    k+=2;
00582    /*dst port */
00583    port=htons(msg->rcv.dst_port);
00584    memcpy(buffer+k,&port,2);
00585    k+=2;
00586    /*length of event (hdr+payload-4), copied at the beginning*/
00587    if(encode_msg(msg,buffer+k,ENCODED_MSG_SIZE-k)<0){
00588       LM_ERR("Unable to encode msg\n");
00589       goto error;
00590    }
00591    i = GET_PAY_SIZE(buffer+k);
00592    k+=i;
00593    *evt_len=k;
00594    k=htonl(k);
00595    memcpy(buffer,&k,4);
00596    return buffer;
00597 error:
00598    if(buffer)
00599       shm_free(buffer);
00600    return 0;
00601 }
00602 
00603 
00604 static inline int is_e2e_ack(struct cell *t,struct sip_msg *msg)
00605 {
00606    if(msg->REQ_METHOD != METHOD_ACK)
00607       return 0;
00608    if (t->uas.status<300)
00609       return 1;
00610    return 0;
00611 }
00612 
00613 /** Initializes seas module. It first parses the listen_sockets parameter
00614  * which has the form "ip_address[:port]", creates the pipe to
00615  * communicate with the dispatcher.
00616  */
00617 static int seas_init(void)
00618 {
00619    char *p,*port;
00620    struct hostent *he;
00621    struct socket_info *si;
00622    int c_pipe[2],mierr,i;
00623    /** Populate seas_functions*/
00624    if (load_tm_api(&seas_f.tmb)!=0) {
00625       LM_ERR( "can't load TM API\n");
00626       return -1;
00627    }
00628    if(!(seas_f.t_check_orig_trans = find_export("t_check_trans", 0, 0))){
00629       LM_ERR( "Seas requires transaction module (t_check_trans not found)\n");
00630       return -1;
00631    }
00632    /** Populate seas_functions*/
00633    c_pipe[0]=c_pipe[1]=-1;
00634    p=seas_listen_socket;
00635    port=(char *)0;
00636    seas_listen_port=5080;
00637    /*if the seas_listen_socket configuration string is empty, use default values*/
00638    if(p==NULL || *p==0){
00639       si=get_first_socket();
00640       seas_listen_ip=&si->address;
00641    } else {/*if config string is not empty, then try to find host first, and maybe port..*/
00642       while(*p){
00643     if(*p == ':'){
00644        *p=0;
00645        port=p+1;
00646        break;
00647     }
00648     p++;
00649       }
00650       if(!(he=resolvehost(seas_listen_socket,0)))
00651     goto error;
00652       if(!(seas_listen_ip=pkg_malloc(sizeof(struct ip_addr))))
00653     goto error;
00654       hostent2ip_addr(seas_listen_ip, he, 0);
00655       if(port!=(char *)0 && (seas_listen_port=str2s(port,strlen(port),&mierr))==0){
00656     LM_ERR("invalid port %s \n",port);
00657     goto error;
00658       }
00659    }
00660    memset(unc_as_t,0,2*MAX_UNC_AS_NR*sizeof(struct unc_as));//useless because unc_as_t is in bss?
00661    if (pipe(c_pipe)==-1) {
00662       LM_ERR("cannot create pipe!\n");
00663       goto error;
00664    }
00665    read_pipe=c_pipe[0];
00666    write_pipe=c_pipe[1];
00667    seas_init_tags();
00668    if(0>start_stats_server(seas_stats_socket))
00669       goto error;
00670    if(0>prepare_ha())
00671       goto error;
00672    if(0>parse_cluster_cfg())
00673       goto error;
00674    return 0;
00675 error:
00676    for(i=0;i<2;i++)
00677       if(c_pipe[i]!=-1)
00678     close(c_pipe[i]);
00679    if(seas_listen_ip!=0)
00680       pkg_free(seas_listen_ip);
00681    if(use_stats)
00682       stop_stats_server();
00683    return -1;
00684 }
00685 
00686 
00687 /**Initializes SEAS to-tags
00688 */
00689 static void seas_init_tags(void)
00690 {
00691    init_tags(seas_tags, &seas_tag_suffix,"VozTele-Seas/tags",'-');
00692    LM_DBG("seas_init_tags, seas_tags=%s\n",seas_tags);
00693 }
00694 
00695 /**
00696  * This function initializes each one of the processes spawn by the server.
00697  * the rank is 1 only when the main process is being initialized, so in that
00698  * case the function spawns the SEAS process to handle as_events triggered
00699  * from the other SER processes (executing the script).
00700  * the new process created, then goes into dispatcher_main_loop(), where
00701  * it reads() the pipe waiting for events produced by other SER processes.
00702  */
00703 static int seas_child_init(int rank)
00704 {
00705    int pid;
00706 
00707    /* only the child 1 will execute this */
00708    if (rank != 1){
00709       /* only dispatcher needs to read from the pipe, so close reading fd*/
00710       close(read_pipe);
00711       return 0;
00712    }
00713    if ((pid=fork())<0) {
00714       LM_ERR("forking failed\n");
00715       return -1;
00716    }
00717    if (!pid) {
00718       /*dispatcher child. we leave writing end open so that new childs spawned
00719        * by event dispatcher can also write to pipe.. */
00720 
00721       /* close(write_pipe); */
00722       return dispatcher_main_loop();
00723    }
00724    return 0;
00725 }
00726 
00727 /* this should close the sockets open to any of the application servers, and
00728  * send them an EOF event or something that signals that SER is beeing shutdown,
00729  * so they could do their cleanup, etc.
00730  */
00731 static int seas_exit(void)
00732 {
00733    if( seas_listen_ip!=NULL && seas_listen_ip!=&(get_first_socket()->address))
00734       pkg_free(seas_listen_ip);
00735    return 0;
00736 }
00737 
00738 /**
00739  * search within a given AS, if any of the registered processors is bound
00740  * to the receive_info structure passed. If there is one, it returns its 
00741  * identifier (number between 0 and 128), otherwise it returns -1;
00742  */
00743 char get_processor_id(struct receive_info *rcv,as_p as)
00744 {
00745    int i;
00746    for(i=0;i<MAX_BINDS;i++){
00747       if(as->bound_processor[i]!=0 &&
00748        (rcv->dst_ip.len == as->binds[i]->address.len) &&
00749        (rcv->dst_ip.af==as->binds[i]->address.af) &&
00750        (!memcmp(rcv->dst_ip.u.addr,as->binds[i]->address.u.addr,rcv->dst_ip.len))/* &&
00751                                      (rcv->dst_port==as->binds[i].dst_port) &&
00752                                      (rcv->proto==as->binds[i].proto)*/)
00753     return as->bound_processor[i];
00754    }
00755    return -1;
00756 }

Generated on Thu May 24 12:00:30 2012 for Kamailio - The Open Source SIP Server by  doxygen 1.5.6