00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <string.h>
00023 #include <errno.h>
00024 #include <unistd.h>
00025 #include <sys/wait.h>
00026 #include <signal.h>
00027
00028 #include "../../sr_module.h"
00029 #include "../../ip_addr.h"
00030 #include "../../tags.h"
00031 #include "../../socket_info.h"
00032 #include "../../resolve.h"
00033 #include "../../mem/mem.h"
00034 #include "../../mem/shm_mem.h"
00035 #include "../../dprint.h"
00036 #include "../../error.h"
00037 #include "../tm/tm_load.h"
00038 #include "../tm/h_table.h"
00039 #include "../tm/t_lookup.h"
00040
00041 #include "encode_msg.h"
00042
00043 #include "seas.h"
00044 #include "seas_action.h"
00045 #include "event_dispatcher.h"
00046 #include "statistics.h"
00047 #include "ha.h"
00048 #include "cluster.h"
00049
00050 MODULE_VERSION
00051
00052
00053
00054 static int w_as_relay_t(struct sip_msg *msg, char *as_name, char *foo);
00055 static int w_as_relay_sl(struct sip_msg *msg, char *as_name, char *foo);
00056
00057
00058 static int seas_init(void);
00059 static int seas_child_init(int rank);
00060 static int seas_exit();
00061 static int fixup_as_relay(void** param, int param_no);
00062
00063
00064 static void seas_init_tags();
00065 static inline int is_e2e_ack(struct cell *t,struct sip_msg *msg);
00066
00067 char seas_tags[TOTAG_VALUE_LEN+1];
00068 char *seas_tag_suffix;
00069 char whoami[MAX_WHOAMI_LEN];
00070 int is_dispatcher=0;
00071 extern int sig_flag;
00072
00073 static char *seas_listen_socket=0;
00074 static char *seas_stats_socket=0;
00075
00076 struct ip_addr *seas_listen_ip=0;
00077 unsigned short seas_listen_port=0;
00078
00079 struct as_entry *as_table=0;
00080
00081 struct as_entry *as_list=0;
00082
00083 int write_pipe=0;
00084 int read_pipe=0;
00085
00086 struct seas_functions seas_f;
00087
00088 static cmd_export_t cmds[]=
00089 {
00090 {"as_relay_t", (cmd_function)w_as_relay_t, 1, fixup_as_relay,
00091 0, REQUEST_ROUTE},
00092 {"as_relay_sl", (cmd_function)w_as_relay_sl, 1, fixup_as_relay,
00093 0, REQUEST_ROUTE},
00094 {0,0,0,0,0,0}
00095 };
00096
00097 static param_export_t params[]=
00098 {
00099 {"listen_sockets",STR_PARAM, &seas_listen_socket},
00100 {"stats_socket", STR_PARAM, &seas_stats_socket},
00101 {"jain_ping", STR_PARAM, &jain_ping_config},
00102 {"servlet_ping", STR_PARAM, &servlet_ping_config},
00103 {"clusters", STR_PARAM, &cluster_cfg},
00104 {0,0,0}
00105 };
00106
00107 struct module_exports exports=
00108 {
00109 "seas",
00110 DEFAULT_DLFLAGS,
00111 cmds,
00112 params,
00113 0,
00114 0,
00115 0,
00116 0,
00117 seas_init,
00118 0,
00119 (destroy_function) seas_exit,
00120 (child_init_function) seas_child_init
00121 };
00122
00123 static int fixup_as_relay(void** param, int param_no)
00124 {
00125 int len;
00126 char *parameter;
00127 struct as_entry **entry,*tmp;
00128
00129 parameter=(char *)(*param);
00130
00131 if (param_no!=1)
00132 return 0;
00133 len=strlen(parameter);
00134
00135 for (entry=&as_list;*entry;entry=&((*entry)->next)) {
00136 if (len== (*entry)->name.len &&
00137 !memcmp((*entry)->name.s,parameter,len)) {
00138 pkg_free(*param);
00139 *param=*entry;
00140 return 1;
00141 }
00142 }
00143 if (!(*entry)) {
00144 if (!(*entry=(struct as_entry *)shm_malloc(sizeof(struct as_entry)))) {
00145 LM_ERR("no more shm_mem\n");
00146 goto error;
00147 }
00148 memset(*entry,0,sizeof(struct as_entry));
00149 if(!((*entry)->name.s=shm_malloc(len))){
00150 LM_ERR("no more share mem\n");
00151 goto error;
00152 }
00153 (*entry)->name.len=len;
00154 memcpy((*entry)->name.s,parameter,len);
00155 (*entry)->u.as.name=(*entry)->name;
00156 (*entry)->u.as.event_fd=(*entry)->u.as.action_fd=-1;
00157 (*entry)->type=AS_TYPE;
00158 pkg_free(*param);
00159 *param=*entry;
00160 }
00161 for (tmp=as_list;tmp;tmp=tmp->next)
00162 LM_DBG("%.*s\n",tmp->name.len,tmp->name.s);
00163 return 1;
00164 error:
00165 return -1;
00166 }
00167
00168
00169
00170
00171 void seas_sighandler(int signo)
00172 {
00173 struct as_entry *as;
00174 if(is_dispatcher)
00175 sig_flag=signo;
00176 switch(signo){
00177 case SIGPIPE:
00178 if(is_dispatcher)
00179 return;
00180 LM_INFO("%s exiting\n",whoami);
00181 if(my_as->u.as.ac_buffer.s){
00182 pkg_free(my_as->u.as.ac_buffer.s);
00183 my_as->u.as.ac_buffer.s=0;
00184 }
00185 if(my_as->u.as.action_fd!=-1){
00186 close(my_as->u.as.action_fd);
00187 my_as->u.as.action_fd=-1;
00188 }
00189 exit(0);
00190 break;
00191 case SIGCHLD:
00192 LM_INFO("Child stopped or terminated\n");
00193 break;
00194 case SIGUSR1:
00195 case SIGUSR2:
00196 LM_DBG("Memory status (pkg):\n");
00197 #ifdef PKG_MALLOC
00198 pkg_status();
00199 #endif
00200 break;
00201 case SIGINT:
00202 case SIGTERM:
00203 LM_INFO("INFO: signal %d received\n",signo);
00204 #ifdef PKG_MALLOC
00205 pkg_status();
00206 #endif
00207 if(is_dispatcher){
00208 for (as=as_list;as;as=as->next) {
00209 if(as->type==AS_TYPE && as->connected)
00210 kill(as->u.as.action_pid,signo);
00211 }
00212 while(wait(0) > 0);
00213 exit(0);
00214 }else{
00215 LM_INFO("%s exiting\n",whoami);
00216 if(my_as && my_as->u.as.ac_buffer.s)
00217 pkg_free(my_as->u.as.ac_buffer.s);
00218 if(my_as && my_as->u.as.action_fd!=-1)
00219 close(my_as->u.as.action_fd);
00220 exit(0);
00221 }
00222 break;
00223 }
00224 }
00225
00226
00227
00228
00229
00230 static int w_as_relay_t(struct sip_msg *msg, char *entry, char *foo)
00231 {
00232 as_msg_p my_as_ev;
00233 int new_tran,ret=0,len;
00234 char *buffer,processor_id;
00235 struct cell *mycel;
00236 struct as_entry *as;
00237 static str msg100={"Your call is important to us",sizeof("Your call is important to us")-1};
00238 static str msg500={"Server Internal Error!",sizeof("Server Internal Error!")-1};
00239
00240 buffer=(char*)0;
00241 my_as_ev=(as_msg_p)0;
00242
00243
00244
00245
00246
00247
00248 new_tran = seas_f.tmb.t_newtran(msg);
00249 if(new_tran<0) {
00250 ret = (ser_error==E_BAD_VIA && reply_to_via) ? 0 : new_tran;
00251 goto done;
00252 }
00253
00254 if (new_tran==0 && !(msg->REQ_METHOD==METHOD_ACK)){
00255 ret = 0;
00256 goto done;
00257 }
00258
00259 if (msg->REQ_METHOD==METHOD_INVITE )
00260 {
00261 LM_DBG("new INVITE\n");
00262 if(!seas_f.tmb.t_reply(msg,100,&msg100)){
00263 LM_DBG("t_reply (100)\n");
00264 goto error;
00265 }
00266 }
00267 as=(struct as_entry *)entry;
00268 if(!as->connected){
00269 LM_ERR("app server %.*s not connected\n",as->name.len,as->name.s);
00270 goto error;
00271 }
00272 if(as->type==AS_TYPE){
00273 if((processor_id=get_processor_id(&msg->rcv,&(as->u.as)))<0){
00274 LM_ERR("no processor found for packet with dst port:%d\n",msg->rcv.dst_port);
00275 goto error;
00276 }
00277 }else if(as->type==CLUSTER_TYPE){
00278 LM_ERR("clustering not fully implemented\n");
00279 return 0;
00280 }else{
00281 LM_ERR("unknown type of as (neither cluster nor as)\n");
00282 return -1;
00283 }
00284 LM_DBG("as found ! (%.*s) processor id = %d\n",as->name.len,as->name.s,processor_id);
00285 if(new_tran==1 && msg->REQ_METHOD==METHOD_ACK){
00286
00287 LM_DBG("forwarding statelessly !!!\n");
00288 if(!(buffer=create_as_event_sl(msg,processor_id,&len,0))){
00289 LM_ERR("create_as_event_sl() unable to create event code\n");
00290 goto error;
00291 }
00292 }else if(!(buffer=create_as_event_t(seas_f.tmb.t_gett(),msg,processor_id,&len,0))){
00293 LM_ERR("unable to create event code\n");
00294 goto error;
00295 }
00296 if(!(my_as_ev=shm_malloc(sizeof(as_msg_t)))){
00297 LM_ERR("Out of shared mem!\n");
00298 goto error;
00299 }
00300 my_as_ev->msg = buffer;
00301 my_as_ev->as = as;
00302 my_as_ev->type = T_REQ_IN;
00303 my_as_ev->len = len;
00304 my_as_ev->transaction=seas_f.tmb.t_gett();
00305 if(use_stats && new_tran>0)
00306 as_relay_stat(seas_f.tmb.t_gett());
00307 again:
00308 ret=write(write_pipe,&my_as_ev,sizeof(as_msg_p));
00309 if(ret==-1){
00310 if(errno==EINTR)
00311 goto again;
00312 else if(errno==EPIPE){
00313 LM_ERR("SEAS Event Dispatcher has closed the pipe. Invalidating it !\n");
00314 goto error;
00315
00316 }
00317 }
00318 seas_f.tmb.t_setkr(REQ_FWDED);
00319 ret=0;
00320 done:
00321 return ret;
00322 error:
00323 mycel=seas_f.tmb.t_gett();
00324 if(mycel && mycel!=T_UNDEFINED){
00325 if(!seas_f.tmb.t_reply(msg,500,&msg500)){
00326 LM_ERR("t_reply (500)\n");
00327 }
00328 }
00329 if(my_as_ev)
00330 shm_free(my_as_ev);
00331 if(buffer)
00332 shm_free(buffer);
00333 return ret;
00334 }
00335
00336
00337
00338
00339
00340
00341 static int w_as_relay_sl(struct sip_msg *msg, char *as_name, char *foo)
00342 {
00343 as_msg_p my_as_ev=0;
00344 int ret=0,len;
00345 char *buffer=0,processor_id;
00346 struct as_entry *as;
00347
00348 as=(struct as_entry *)as_name;
00349
00350 if(as->type==AS_TYPE){
00351 if((processor_id=get_processor_id(&msg->rcv,&(as->u.as)))<0){
00352 LM_ERR("no processor found for packet with dst port:%d\n",msg->rcv.dst_port);
00353 goto error;
00354 }
00355 }else if (as->type==CLUSTER_TYPE) {
00356 LM_ERR("clustering not fully implemented\n");
00357 goto error;
00358 }else{
00359 LM_ERR("unknown type of as\n");
00360 goto error;
00361 }
00362
00363 LM_DBG("as found ! (%.*s) processor id = %d\n",as->name.len,as->name.s,processor_id);
00364 if(!(buffer=create_as_event_sl(msg,processor_id,&len,0))){
00365 LM_ERR("unable to create event code\n");
00366 goto error;
00367 }
00368 if(!(my_as_ev=shm_malloc(sizeof(as_msg_t))))
00369 goto error;
00370 my_as_ev->msg = buffer;
00371 my_as_ev->as = as;
00372 my_as_ev->type = SL_REQ_IN;
00373 my_as_ev->len = len;
00374 my_as_ev->transaction=seas_f.tmb.t_gett();
00375 if(use_stats)
00376 as_relay_stat(seas_f.tmb.t_gett());
00377 again:
00378 ret=write(write_pipe,&my_as_ev,sizeof(as_msg_p));
00379 if(ret==-1){
00380 if(errno==EINTR)
00381 goto again;
00382 else if(errno==EPIPE){
00383 LM_ERR("SEAS Event Dispatcher has closed the pipe. Invalidating it !\n");
00384 return -2;
00385
00386 }
00387 }
00388
00389
00390
00391
00392
00393 seas_f.tmb.t_setkr(REQ_FWDED);
00394
00395 ret=0;
00396 return ret;
00397 error:
00398 if(my_as_ev)
00399 shm_free(my_as_ev);
00400 if(buffer)
00401 shm_free(buffer);
00402 return ret;
00403 }
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422 char * create_as_event_t(struct cell *t,struct sip_msg *msg,char processor_id,int *evt_len,int flags)
00423 {
00424 unsigned int i,hash_index,label;
00425 unsigned short int port;
00426 unsigned int k,len;
00427 char *buffer=NULL;
00428 struct cell *originalT;
00429
00430 originalT=0;
00431
00432 if(!(buffer=shm_malloc(ENCODED_MSG_SIZE))){
00433 LM_ERR("Out Of Memory !!\n");
00434 return 0;
00435 }
00436 *evt_len=0;
00437 if(t){
00438 hash_index=t->hash_index;
00439 label=t->label;
00440 }else{
00441
00442 LM_ERR("no transaction provided...\n");
00443 goto error;
00444 }
00445
00446 k=4;
00447
00448 buffer[k++]=(unsigned char)T_REQ_IN;
00449
00450 buffer[k++]=(unsigned char)processor_id;
00451
00452 if(is_e2e_ack(t,msg)){
00453 flags|=E2E_ACK;
00454 }else if(msg->REQ_METHOD==METHOD_CANCEL){
00455 LM_DBG("new CANCEL\n");
00456 originalT=seas_f.tmb.t_lookup_original_t(msg);
00457 if(!originalT || originalT==T_UNDEFINED){
00458
00459 LM_WARN("CANCEL does not match any existing transaction!!\n");
00460 goto error;
00461 }else{
00462 flags|=CANCEL_FOUND;
00463
00464 }
00465 LM_DBG("Cancelling transaction !!\n");
00466 }
00467 flags=htonl(flags);
00468 memcpy(buffer+k,&flags,4);
00469 k+=4;
00470
00471 buffer[k++]=(unsigned char)msg->rcv.proto;
00472
00473 len=msg->rcv.src_ip.len;
00474 buffer[k++]=(unsigned char)len;
00475 memcpy(buffer+k,&(msg->rcv.src_ip.u),len);
00476 k+=len;
00477
00478 len=msg->rcv.dst_ip.len;
00479 buffer[k++]=(unsigned char)len;
00480 memcpy(buffer+k,&(msg->rcv.dst_ip.u),len);
00481 k+=len;
00482
00483 port=htons(msg->rcv.src_port);
00484 memcpy(buffer+k,&port,2);
00485 k+=2;
00486
00487 port=htons(msg->rcv.dst_port);
00488 memcpy(buffer+k,&port,2);
00489 k+=2;
00490
00491 i=htonl(hash_index);
00492 memcpy(buffer+k,&i,4);
00493 k+=4;
00494
00495 i=htonl(label);
00496 memcpy(buffer+k,&i,4);
00497 k+=4;
00498 if(msg->REQ_METHOD==METHOD_CANCEL && originalT){
00499 LM_DBG("Cancelled transaction: Hash_Index=%d, Label=%d\n",originalT->hash_index,originalT->label);
00500
00501 i=htonl(originalT->hash_index);
00502 memcpy(buffer+k,&i,4);
00503 k+=4;
00504
00505 i=htonl(originalT->label);
00506 memcpy(buffer+k,&i,4);
00507 k+=4;
00508 }
00509
00510
00511 if(encode_msg(msg,buffer+k,ENCODED_MSG_SIZE-k)<0){
00512 LM_ERR("Unable to encode msg\n");
00513 goto error;
00514 }
00515 i = GET_PAY_SIZE(buffer+k);
00516 k+=i;
00517 *evt_len=k;
00518 k=htonl(k);
00519 memcpy(buffer,&k,4);
00520 return buffer;
00521 error:
00522 if(buffer)
00523 shm_free(buffer);
00524 return 0;
00525 }
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543 char * create_as_event_sl(struct sip_msg *msg,char processor_id,int *evt_len,int flags)
00544 {
00545 unsigned int i;
00546 unsigned short int port;
00547 unsigned int k,len;
00548 char *buffer=NULL;
00549
00550 if(!(buffer=shm_malloc(ENCODED_MSG_SIZE))){
00551 LM_ERR("create_as_event_t Out Of Memory !!\n");
00552 return 0;
00553 }
00554 *evt_len=0;
00555
00556
00557 k=4;
00558
00559 buffer[k++]=(unsigned char)SL_REQ_IN;
00560
00561 buffer[k++]=(unsigned char)processor_id;
00562
00563 flags=htonl(flags);
00564 memcpy(buffer+k,&flags,4);
00565 k+=4;
00566
00567 buffer[k++]=(unsigned char)msg->rcv.proto;
00568
00569 len=msg->rcv.src_ip.len;
00570 buffer[k++]=(unsigned char)len;
00571 memcpy(buffer+k,&(msg->rcv.src_ip.u),len);
00572 k+=len;
00573
00574 len=msg->rcv.dst_ip.len;
00575 buffer[k++]=(unsigned char)len;
00576 memcpy(buffer+k,&(msg->rcv.dst_ip.u),len);
00577 k+=len;
00578
00579 port=htons(msg->rcv.src_port);
00580 memcpy(buffer+k,&port,2);
00581 k+=2;
00582
00583 port=htons(msg->rcv.dst_port);
00584 memcpy(buffer+k,&port,2);
00585 k+=2;
00586
00587 if(encode_msg(msg,buffer+k,ENCODED_MSG_SIZE-k)<0){
00588 LM_ERR("Unable to encode msg\n");
00589 goto error;
00590 }
00591 i = GET_PAY_SIZE(buffer+k);
00592 k+=i;
00593 *evt_len=k;
00594 k=htonl(k);
00595 memcpy(buffer,&k,4);
00596 return buffer;
00597 error:
00598 if(buffer)
00599 shm_free(buffer);
00600 return 0;
00601 }
00602
00603
00604 static inline int is_e2e_ack(struct cell *t,struct sip_msg *msg)
00605 {
00606 if(msg->REQ_METHOD != METHOD_ACK)
00607 return 0;
00608 if (t->uas.status<300)
00609 return 1;
00610 return 0;
00611 }
00612
00613
00614
00615
00616
00617 static int seas_init(void)
00618 {
00619 char *p,*port;
00620 struct hostent *he;
00621 struct socket_info *si;
00622 int c_pipe[2],mierr,i;
00623
00624 if (load_tm_api(&seas_f.tmb)!=0) {
00625 LM_ERR( "can't load TM API\n");
00626 return -1;
00627 }
00628 if(!(seas_f.t_check_orig_trans = find_export("t_check_trans", 0, 0))){
00629 LM_ERR( "Seas requires transaction module (t_check_trans not found)\n");
00630 return -1;
00631 }
00632
00633 c_pipe[0]=c_pipe[1]=-1;
00634 p=seas_listen_socket;
00635 port=(char *)0;
00636 seas_listen_port=5080;
00637
00638 if(p==NULL || *p==0){
00639 si=get_first_socket();
00640 seas_listen_ip=&si->address;
00641 } else {
00642 while(*p){
00643 if(*p == ':'){
00644 *p=0;
00645 port=p+1;
00646 break;
00647 }
00648 p++;
00649 }
00650 if(!(he=resolvehost(seas_listen_socket,0)))
00651 goto error;
00652 if(!(seas_listen_ip=pkg_malloc(sizeof(struct ip_addr))))
00653 goto error;
00654 hostent2ip_addr(seas_listen_ip, he, 0);
00655 if(port!=(char *)0 && (seas_listen_port=str2s(port,strlen(port),&mierr))==0){
00656 LM_ERR("invalid port %s \n",port);
00657 goto error;
00658 }
00659 }
00660 memset(unc_as_t,0,2*MAX_UNC_AS_NR*sizeof(struct unc_as));
00661 if (pipe(c_pipe)==-1) {
00662 LM_ERR("cannot create pipe!\n");
00663 goto error;
00664 }
00665 read_pipe=c_pipe[0];
00666 write_pipe=c_pipe[1];
00667 seas_init_tags();
00668 if(0>start_stats_server(seas_stats_socket))
00669 goto error;
00670 if(0>prepare_ha())
00671 goto error;
00672 if(0>parse_cluster_cfg())
00673 goto error;
00674 return 0;
00675 error:
00676 for(i=0;i<2;i++)
00677 if(c_pipe[i]!=-1)
00678 close(c_pipe[i]);
00679 if(seas_listen_ip!=0)
00680 pkg_free(seas_listen_ip);
00681 if(use_stats)
00682 stop_stats_server();
00683 return -1;
00684 }
00685
00686
00687
00688
00689 static void seas_init_tags(void)
00690 {
00691 init_tags(seas_tags, &seas_tag_suffix,"VozTele-Seas/tags",'-');
00692 LM_DBG("seas_init_tags, seas_tags=%s\n",seas_tags);
00693 }
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703 static int seas_child_init(int rank)
00704 {
00705 int pid;
00706
00707
00708 if (rank != 1){
00709
00710 close(read_pipe);
00711 return 0;
00712 }
00713 if ((pid=fork())<0) {
00714 LM_ERR("forking failed\n");
00715 return -1;
00716 }
00717 if (!pid) {
00718
00719
00720
00721
00722 return dispatcher_main_loop();
00723 }
00724 return 0;
00725 }
00726
00727
00728
00729
00730
00731 static int seas_exit(void)
00732 {
00733 if( seas_listen_ip!=NULL && seas_listen_ip!=&(get_first_socket()->address))
00734 pkg_free(seas_listen_ip);
00735 return 0;
00736 }
00737
00738
00739
00740
00741
00742
00743 char get_processor_id(struct receive_info *rcv,as_p as)
00744 {
00745 int i;
00746 for(i=0;i<MAX_BINDS;i++){
00747 if(as->bound_processor[i]!=0 &&
00748 (rcv->dst_ip.len == as->binds[i]->address.len) &&
00749 (rcv->dst_ip.af==as->binds[i]->address.af) &&
00750 (!memcmp(rcv->dst_ip.u.addr,as->binds[i]->address.u.addr,rcv->dst_ip.len))
00751
00752 )
00753 return as->bound_processor[i];
00754 }
00755 return -1;
00756 }