presence/notify.c

Go to the documentation of this file.
00001 /*
00002  * $Id: notify.c 5621 2009-02-18 19:10:56Z miconda $
00003  *
00004  * presence module- presence server implementation
00005  *
00006  * Copyright (C) 2006 Voice Sistem S.R.L.
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software 
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * History:
00025  * --------
00026  *  2006-08-15  initial version (anca)
00027  */
00028 
00029 /*! \file
00030  * \brief Kamailio presence module :: Notification with SIP NOTIFY
00031  * \ingroup presence 
00032  */
00033 
00034 
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <string.h>
00038 #include <libxml/parser.h>
00039 
00040 #include "../../trim.h"
00041 #include "../../ut.h"
00042 #include "../../globals.h"
00043 #include "../../str.h"
00044 #include "../../db/db.h"
00045 #include "../../db/db_val.h"
00046 #include "../../socket_info.h"
00047 #include "../tm/tm_load.h"
00048 #include "../pua/hash.h"
00049 #include "presentity.h"
00050 #include "presence.h"
00051 #include "notify.h"
00052 #include "utils_func.h"
00053 
/* ALLOC_SIZE is not referenced in the visible part of this file. */
00054 #define ALLOC_SIZE 3000
/* Value written into the Max-Forwards header by build_str_hdr(). */
00055 #define MAX_FORWARD 70
00056 
/* Forward declarations of callback-parameter helpers (definitions are not
 * in the visible part of this file). */
00057 c_back_param* shm_dup_cbparam(subs_t*);
00058 void free_cbparam(c_back_param* cb_param);
00059 
/* Forward declarations. add_watcher_list() and free_watcher_list() are
 * defined further down; add_waiting_watchers() and create_winfo_xml() are
 * called by get_wi_notify_body(). */
00060 void p_tm_callback( struct cell *t, int type, struct tmcb_params *ps);
00061 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event);
00062 int add_watcher_list(subs_t *s, watcher_t *watchers);
00063 str* create_winfo_xml(watcher_t* watchers, char* version,
00064       str resource, str event, int STATE_FLAG);
00065 void free_watcher_list(watcher_t* watchers);
00066 
/* Database column names. Their addresses are passed as query keys and
 * result columns to pa_dbf.query() by the functions in this file. */
00067 str str_to_user_col = str_init("to_user");
00068 str str_username_col = str_init("username");
00069 str str_domain_col = str_init("domain");
00070 str str_body_col = str_init("body");
00071 str str_to_domain_col = str_init("to_domain");
00072 str str_watcher_username_col = str_init("watcher_username");
00073 str str_watcher_domain_col = str_init("watcher_domain");
00074 str str_event_id_col = str_init("event_id");
00075 str str_event_col = str_init("event");
00076 str str_etag_col = str_init("etag");
00077 str str_from_tag_col = str_init("from_tag");
00078 str str_to_tag_col = str_init("to_tag");
00079 str str_callid_col = str_init("callid");
00080 str str_local_cseq_col = str_init("local_cseq");
00081 str str_remote_cseq_col = str_init("remote_cseq");
00082 str str_record_route_col = str_init("record_route");
00083 str str_contact_col = str_init("contact");
00084 str str_expires_col = str_init("expires");
00085 str str_status_col = str_init("status");
00086 str str_reason_col = str_init("reason");
00087 str str_socket_info_col = str_init("socket_info");
00088 str str_local_contact_col = str_init("local_contact");
00089 str str_version_col = str_init("version");
00090 str str_presentity_uri_col = str_init("presentity_uri");
00091 str str_inserted_time_col = str_init("inserted_time");
00092 str str_received_time_col = str_init("received_time");
00093 str str_id_col = str_init("id");
00094 str str_sender_col = str_init("sender");
00095 
00096 char* get_status_str(int status_flag)
00097 {
00098    switch(status_flag)
00099    {
00100       case ACTIVE_STATUS: return "active";
00101       case PENDING_STATUS: return "pending";
00102       case TERMINATED_STATUS: return "terminated";
00103       case WAITING_STATUS: return "waiting";
00104    }
00105    return NULL;
00106 }
00107 
00108 void printf_subs(subs_t* subs)
00109 {  
00110    LM_DBG("\n\t[pres_uri]= %.*s\n\t[to_user]= %.*s\t[to_domain]= %.*s"
00111       "\n\t[w_user]= %.*s\t[w_domain]= %.*s\n\t[event]= %.*s\n\t[status]= %s"
00112       "\n\t[expires]= %u\n\t[callid]= %.*s\t[local_cseq]=%d"
00113       "\n\t[to_tag]= %.*s\t[from_tag]= %.*s""\n\t[contact]= %.*s"
00114       "\t[record_route]= %.*s\n",subs->pres_uri.len,subs->pres_uri.s,
00115       subs->to_user.len,subs->to_user.s,subs->to_domain.len,
00116       subs->to_domain.s,subs->from_user.len,subs->from_user.s,
00117       subs->from_domain.len,subs->from_domain.s,subs->event->name.len,
00118       subs->event->name.s,get_status_str(subs->status),subs->expires,
00119       subs->callid.len,subs->callid.s,subs->local_cseq,subs->to_tag.len,
00120       subs->to_tag.s,subs->from_tag.len, subs->from_tag.s,subs->contact.len,
00121       subs->contact.s,subs->record_route.len,subs->record_route.s);
00122 }
00123 
00124 int build_str_hdr(subs_t* subs, int is_body, str* hdr)
00125 {
00126    pres_ev_t* event= subs->event;
00127    str expires = {0, 0};
00128    str status = {0, 0};
00129    str tmp = {0, 0};
00130 
00131    if(hdr == NULL)
00132    {
00133       LM_ERR("bad parameter\n");
00134       return -1;
00135    }
00136    expires.s = int2str(subs->expires, &expires.len);
00137    status.s= get_status_str(subs->status);
00138    if(status.s == NULL)
00139    {
00140       LM_ERR("bad status %d\n", subs->status);
00141       return -1;
00142    }
00143    status.len = strlen(status.s);
00144 
00145    hdr->len = 18 /*Max-Forwards:  + val*/ + CRLF_LEN + 
00146       7 /*Event: */ + subs->event->name.len +4 /*;id=*/+ subs->event_id.len+
00147       CRLF_LEN + 10 /*Contact: <*/ + subs->local_contact.len + 1/*>*/ +
00148       15/*";transport=xxxx"*/ + CRLF_LEN + 20 /*Subscription-State: */ +
00149       status.len + 10 /*reason/expires params*/
00150       + (subs->reason.len>expires.len?subs->reason.len:expires.len)
00151       + CRLF_LEN + (is_body?
00152       (14 /*Content-Type: */+subs->event->content_type.len + CRLF_LEN):0) + 1;
00153 
00154    hdr->s = (char*)pkg_malloc(hdr->len);
00155    if(hdr->s == NULL)
00156    {
00157       LM_ERR("no more pkg\n");
00158       return -1;
00159    }
00160 
00161    strncpy(hdr->s, "Max-Forwards: ", 14);
00162    tmp.s = int2str((unsigned long)MAX_FORWARD, &tmp.len);
00163    strncpy(hdr->s+14, tmp.s, tmp.len);
00164    tmp.s = hdr->s + tmp.len + 14;
00165    strncpy(tmp.s, CRLF, CRLF_LEN);
00166    tmp.s += CRLF_LEN;
00167 
00168    strncpy(tmp.s  ,"Event: ", 7);
00169    tmp.s += 7;
00170    strncpy(tmp.s, event->name.s, event->name.len);
00171    tmp.s += event->name.len;
00172    if(subs->event_id.len && subs->event_id.s) 
00173    {
00174       strncpy(tmp.s, ";id=", 4);
00175       tmp.s += 4;
00176       strncpy(tmp.s, subs->event_id.s, subs->event_id.len);
00177       tmp.s += subs->event_id.len;
00178    }
00179    strncpy(tmp.s, CRLF, CRLF_LEN);
00180    tmp.s += CRLF_LEN;
00181 
00182    strncpy(tmp.s, "Contact: <", 10);
00183    tmp.s += 10;
00184    strncpy(tmp.s, subs->local_contact.s, subs->local_contact.len);
00185    tmp.s +=  subs->local_contact.len;
00186    if(subs->sockinfo_str.s!=NULL)
00187    {
00188       /* fix me */
00189       switch(subs->sockinfo_str.s[0]) {
00190          case 's':
00191          case 'S':
00192             strncpy(tmp.s, ";transport=sctp", 15);
00193             tmp.s += 15;
00194          break;
00195          case 't':
00196          case 'T':
00197             switch(subs->sockinfo_str.s[1]) {
00198                case 'c':
00199                case 'C':
00200                   strncpy(tmp.s, ";transport=tcp", 14);
00201                   tmp.s += 14;
00202                break;
00203                case 'l':
00204                case 'L':
00205                   strncpy(tmp.s, ";transport=tls", 14);
00206                   tmp.s += 14;
00207                break;
00208             }
00209          break;
00210       }
00211    }
00212    *tmp.s =  '>';
00213    tmp.s++;
00214    strncpy(tmp.s, CRLF, CRLF_LEN);
00215    tmp.s += CRLF_LEN;
00216    
00217    strncpy(tmp.s, "Subscription-State: ", 20);
00218    tmp.s += 20;
00219    strncpy(tmp.s, status.s, status.len);
00220    tmp.s += status.len;
00221    
00222    if(subs->status == TERMINATED_STATUS)
00223    {
00224       LM_DBG("state = terminated\n");
00225       
00226       strncpy(tmp.s, ";reason=", 8);
00227       tmp.s += 8;
00228       strncpy(tmp.s, subs->reason.s, subs->reason.len);
00229       tmp.s += subs->reason.len;
00230    } else { 
00231       strncpy(tmp.s, ";expires=", 9);
00232       tmp.s += 9;
00233       LM_DBG("expires = %d\n", subs->expires);
00234       strncpy(tmp.s, expires.s, expires.len);
00235       tmp.s += expires.len;
00236    }
00237    strncpy(tmp.s, CRLF, CRLF_LEN);
00238    tmp.s += CRLF_LEN;
00239    
00240    if(is_body)
00241    {  
00242       strncpy(tmp.s,"Content-Type: ", 14);
00243       tmp.s += 14;
00244       strncpy(tmp.s, event->content_type.s, event->content_type.len);
00245       tmp.s += event->content_type.len;
00246       strncpy(tmp.s, CRLF, CRLF_LEN);
00247       tmp.s += CRLF_LEN;
00248    }
00249    
00250    *tmp.s = '\0';
00251    hdr->len = tmp.s - hdr->s;
00252 
00253    return 0;
00254 }
00255 
00256 int get_wi_subs_db(subs_t* subs, watcher_t* watchers)
00257 {  
00258    subs_t sb;
00259    db_key_t query_cols[6];
00260    db_op_t  query_ops[6];
00261    db_val_t query_vals[6];
00262    db_key_t result_cols[6];
00263    db_res_t *result = NULL;
00264    db_row_t *row = NULL ;  
00265    db_val_t *row_vals = NULL;
00266    int n_result_cols = 0;
00267    int n_query_cols = 0;
00268    int i;
00269    int status_col, expires_col, from_user_col, from_domain_col, callid_col;
00270 
00271    query_cols[n_query_cols] = &str_presentity_uri_col;
00272    query_ops[n_query_cols] = OP_EQ;
00273    query_vals[n_query_cols].type = DB_STR;
00274    query_vals[n_query_cols].nul = 0;
00275    query_vals[n_query_cols].val.str_val= subs->pres_uri;
00276    n_query_cols++;
00277 
00278    query_cols[n_query_cols] = &str_event_col;
00279    query_ops[n_query_cols] = OP_EQ;
00280    query_vals[n_query_cols].type = DB_STR;
00281    query_vals[n_query_cols].nul = 0;
00282    query_vals[n_query_cols].val.str_val = subs->event->wipeer->name;
00283    n_query_cols++;
00284 
00285    result_cols[status_col=n_result_cols++] = &str_status_col;
00286    result_cols[expires_col=n_result_cols++] = &str_expires_col;
00287    result_cols[from_user_col=n_result_cols++] = &str_watcher_username_col;
00288    result_cols[from_domain_col=n_result_cols++] = &str_watcher_domain_col;
00289    result_cols[callid_col=n_result_cols++] = &str_callid_col;
00290 
00291    if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 
00292    {
00293       LM_ERR("in use_table\n");
00294       goto error;
00295    }
00296 
00297    if (pa_dbf.query (pa_db, query_cols, query_ops, query_vals,
00298        result_cols, n_query_cols, n_result_cols, 0,  &result) < 0) 
00299    {
00300       LM_ERR("querying active_watchers db table\n");
00301       goto error;
00302    }
00303 
00304    if(result== NULL )
00305    {
00306       goto error;
00307    }
00308 
00309    if(result->n <= 0)
00310    {
00311       LM_DBG("The query in db table for active subscription"
00312             " returned no result\n");
00313       pa_dbf.free_result(pa_db, result);
00314       return 0;
00315    }
00316    
00317    for(i=0; i<result->n; i++)
00318    {
00319       row = &result->rows[i];
00320       row_vals = ROW_VALUES(row);
00321       
00322       sb.from_user.s= (char*)row_vals[from_user_col].val.string_val;
00323       sb.from_user.len= strlen(sb.from_user.s);
00324 
00325       sb.from_domain.s= (char*)row_vals[from_domain_col].val.string_val;
00326       sb.from_domain.len= strlen(sb.from_domain.s);
00327 
00328       sb.callid.s= (char*)row_vals[callid_col].val.string_val;
00329       sb.callid.len= strlen(sb.callid.s);
00330 
00331       sb.event =subs->event->wipeer;
00332       sb.status= row_vals[status_col].val.int_val;
00333       
00334       if(add_watcher_list(&sb, watchers)<0)
00335          goto error;
00336 
00337    }
00338    
00339    pa_dbf.free_result(pa_db, result);
00340    return 0;
00341 
00342 error:
00343    if(result)
00344       pa_dbf.free_result(pa_db, result);
00345    return -1;
00346 }
00347 
00348 str* get_wi_notify_body(subs_t* subs, subs_t* watcher_subs)
00349 {
00350    str* notify_body = NULL;
00351    char* version_str;
00352    watcher_t *watchers = NULL;
00353    int len = 0;
00354    unsigned int hash_code;
00355    subs_t* s= NULL;
00356    int state = FULL_STATE_FLAG;
00357 
00358    hash_code = 0;
00359    version_str = int2str(subs->version, &len);
00360    if(version_str ==NULL)
00361    {
00362       LM_ERR("converting int to str\n ");
00363       goto error;
00364    }
00365 
00366    watchers= (watcher_t*)pkg_malloc(sizeof(watcher_t));
00367    if(watchers== NULL)
00368    {
00369       ERR_MEM(PKG_MEM_STR);
00370    }
00371    memset(watchers, 0, sizeof(watcher_t));
00372 
00373    if(watcher_subs != NULL) 
00374    {     
00375       if(add_watcher_list(watcher_subs, watchers)< 0)
00376          goto error;
00377       state = PARTIAL_STATE_FLAG;
00378 
00379       goto done;
00380    }
00381 
00382    if(fallback2db)
00383    {
00384       if(get_wi_subs_db(subs, watchers)< 0)
00385       {
00386          LM_ERR("getting watchers from database\n");
00387          goto error;
00388       }
00389    }
00390 
00391    hash_code= core_hash(&subs->pres_uri, &subs->event->wipeer->name,
00392             shtable_size);
00393    lock_get(&subs_htable[hash_code].lock);
00394 
00395    s= subs_htable[hash_code].entries;
00396 
00397     while(s->next)
00398    {
00399       s= s->next;
00400 
00401       if(s->expires< (int)time(NULL))
00402       {  
00403          LM_DBG("expired record\n");
00404          continue;
00405       }
00406 
00407       if(fallback2db && s->db_flag!= INSERTDB_FLAG)
00408       {
00409          LM_DBG("record already found in database\n");
00410          continue;
00411       }
00412 
00413         if(s->event== subs->event->wipeer &&
00414          s->pres_uri.len== subs->pres_uri.len &&
00415          strncmp(s->pres_uri.s, subs->pres_uri.s,subs->pres_uri.len)== 0)
00416       {
00417          if(add_watcher_list(s, watchers)< 0)
00418          {
00419             lock_release(&subs_htable[hash_code].lock);
00420             goto error;
00421          }
00422       }
00423    }
00424    
00425    if(add_waiting_watchers(watchers, subs->pres_uri,
00426             subs->event->wipeer->name)< 0 )
00427    {
00428       LM_ERR("failed to add waiting watchers\n");
00429       goto error;
00430    }
00431 
00432 done:
00433    notify_body = create_winfo_xml(watchers,version_str,subs->pres_uri,
00434          subs->event->wipeer->name, state);
00435    
00436    if(watcher_subs == NULL) 
00437       lock_release(&subs_htable[hash_code].lock);
00438 
00439    if(notify_body== NULL)
00440    {
00441       LM_ERR("in function create_winfo_xml\n");
00442       goto error;
00443    }
00444    free_watcher_list(watchers);
00445    return notify_body;
00446 
00447 error:
00448    if(notify_body)
00449    {
00450       if(notify_body->s)
00451          xmlFree(notify_body->s);
00452       pkg_free(notify_body);
00453    }
00454    free_watcher_list(watchers);
00455    return NULL;
00456 }
00457 
00458 void free_watcher_list(watcher_t* watchers)
00459 {
00460    watcher_t* w;
00461    while(watchers)
00462    {  
00463       w= watchers;
00464       if(w->uri.s !=NULL)
00465          pkg_free(w->uri.s);
00466       if(w->id.s !=NULL)
00467          pkg_free(w->id.s);
00468       watchers= watchers->next;
00469       pkg_free(w);
00470    }
00471 }
00472 
00473 int add_watcher_list(subs_t *s, watcher_t *watchers)
00474 {
00475    watcher_t* w;
00476 
00477    w= (watcher_t*)pkg_malloc(sizeof(watcher_t));
00478    if(w== NULL)
00479    {
00480       LM_ERR("No more private memory\n");
00481       return -1;
00482    }
00483    w->status= s->status;
00484    if(uandd_to_uri(s->from_user, s->from_domain, &w->uri)<0)
00485    {
00486       LM_ERR("failed to create uri\n");
00487       goto error;
00488    }
00489    w->id.s = (char*)pkg_malloc(s->callid.len+ 1);
00490    if(w->id.s == NULL)
00491    {
00492       LM_ERR("no more memory\n");
00493       goto error;
00494    }
00495    memcpy(w->id.s, s->callid.s, s->callid.len);
00496    w->id.len = s->callid.len;
00497    w->id.s[w->id.len] = '\0';
00498 
00499    w->next= watchers->next;
00500    watchers->next= w;
00501 
00502    return 0;
00503 
00504 error:
00505    if(w)
00506    {
00507       if(w->uri.s)
00508          pkg_free(w->uri.s);
00509       pkg_free(w);
00510    }
00511    return -1;
00512 }
00513 
00514 str* build_empty_bla_body(str pres_uri)
00515 {
00516    xmlDocPtr doc;
00517    xmlNodePtr node;
00518    xmlAttrPtr attr;
00519    str* body= NULL;
00520    char* text;
00521    int len;
00522    char* entity= NULL;
00523 
00524    doc = xmlNewDoc(BAD_CAST "1.0");
00525    if(doc== NULL)
00526    {
00527       LM_ERR("failed to construct xml document\n");
00528       return NULL;
00529    }
00530 
00531    node = xmlNewNode(NULL, BAD_CAST "dialog-info");
00532    if(node== NULL)
00533    {
00534       LM_ERR("failed to initialize node\n");
00535       goto error;
00536    }
00537    xmlDocSetRootElement(doc, node);
00538 
00539    attr =  xmlNewProp(node, BAD_CAST "xmlns",BAD_CAST  "urn:ietf:params:xml:ns:dialog-info");
00540    if(attr== NULL)
00541    {
00542       LM_ERR("failed to initialize node attribute\n");
00543       goto error;
00544    }
00545    attr = xmlNewProp(node, BAD_CAST "version", BAD_CAST "1");
00546    if(attr== NULL)
00547    {
00548       LM_ERR("failed to initialize node attribute\n");
00549       goto error;
00550    }
00551 
00552    attr = xmlNewProp(node, BAD_CAST "state", BAD_CAST "full");
00553    if(attr== NULL)
00554    {
00555       LM_ERR("failed to initialize node attribute\n");
00556       goto error;
00557    }
00558 
00559    entity = (char*)pkg_malloc(pres_uri.len+1);
00560    if(entity== NULL)
00561    {
00562       LM_ERR("no more memory\n");
00563       goto error;
00564    }
00565    memcpy(entity, pres_uri.s, pres_uri.len);
00566    entity[pres_uri.len]= '\0';
00567 
00568    attr = xmlNewProp(node, BAD_CAST "entity", BAD_CAST entity);
00569    if(attr== NULL)
00570    {
00571       LM_ERR("failed to initialize node attribute\n");
00572       pkg_free(entity);
00573       goto error;
00574    }
00575    
00576    body = (str*) pkg_malloc(sizeof(str));
00577    if(body== NULL)
00578    {
00579       LM_ERR("no more private memory");
00580       pkg_free(entity);
00581       goto error;
00582    }
00583 
00584    xmlDocDumpFormatMemory(doc,(xmlChar**)(void*)&text, &len, 1);
00585    body->s = (char*) pkg_malloc(len);
00586    if(body->s == NULL)
00587    {
00588       LM_ERR("no more private memory");
00589       pkg_free(body);
00590       pkg_free(entity);
00591       goto error;
00592    }
00593    memcpy(body->s, text, len);
00594    body->len= len;
00595 
00596    
00597    pkg_free(entity);
00598    xmlFreeDoc(doc);
00599    xmlFree(text);
00600 
00601    return body;
00602 
00603 error:
00604    xmlFreeDoc(doc);
00605    return NULL;
00606 
00607 }
00608 
00609 str* get_p_notify_body(str pres_uri, pres_ev_t* event, str* etag,
00610       str* contact)
00611 {
00612    db_key_t query_cols[6];
00613    db_val_t query_vals[6];
00614    db_key_t result_cols[6];
00615    db_res_t *result = NULL;
00616    int body_col, expires_col, etag_col= 0, sender_col;
00617    str** body_array= NULL;
00618    str* notify_body= NULL; 
00619    db_row_t *row= NULL ;   
00620    db_val_t *row_vals;
00621    int n_result_cols = 0;
00622    int n_query_cols = 0;
00623    int i, n= 0, len;
00624    int build_off_n= -1; 
00625    str etags;
00626    str* body;
00627    int size= 0;
00628    struct sip_uri uri;
00629    unsigned int hash_code;
00630    str sender;
00631 
00632    if(parse_uri(pres_uri.s, pres_uri.len, &uri)< 0)
00633    {
00634       LM_ERR("while parsing uri\n");
00635       return NULL;
00636    }
00637 
00638    /* search in hash table if any record exists */
00639    hash_code= core_hash(&pres_uri, NULL, phtable_size);
00640    if(search_phtable(&pres_uri, event->evp->parsed, hash_code)== NULL)
00641    {
00642       LM_DBG("No record exists in hash_table\n");
00643       if(fallback2db)
00644          goto db_query;
00645 
00646       /* for pidf manipulation */
00647       if(event->agg_nbody)
00648       {
00649          notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
00650          if(notify_body)
00651             goto done;
00652       }        
00653       return NULL;
00654    }
00655 
00656 db_query:
00657 
00658    query_cols[n_query_cols] = &str_domain_col;
00659    query_vals[n_query_cols].type = DB_STR;
00660    query_vals[n_query_cols].nul = 0;
00661    query_vals[n_query_cols].val.str_val = uri.host;
00662    n_query_cols++;
00663 
00664    query_cols[n_query_cols] = &str_username_col;
00665    query_vals[n_query_cols].type = DB_STR;
00666    query_vals[n_query_cols].nul = 0;
00667    query_vals[n_query_cols].val.str_val = uri.user;
00668    n_query_cols++;
00669 
00670    query_cols[n_query_cols] = &str_event_col;
00671    query_vals[n_query_cols].type = DB_STR;
00672    query_vals[n_query_cols].nul = 0;
00673    query_vals[n_query_cols].val.str_val= event->name;
00674    n_query_cols++;
00675 
00676    result_cols[body_col=n_result_cols++] = &str_body_col;
00677    result_cols[expires_col=n_result_cols++] = &str_expires_col;
00678    result_cols[etag_col=n_result_cols++] = &str_etag_col;
00679    result_cols[sender_col=n_result_cols++] = &str_sender_col;
00680    
00681    if (pa_dbf.use_table(pa_db, &presentity_table) < 0) 
00682    {
00683       LM_ERR("in use_table\n");
00684       return NULL;
00685    }
00686 
00687    static str query_str = str_init("received_time");
00688    if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
00689        result_cols, n_query_cols, n_result_cols, &query_str ,  &result) < 0) 
00690    {
00691       LM_ERR("failed to query %.*s table\n", presentity_table.len, presentity_table.s);
00692       if(result)
00693          pa_dbf.free_result(pa_db, result);
00694       return NULL;
00695    }
00696    
00697    if(result== NULL)
00698       return NULL;
00699 
00700    if (result->n<=0 )
00701    {
00702       LM_DBG("The query returned no result\n[username]= %.*s"
00703          "\t[domain]= %.*s\t[event]= %.*s\n",uri.user.len, uri.user.s,
00704          uri.host.len, uri.host.s, event->name.len, event->name.s);
00705       
00706       pa_dbf.free_result(pa_db, result);
00707       result= NULL;
00708 
00709       if(event->agg_nbody)
00710       {
00711          notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
00712          if(notify_body)
00713             goto done;
00714       }        
00715       return NULL;
00716    }
00717    else
00718    {
00719       n= result->n;
00720       if(event->agg_nbody== NULL )
00721       {
00722          LM_DBG("Event does not require aggregation\n");
00723          row = &result->rows[n-1];
00724          row_vals = ROW_VALUES(row);
00725          
00726          /* if event BLA - check if sender is the same as contact */
00727          /* if so, send an empty dialog info document */
00728          if( event->evp->parsed == EVENT_DIALOG_SLA && contact )
00729          {
00730             sender.s = (char*)row_vals[sender_col].val.string_val;
00731             if(sender.s== NULL || strlen(sender.s)==0)
00732                goto after_sender_check;
00733             sender.len= strlen(sender.s);
00734          
00735             if(sender.len== contact->len &&
00736                   strncmp(sender.s, contact->s, sender.len)== 0)
00737             {
00738                notify_body= build_empty_bla_body(pres_uri);
00739                pa_dbf.free_result(pa_db, result);
00740                return notify_body;
00741             }
00742          }
00743 
00744 after_sender_check:
00745          if(row_vals[body_col].val.string_val== NULL)
00746          {
00747             LM_ERR("NULL notify body record\n");
00748             goto error;
00749          }
00750          len= strlen(row_vals[body_col].val.string_val);
00751          if(len== 0)
00752          {
00753             LM_ERR("Empty notify body record\n");
00754             goto error;
00755          }
00756          notify_body= (str*)pkg_malloc(sizeof(str));
00757          if(notify_body== NULL)
00758          {
00759             ERR_MEM(PKG_MEM_STR);   
00760          }
00761          memset(notify_body, 0, sizeof(str));
00762          notify_body->s= (char*)pkg_malloc( len* sizeof(char));
00763          if(notify_body->s== NULL)
00764          {
00765             pkg_free(notify_body);
00766             ERR_MEM(PKG_MEM_STR);
00767          }
00768          memcpy(notify_body->s, row_vals[body_col].val.string_val, len);
00769          notify_body->len= len;
00770          pa_dbf.free_result(pa_db, result);
00771          
00772          return notify_body;
00773       }
00774       
00775       LM_DBG("Event requires aggregation\n");
00776       
00777       body_array =(str**)pkg_malloc( (n+2) *sizeof(str*));
00778       if(body_array == NULL)
00779       {
00780          ERR_MEM(PKG_MEM_STR);
00781       }
00782       memset(body_array, 0, (n+2) *sizeof(str*));
00783 
00784       if(etag!= NULL)
00785       {
00786          LM_DBG("searched etag = %.*s len= %d\n", 
00787                etag->len, etag->s, etag->len);
00788          LM_DBG("etag not NULL\n");
00789          for(i= 0; i< n; i++)
00790          {
00791             row = &result->rows[i];
00792             row_vals = ROW_VALUES(row);
00793             etags.s = (char*)row_vals[etag_col].val.string_val;
00794             etags.len = strlen(etags.s);
00795 
00796             LM_DBG("etag = %.*s len= %d\n", etags.len, etags.s, etags.len);
00797             if( (etags.len == etag->len) && (strncmp(etags.s,
00798                         etag->s,etags.len)==0 ) )
00799             {
00800                LM_DBG("found etag\n");
00801                build_off_n= i;
00802             }
00803             len= strlen((char*)row_vals[body_col].val.string_val);
00804             if(len== 0)
00805             {
00806                LM_ERR("Empty notify body record\n");
00807                goto error;
00808             }
00809          
00810             size= sizeof(str)+ len* sizeof(char);
00811             body= (str*)pkg_malloc(size);
00812             if(body== NULL)
00813             {
00814                ERR_MEM(PKG_MEM_STR);
00815             }
00816             memset(body, 0, size);
00817             size= sizeof(str);
00818             body->s= (char*)body+ size;
00819             memcpy(body->s, (char*)row_vals[body_col].val.string_val, len);
00820             body->len= len;
00821 
00822             body_array[i]= body;
00823          }
00824       }  
00825       else
00826       {  
00827          for(i=0; i< n; i++)
00828          {
00829             row = &result->rows[i];
00830             row_vals = ROW_VALUES(row);
00831             
00832             len= strlen((char*)row_vals[body_col].val.string_val);
00833             if(len== 0)
00834             {
00835                LM_ERR("Empty notify body record\n");
00836                goto error;
00837             }
00838             
00839             size= sizeof(str)+ len* sizeof(char);
00840             body= (str*)pkg_malloc(size);
00841             if(body== NULL)
00842             {
00843                ERR_MEM(PKG_MEM_STR);
00844             }
00845             memset(body, 0, size);
00846             size= sizeof(str);
00847             body->s= (char*)body+ size;
00848             memcpy(body->s, row_vals[body_col].val.string_val, len);
00849             body->len= len;
00850 
00851             body_array[i]= body;
00852          }        
00853       }
00854       pa_dbf.free_result(pa_db, result);
00855       result= NULL;
00856       
00857       notify_body = event->agg_nbody(&uri.user, &uri.host, body_array, n, build_off_n);
00858    }
00859 
00860 done: 
00861    if(body_array!=NULL)
00862    {
00863       for(i= 0; i< n; i++)
00864       {
00865          if(body_array[i])
00866             pkg_free(body_array[i]);
00867       }
00868       pkg_free(body_array);
00869    }
00870    return notify_body;
00871 
00872 error:
00873    if(result!=NULL)
00874       pa_dbf.free_result(pa_db, result);
00875 
00876    if(body_array!=NULL)
00877    {
00878       for(i= 0; i< n; i++)
00879       {
00880          if(body_array[i])
00881             pkg_free(body_array[i]);
00882          else
00883             break;
00884 
00885       }
00886    
00887       pkg_free(body_array);
00888    }
00889    return NULL;
00890 }
00891 
00892 int free_tm_dlg(dlg_t *td)
00893 {
00894    if(td)
00895    {
00896       if(td->loc_uri.s)
00897          pkg_free(td->loc_uri.s);
00898       if(td->rem_uri.s)
00899          pkg_free(td->rem_uri.s);
00900 
00901       if(td->route_set)
00902          free_rr(&td->route_set);
00903       pkg_free(td);
00904    }
00905    return 0;
00906 }
00907 
00908 dlg_t* build_dlg_t(subs_t* subs)
00909 {
00910    dlg_t* td =NULL;
00911    int found_contact = 1;
00912 
00913    td = (dlg_t*)pkg_malloc(sizeof(dlg_t));
00914    if(td == NULL)
00915    {
00916       ERR_MEM(PKG_MEM_STR);
00917    }
00918    memset(td, 0, sizeof(dlg_t));
00919 
00920    td->loc_seq.value = subs->local_cseq;
00921    td->loc_seq.is_set = 1;
00922 
00923    td->id.call_id = subs->callid;
00924    td->id.rem_tag = subs->from_tag;
00925    td->id.loc_tag =subs->to_tag;
00926    
00927    uandd_to_uri(subs->to_user, subs->to_domain, &td->loc_uri);
00928    if(td->loc_uri.s== NULL)
00929    {
00930       LM_ERR("while creating uri\n");
00931       goto error;
00932    }
00933 
00934    if(subs->contact.len ==0 || subs->contact.s == NULL )
00935    {
00936       found_contact = 0;
00937    }
00938    else
00939    {
00940       LM_DBG("CONTACT = %.*s\n", subs->contact.len , subs->contact.s);
00941       td->rem_target = subs->contact;
00942    }
00943 
00944    uandd_to_uri(subs->from_user, subs->from_domain, &td->rem_uri);
00945    if(td->rem_uri.s ==NULL)
00946    {
00947       LM_ERR("while creating uri\n");
00948       goto error;
00949    }
00950    
00951    if(found_contact == 0)
00952    {
00953       td->rem_target = td->rem_uri;
00954    }
00955    if(subs->record_route.s && subs->record_route.len)
00956    {
00957       if(parse_rr_body(subs->record_route.s, subs->record_route.len,
00958          &td->route_set)< 0)
00959       {
00960          LM_ERR("in function parse_rr_body\n");
00961          goto error;
00962       }
00963    }  
00964    td->state= DLG_CONFIRMED ;
00965 
00966    if (subs->sockinfo_str.len) {
00967       int port, proto;
00968         str host;
00969       if (parse_phostport (
00970             subs->sockinfo_str.s,subs->sockinfo_str.len,&host.s,
00971             &host.len,&port, &proto )) {
00972          LM_ERR("bad sockinfo string\n");
00973          goto error;
00974       }
00975       td->send_sock = grep_sock_info (
00976          &host, (unsigned short) port, (unsigned short) proto);
00977    }
00978    
00979    return td;
00980 
00981 error:      
00982    free_tm_dlg(td);
00983    return NULL;
00984 }
00985 
00986 int get_subs_db(str* pres_uri, pres_ev_t* event, str* sender,
00987       subs_t** s_array, int* n)
00988 {
00989    db_key_t query_cols[7];
00990    db_op_t  query_ops[7];
00991    db_val_t query_vals[7];
00992    db_key_t result_cols[19];
00993    int n_result_cols = 0, n_query_cols = 0;
00994    db_row_t *row ;   
00995    db_val_t *row_vals ;
00996    db_res_t *result = NULL;
00997    int from_user_col, from_domain_col, from_tag_col;
00998    int to_user_col, to_domain_col, to_tag_col;
00999    int expires_col= 0,callid_col, cseq_col, i, reason_col;
01000    int version_col= 0, record_route_col = 0, contact_col = 0;
01001    int sockinfo_col= 0, local_contact_col= 0, event_id_col = 0;
01002    subs_t s, *s_new;
01003    int inc= 0;
01004       
01005    if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 
01006    {
01007       LM_ERR("in use_table\n");
01008       return -1;
01009    }
01010 
01011    LM_DBG("querying database table = active_watchers\n");
01012    query_cols[n_query_cols] = &str_presentity_uri_col;
01013    query_ops[n_query_cols] = OP_EQ;
01014    query_vals[n_query_cols].type = DB_STR;
01015    query_vals[n_query_cols].nul = 0;
01016    query_vals[n_query_cols].val.str_val = *pres_uri;
01017    n_query_cols++;
01018    
01019    query_cols[n_query_cols] = &str_event_col;
01020    query_ops[n_query_cols] = OP_EQ;
01021    query_vals[n_query_cols].type = DB_STR;
01022    query_vals[n_query_cols].nul = 0;
01023    query_vals[n_query_cols].val.str_val = event->name;
01024    n_query_cols++;
01025 
01026    query_cols[n_query_cols] = &str_status_col;
01027    query_ops[n_query_cols] = OP_EQ;
01028    query_vals[n_query_cols].type = DB_INT;
01029    query_vals[n_query_cols].nul = 0;
01030    query_vals[n_query_cols].val.int_val = ACTIVE_STATUS;
01031    n_query_cols++;
01032 
01033    query_cols[n_query_cols] = &str_contact_col;
01034    query_ops[n_query_cols] = OP_NEQ;
01035    query_vals[n_query_cols].type = DB_STR;
01036    query_vals[n_query_cols].nul = 0;
01037    if(sender)
01038    {  
01039       LM_DBG("Do not send Notify to:[uri]= %.*s\n",sender->len,sender->s);
01040       query_vals[n_query_cols].val.str_val = *sender;
01041    } else {
01042       query_vals[n_query_cols].val.str_val.s = "";
01043       query_vals[n_query_cols].val.str_val.len = 0;
01044    }
01045    n_query_cols++;
01046 
01047    result_cols[to_user_col=n_result_cols++]      =   &str_to_user_col;
01048    result_cols[to_domain_col=n_result_cols++]    =   &str_to_domain_col;
01049    result_cols[from_user_col=n_result_cols++]    =   &str_watcher_username_col;
01050    result_cols[from_domain_col=n_result_cols++]  =   &str_watcher_domain_col;
01051    result_cols[event_id_col=n_result_cols++]     =   &str_event_id_col;
01052    result_cols[from_tag_col=n_result_cols++]     =   &str_from_tag_col;
01053    result_cols[to_tag_col=n_result_cols++]       =   &str_to_tag_col;
01054    result_cols[callid_col=n_result_cols++]       =   &str_callid_col;
01055    result_cols[cseq_col=n_result_cols++]         =   &str_local_cseq_col;
01056    result_cols[record_route_col=n_result_cols++] =   &str_record_route_col;
01057    result_cols[contact_col=n_result_cols++]      =   &str_contact_col;
01058    result_cols[expires_col=n_result_cols++]      =   &str_expires_col;
01059    result_cols[reason_col=n_result_cols++]       =   &str_reason_col;
01060    result_cols[sockinfo_col=n_result_cols++]     =   &str_socket_info_col;
01061    result_cols[local_contact_col=n_result_cols++]=   &str_local_contact_col;
01062    result_cols[version_col=n_result_cols++]      =   &str_version_col;
01063 
01064    if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals,result_cols,
01065             n_query_cols, n_result_cols, 0, &result) < 0) 
01066    {
01067       LM_ERR("while querying database\n");
01068       if(result)
01069       {
01070          pa_dbf.free_result(pa_db, result);
01071       }
01072       return -1;
01073    }
01074 
01075    if(result== NULL)
01076       return -1;
01077 
01078    if(result->n <=0 )
01079    {
01080       LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s"
01081          " returned no result\n",pres_uri->len, pres_uri->s,
01082          event->name.len, event->name.s);
01083       pa_dbf.free_result(pa_db, result);
01084       return 0;
01085    }
01086    LM_DBG("found %d dialogs\n", result->n);
01087    
01088    for(i=0; i<result->n; i++)
01089    {
01090       row = &result->rows[i];
01091       row_vals = ROW_VALUES(row);   
01092       
01093    // if(row_vals[expires_col].val.int_val< (int)time(NULL))
01094    //    continue;
01095 
01096         if(row_vals[reason_col].val.string_val)
01097         {
01098             if(strlen(row_vals[reason_col].val.string_val) != 0)
01099                 continue;
01100         }
01101       // s.reason.len= strlen(s.reason.s);
01102 
01103       memset(&s, 0, sizeof(subs_t));
01104       s.status= ACTIVE_STATUS;
01105       
01106       s.pres_uri= *pres_uri;
01107       s.to_user.s= (char*)row_vals[to_user_col].val.string_val;
01108       s.to_user.len= strlen(s.to_user.s);
01109       
01110       s.to_domain.s= (char*)row_vals[to_domain_col].val.string_val;
01111       s.to_domain.len= strlen(s.to_domain.s);
01112       
01113       s.from_user.s= (char*)row_vals[from_user_col].val.string_val;
01114       s.from_user.len= strlen(s.from_user.s);
01115       
01116       s.from_domain.s= (char*)row_vals[from_domain_col].val.string_val;
01117       s.from_domain.len= strlen(s.from_domain.s);
01118       
01119       s.event_id.s=(char*)row_vals[event_id_col].val.string_val;
01120       s.event_id.len= (s.event_id.s)?strlen(s.event_id.s):0;
01121       
01122       s.to_tag.s= (char*)row_vals[to_tag_col].val.string_val;
01123       s.to_tag.len= strlen(s.to_tag.s);
01124       
01125       s.from_tag.s= (char*)row_vals[from_tag_col].val.string_val; 
01126       s.from_tag.len= strlen(s.from_tag.s);
01127       
01128       s.callid.s= (char*)row_vals[callid_col].val.string_val;
01129       s.callid.len= strlen(s.callid.s);
01130       
01131       s.record_route.s=  (char*)row_vals[record_route_col].val.string_val;
01132       s.record_route.len= (s.record_route.s)?strlen(s.record_route.s):0;
01133 
01134       s.contact.s= (char*)row_vals[contact_col].val.string_val;
01135       s.contact.len= strlen(s.contact.s);
01136       
01137       s.sockinfo_str.s = (char*)row_vals[sockinfo_col].val.string_val;
01138       s.sockinfo_str.len = s.sockinfo_str.s?strlen(s.sockinfo_str.s):0;
01139 
01140       s.local_contact.s = (char*)row_vals[local_contact_col].val.string_val;
01141       s.local_contact.len = s.local_contact.s?strlen(s.local_contact.s):0;
01142       
01143       s.event= event;
01144       s.local_cseq = row_vals[cseq_col].val.int_val;
01145       s.expires = row_vals[expires_col].val.int_val -(int)time(NULL);
01146       s.version = row_vals[version_col].val.int_val;
01147 
01148       s_new= mem_copy_subs(&s, PKG_MEM_TYPE);
01149       if(s_new== NULL)
01150       {
01151          LM_ERR("while copying subs_t structure\n");
01152          goto error;
01153       }
01154       s_new->next= (*s_array);
01155       (*s_array)= s_new;
01156       printf_subs(s_new);
01157       inc++;
01158       
01159    }
01160    pa_dbf.free_result(pa_db, result);
01161    *n= inc;
01162 
01163    return 0;
01164 
01165 error:
01166    if(result)
01167       pa_dbf.free_result(pa_db, result);
01168    
01169    return -1;
01170 }
01171 
01172 int update_in_list(subs_t* s, subs_t* s_array, int new_rec_no, int n)
01173 {
01174    int i= 0;
01175    subs_t* ls;
01176 
01177    ls= s_array;
01178    
01179    while(i< new_rec_no)
01180    {
01181       i++;
01182       ls= ls->next;
01183    }
01184 
01185    for(i = 0; i< n; i++)
01186    {
01187       if(ls== NULL)
01188       {
01189          LM_ERR("wrong records count\n");
01190          return -1;
01191       }
01192       printf_subs(ls);
01193       
01194       if(ls->callid.len== s->callid.len &&
01195       strncmp(ls->callid.s, s->callid.s, s->callid.len)== 0 &&
01196       ls->to_tag.len== s->to_tag.len &&
01197       strncmp(ls->to_tag.s, s->to_tag.s, s->to_tag.len)== 0 &&
01198       ls->from_tag.len== s->from_tag.len &&
01199       strncmp(ls->from_tag.s, s->from_tag.s, s->from_tag.len)== 0 )
01200       {
01201          ls->local_cseq= s->local_cseq;
01202          ls->expires= s->expires- (int)time(NULL);
01203          ls->version= s->version;
01204          ls->status= s->status;
01205          return 1;
01206       }
01207       ls= ls->next;
01208    }
01209    return -1;
01210 }
01211 
01212 subs_t* get_subs_dialog(str* pres_uri, pres_ev_t* event, str* sender)
01213 {
01214    unsigned int hash_code;
01215    subs_t* s= NULL, *s_new;
01216    subs_t* s_array= NULL;
01217    int n= 0, i= 0;
01218    
01219    /* if fallback2db -> should take all dialogs from db
01220     * and the only those dialogs from cache with db_flag= INSERTDB_FLAG */
01221 
01222    if(fallback2db)
01223    {
01224       if(get_subs_db(pres_uri, event, sender, &s_array, &n)< 0)         
01225       {
01226          LM_ERR("getting dialogs from database\n");
01227          goto error;
01228       }
01229    }
01230    hash_code= core_hash(pres_uri, &event->name, shtable_size);
01231    
01232    lock_get(&subs_htable[hash_code].lock);
01233 
01234    s= subs_htable[hash_code].entries;
01235 
01236    while(s->next)
01237    {
01238       s= s->next;
01239    
01240       printf_subs(s);
01241       
01242       if(s->expires< (int)time(NULL))
01243       {
01244          LM_DBG("expired subs\n");
01245          continue;
01246       }
01247       
01248       if((!(s->status== ACTIVE_STATUS &&
01249             s->reason.len== 0 &&
01250          s->event== event && s->pres_uri.len== pres_uri->len &&
01251          strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0)) || 
01252          (sender && sender->len== s->contact.len && 
01253          strncmp(sender->s, s->contact.s, sender->len)== 0))
01254          continue;
01255 
01256       if(fallback2db)
01257       {
01258          if(s->db_flag== NO_UPDATEDB_FLAG)
01259          {
01260             LM_DBG("s->db_flag==NO_UPDATEDB_FLAG\n");
01261             continue;
01262          }
01263          
01264          if(s->db_flag== UPDATEDB_FLAG)
01265          {
01266             LM_DBG("s->db_flag== UPDATEDB_FLAG\n");
01267             if(n>0 && update_in_list(s, s_array, i, n)< 0)
01268             {
01269                LM_DBG("dialog not found in list fetched from database\n");
01270                /* insert record */
01271             }
01272             else
01273                continue;         
01274          }
01275       }
01276       
01277       LM_DBG("s->db_flag= INSERTDB_FLAG\n");
01278       s_new= mem_copy_subs(s, PKG_MEM_TYPE);
01279       if(s_new== NULL)
01280       {
01281          LM_ERR("copying subs_t structure\n");
01282          lock_release(&subs_htable[hash_code].lock);
01283          goto error;
01284       }
01285       s_new->expires-= (int)time(NULL);
01286       s_new->next= s_array;
01287       s_array= s_new;
01288       i++;
01289    }
01290 
01291    lock_release(&subs_htable[hash_code].lock);
01292    LM_DBG("found %d dialogs( %d in database and %d in hash_table)\n",n+i,n,i);
01293 
01294    return s_array;
01295 
01296 error:
01297    free_subs_list(s_array, PKG_MEM_TYPE, 0);
01298    return NULL;
01299    
01300 }
01301 
01302 int publ_notify(presentity_t* p, str pres_uri, str* body, str* offline_etag, str* rules_doc)
01303 {
01304    str *notify_body = NULL, *aux_body = NULL;
01305    subs_t* subs_array= NULL, *s= NULL;
01306    int ret_code= -1;
01307 
01308    subs_array= get_subs_dialog(&pres_uri, p->event , p->sender);
01309    if(subs_array == NULL)
01310    {
01311       LM_DBG("Could not find subs_dialog\n");
01312       ret_code= 0;
01313       goto done;
01314    }
01315 
01316    /* if the event does not require aggregation - we have the final body */
01317    if(p->event->agg_nbody)
01318    {  
01319       notify_body = get_p_notify_body(pres_uri, p->event , offline_etag, NULL);
01320       if(notify_body == NULL)
01321       {
01322          LM_DBG("Could not get the notify_body\n");
01323          /* goto error; */
01324       }
01325    }
01326 
01327    s= subs_array;
01328    while(s)
01329    {
01330       s->auth_rules_doc= rules_doc;
01331       if (p->event->aux_body_processing) {
01332          aux_body = p->event->aux_body_processing(s, notify_body?notify_body:body);
01333       }
01334 
01335       if(notify(s, NULL, aux_body?aux_body:(notify_body?notify_body:body), 0)< 0 )
01336       {
01337          LM_ERR("Could not send notify for %.*s\n",
01338                p->event->name.len, p->event->name.s);
01339       }
01340 
01341       if(aux_body!=NULL) {
01342          if(aux_body->s)   {
01343             p->event->aux_free_body(aux_body->s);
01344          }
01345          pkg_free(aux_body);
01346       }
01347       s= s->next;
01348    }
01349    ret_code= 0;
01350 
01351 done:
01352    free_subs_list(subs_array, PKG_MEM_TYPE, 0);
01353    
01354    if(notify_body!=NULL)
01355    {
01356       if(notify_body->s)
01357       {
01358          if(   p->event->agg_nbody== NULL && p->event->apply_auth_nbody== NULL)
01359             pkg_free(notify_body->s);
01360          else
01361             p->event->free_body(notify_body->s);
01362       }
01363       pkg_free(notify_body);
01364    }
01365    return ret_code;
01366 }  
01367 
01368 int query_db_notify(str* pres_uri, pres_ev_t* event, subs_t* watcher_subs )
01369 {
01370    subs_t* subs_array = NULL, *s= NULL;
01371    str* notify_body = NULL, *aux_body = NULL;
01372    int ret_code= -1;
01373 
01374    subs_array= get_subs_dialog(pres_uri, event , NULL);
01375    if(subs_array == NULL)
01376    {
01377       LM_DBG("Could not get subscription dialog\n");
01378       ret_code= 1;
01379       goto done;
01380    }
01381    
01382    if(event->type & PUBL_TYPE)
01383    {
01384       notify_body = get_p_notify_body(*pres_uri, event, NULL, NULL);
01385       if(notify_body == NULL)
01386       {
01387          LM_DBG("Could not get the notify_body\n");
01388          /* goto error; */
01389       }
01390    }  
01391 
01392    s= subs_array;
01393    
01394    while(s)
01395    {
01396 
01397       if (event->aux_body_processing) {
01398          aux_body = event->aux_body_processing(s, notify_body);
01399       }
01400 
01401       if(notify(s, watcher_subs, aux_body?aux_body:notify_body, 0)< 0 )
01402       {
01403          LM_ERR("Could not send notify for [event]=%.*s\n",
01404                event->name.len, event->name.s);
01405          goto done;
01406       }
01407 
01408       if(aux_body!=NULL) {
01409          if(aux_body->s)   {
01410             event->aux_free_body(aux_body->s);
01411          }
01412          pkg_free(aux_body);
01413       }
01414       s= s->next;
01415    }
01416 
01417    ret_code= 1;
01418 
01419 done:
01420    free_subs_list(subs_array, PKG_MEM_TYPE, 0);
01421    if(notify_body!=NULL)
01422    {
01423       if(notify_body->s)
01424       {
01425          if(event->type & WINFO_TYPE)
01426             pkg_free(notify_body->s);
01427          else
01428          if(event->agg_nbody== NULL && event->apply_auth_nbody== NULL)
01429             pkg_free(notify_body->s);
01430          else
01431             event->free_body(notify_body->s);
01432       }
01433       pkg_free(notify_body);
01434    }
01435 
01436    return ret_code;
01437 }
01438 
01439 int send_notify_request(subs_t* subs, subs_t * watcher_subs,
01440       str* n_body,int force_null_body)
01441 {
01442    dlg_t* td = NULL;
01443    str met = {"NOTIFY", 6};
01444    str str_hdr = {0, 0};
01445    str* notify_body = NULL;
01446    int result= 0;
01447     c_back_param *cb_param= NULL;
01448    str* final_body= NULL;
01449    
01450    LM_DBG("dialog info:\n");
01451    printf_subs(subs);
01452 
01453     /* getting the status of the subscription */
01454 
01455    if(force_null_body)
01456    {
01457       goto jump_over_body;
01458    }
01459 
01460    if(n_body!= NULL && subs->status== ACTIVE_STATUS)
01461    {
01462       if( subs->event->req_auth)
01463       {
01464          
01465          if(subs->auth_rules_doc && subs->event->apply_auth_nbody)
01466          {
01467             if(subs->event->apply_auth_nbody(n_body, subs, &notify_body)< 0)
01468             {
01469                LM_ERR("in function apply_auth_nbody\n");
01470                goto error;
01471             }
01472          }
01473          if(notify_body== NULL)
01474             notify_body= n_body;
01475       }
01476       else
01477          notify_body= n_body;
01478    }  
01479    else
01480    {  
01481       if(subs->status== TERMINATED_STATUS || 
01482             subs->status== PENDING_STATUS) 
01483       {
01484          LM_DBG("state terminated or pending- notify body NULL\n");
01485          notify_body = NULL;
01486       }
01487       else  
01488       {     
01489          if(subs->event->type & WINFO_TYPE)  
01490          {  
01491             notify_body = get_wi_notify_body(subs, watcher_subs);
01492             if(notify_body == NULL)
01493             {
01494                LM_DBG("Could not get notify_body\n");
01495                goto error;
01496             }
01497          }
01498          else
01499          {
01500             notify_body = get_p_notify_body(subs->pres_uri,
01501                subs->event, NULL, (subs->contact.s)?&subs->contact:NULL);
01502             if(notify_body == NULL || notify_body->s== NULL)
01503             {
01504                LM_DBG("Could not get the notify_body\n");
01505             }
01506             else     /* apply authorization rules if exists */
01507             if(subs->event->req_auth)
01508             {
01509                 
01510                if(subs->auth_rules_doc && subs->event->apply_auth_nbody
01511                      && subs->event->apply_auth_nbody(notify_body,
01512                         subs,&final_body)<0)
01513                {
01514                   LM_ERR("in function apply_auth\n");
01515                   goto error;
01516                }
01517                if(final_body)
01518                {
01519                   xmlFree(notify_body->s);
01520                   pkg_free(notify_body);
01521                   notify_body= final_body;
01522                }
01523             }
01524          }
01525       }
01526    }
01527    
01528 jump_over_body:
01529 
01530    if(subs->expires<= 0)
01531    {
01532         subs->expires= 0;
01533       subs->status= TERMINATED_STATUS;
01534       subs->reason.s= "timeout";
01535       subs->reason.len= 7;
01536    }
01537 
01538    /* build extra headers */
01539    if( build_str_hdr( subs, notify_body?1:0, &str_hdr)< 0 )
01540    {
01541       LM_ERR("while building headers\n");
01542       goto error;
01543    }  
01544    LM_DBG("headers:\n%.*s\n", str_hdr.len, str_hdr.s);
01545 
01546    /* construct the dlg_t structure */
01547    td = build_dlg_t(subs);
01548    if(td ==NULL)
01549    {
01550       LM_ERR("while building dlg_t structure\n");
01551       goto error; 
01552    }
01553 
01554    cb_param = shm_dup_cbparam(subs);
01555    if(cb_param == NULL)
01556    {
01557       LM_ERR("while duplicating cb_param in share memory\n");
01558       goto error; 
01559    }  
01560 
01561    result = tmb.t_request_within
01562       (&met,              /* method*/
01563       &str_hdr,           /* extra headers*/
01564       notify_body,        /* body*/
01565       td,                 /* dialog structure*/
01566       p_tm_callback,      /* callback function*/
01567       (void*)cb_param);   /* callback parameter*/
01568 
01569    if(result< 0)
01570    {
01571       LM_ERR("in function tmb.t_request_within\n");
01572       free_cbparam(cb_param);
01573       goto error; 
01574    }
01575 
01576    LM_INFO("NOTIFY %.*s via %.*s on behalf of %.*s for event %.*s\n",
01577       td->rem_uri.len, td->rem_uri.s, td->hooks.next_hop->len,
01578       td->hooks.next_hop->s,
01579       td->loc_uri.len, td->loc_uri.s, subs->event->name.len,
01580       subs->event->name.s);
01581 
01582    free_tm_dlg(td);
01583    
01584    if(str_hdr.s) pkg_free(str_hdr.s);
01585    
01586    if((int)(long)n_body!= (int)(long)notify_body)
01587    {
01588       if(notify_body!=NULL)
01589       {
01590          if(notify_body->s!=NULL)
01591          {
01592             if(subs->event->type& WINFO_TYPE )
01593                xmlFree(notify_body->s);
01594             else
01595             if(subs->event->apply_auth_nbody== NULL
01596                   && subs->event->agg_nbody== NULL)
01597                pkg_free(notify_body->s);
01598             else
01599             subs->event->free_body(notify_body->s);
01600          }
01601          pkg_free(notify_body);
01602       }
01603    }  
01604    return 0;
01605 
01606 error:
01607    free_tm_dlg(td);
01608    if(str_hdr.s!=NULL)
01609       pkg_free(str_hdr.s);
01610    if((int)(long)n_body!= (int)(long)notify_body)
01611    {
01612       if(notify_body!=NULL)
01613       {
01614          if(notify_body->s!=NULL)
01615          {
01616             if(subs->event->type& WINFO_TYPE)
01617                xmlFree(notify_body->s);
01618             else
01619             if(subs->event->apply_auth_nbody== NULL
01620                   && subs->event->agg_nbody== NULL)
01621                pkg_free(notify_body->s);
01622             else
01623             subs->event->free_body(notify_body->s);
01624          }
01625          pkg_free(notify_body);
01626       }
01627    }  
01628    return -1;
01629 }
01630 
01631 
01632 int notify(subs_t* subs, subs_t * watcher_subs,str* n_body,int force_null_body)
01633 {
01634    /* update first in hash table and the send Notify */
01635    if(subs->expires!= 0 && subs->status != TERMINATED_STATUS)
01636    {
01637       unsigned int hash_code;
01638       hash_code= core_hash(&subs->pres_uri, &subs->event->name, shtable_size);
01639 
01640       if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE)< 0)
01641       {
01642          if(subs->db_flag!= INSERTDB_FLAG && fallback2db)
01643          {
01644             LM_DBG("record not found in subs htable\n");
01645             if(update_subs_db(subs, LOCAL_TYPE)< 0)
01646             {
01647                LM_ERR("updating subscription in database\n");
01648                return -1;
01649             }
01650          }
01651          else
01652          {
01653             LM_ERR("record not found in subs htable\n");
01654             return -1;
01655          }
01656       }
01657    }
01658      
01659     if(subs->reason.s && subs->status== ACTIVE_STATUS && 
01660         subs->reason.len== 12 && strncmp(subs->reason.s, "polite-block", 12)== 0)
01661     {
01662         force_null_body = 1;
01663     }
01664 
01665    if(send_notify_request(subs, watcher_subs, n_body, force_null_body)< 0)
01666    {
01667       LM_ERR("sending Notify not successful\n");
01668       return -1;
01669    }
01670    return 0;
01671 }
01672 
01673 void p_tm_callback( struct cell *t, int type, struct tmcb_params *ps)
01674 {
01675    if(ps->param==NULL || *ps->param==NULL || 
01676          ((c_back_param*)(*ps->param))->pres_uri.s == NULL || 
01677          ((c_back_param*)(*ps->param))->ev_name.s== NULL ||
01678          ((c_back_param*)(*ps->param))->to_tag.s== NULL)
01679    {
01680       LM_DBG("message id not received\n");
01681       if(*ps->param !=NULL  )
01682          free_cbparam((c_back_param*)(*ps->param));
01683       return;
01684    }
01685    
01686    LM_DBG("completed with status %d [to_tag:%.*s]\n",
01687          ps->code,((c_back_param*)(*ps->param))->to_tag.len,
01688          ((c_back_param*)(*ps->param))->to_tag.s);
01689 
01690    if(ps->code >= 300)
01691    {
01692       unsigned int hash_code;
01693 
01694       c_back_param*  cb= (c_back_param*)(*ps->param);
01695 
01696       hash_code= core_hash(&cb->pres_uri, &cb->ev_name, shtable_size);
01697       delete_shtable(subs_htable, hash_code, cb->to_tag);
01698 
01699       delete_db_subs(cb->pres_uri, cb->ev_name, cb->to_tag);
01700    }  
01701 
01702    if(*ps->param !=NULL  )
01703       free_cbparam((c_back_param*)(*ps->param));
01704    return ;
01705 }
01706 
01707 void free_cbparam(c_back_param* cb_param)
01708 {
01709    if(cb_param!= NULL)
01710       shm_free(cb_param);
01711 }
01712 
01713 c_back_param* shm_dup_cbparam(subs_t* subs)
01714 {
01715    int size;
01716    c_back_param* cb_param = NULL;
01717    
01718    size = sizeof(c_back_param) + subs->pres_uri.len+
01719          subs->event->name.len + subs->to_tag.len;
01720 
01721    cb_param= (c_back_param*)shm_malloc(size);
01722    LM_DBG("=== %d/%d/%d\n", subs->pres_uri.len,
01723          subs->event->name.len, subs->to_tag.len);
01724    if(cb_param==NULL)
01725    {
01726       LM_ERR("no more shared memory\n");
01727       return NULL;
01728    }
01729    memset(cb_param, 0, size);
01730 
01731    cb_param->pres_uri.s = (char*)cb_param + sizeof(c_back_param);
01732    memcpy(cb_param->pres_uri.s, subs->pres_uri.s, subs->pres_uri.len);
01733    cb_param->pres_uri.len = subs->pres_uri.len;
01734    cb_param->ev_name.s = (char*)(cb_param->pres_uri.s) + cb_param->pres_uri.len;
01735    memcpy(cb_param->ev_name.s, subs->event->name.s, subs->event->name.len);
01736    cb_param->ev_name.len = subs->event->name.len;
01737    cb_param->to_tag.s = (char*)(cb_param->ev_name.s) + cb_param->ev_name.len;
01738    memcpy(cb_param->to_tag.s, subs->to_tag.s, subs->to_tag.len);
01739    cb_param->to_tag.len = subs->to_tag.len;
01740 
01741    return cb_param;
01742 }
01743 
01744 
01745 str* create_winfo_xml(watcher_t* watchers, char* version,
01746       str resource, str event, int STATE_FLAG)
01747 {
01748    xmlDocPtr doc = NULL;       
01749     xmlNodePtr root_node = NULL, node = NULL;
01750    xmlNodePtr w_list_node = NULL;   
01751    char content[200];
01752    str *body= NULL;
01753    char* res= NULL;
01754    watcher_t* w;
01755 
01756     LIBXML_TEST_VERSION;
01757     
01758    doc = xmlNewDoc(BAD_CAST "1.0");
01759     root_node = xmlNewNode(NULL, BAD_CAST "watcherinfo");
01760     xmlDocSetRootElement(doc, root_node);
01761 
01762     xmlNewProp(root_node, BAD_CAST "xmlns",
01763          BAD_CAST "urn:ietf:params:xml:ns:watcherinfo");
01764     xmlNewProp(root_node, BAD_CAST "version", BAD_CAST version );
01765    
01766    if(STATE_FLAG & FULL_STATE_FLAG)
01767    {
01768       if( xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "full") == NULL)
01769       {
01770          LM_ERR("while adding new attribute\n");
01771          goto error;
01772       }
01773    }
01774    else  
01775    {  
01776       if( xmlNewProp(root_node, BAD_CAST "state", 
01777                BAD_CAST "partial")== NULL) 
01778       {
01779          LM_ERR("while adding new attribute\n");
01780          goto error;
01781       }
01782    }
01783 
01784    w_list_node =xmlNewChild(root_node, NULL, BAD_CAST "watcher-list",NULL);
01785    if( w_list_node == NULL)
01786    {
01787       LM_ERR("while adding child\n");
01788       goto error;
01789    }
01790    res= (char*)pkg_malloc((resource.len>event.len)?resource.len:event.len
01791          + 1);
01792    if(res== NULL)
01793    {
01794       ERR_MEM(PKG_MEM_STR);
01795    }
01796    memcpy(res, resource.s, resource.len);
01797    res[resource.len]= '\0';
01798    xmlNewProp(w_list_node, BAD_CAST "resource", BAD_CAST res);
01799    memcpy(res, event.s, event.len);
01800    res[event.len]= '\0';
01801    xmlNewProp(w_list_node, BAD_CAST "package", BAD_CAST res);
01802    pkg_free(res);
01803 
01804 
01805    w= watchers->next;
01806    while(w)
01807    {
01808       strncpy( content,w->uri.s, w->uri.len);
01809       content[ w->uri.len ]='\0';
01810       node = xmlNewChild(w_list_node, NULL, BAD_CAST "watcher",
01811             BAD_CAST content) ;
01812       if( node ==NULL)
01813       {
01814          LM_ERR("while adding child\n");
01815          goto error;
01816       }
01817       if(xmlNewProp(node, BAD_CAST "id", BAD_CAST w->id.s)== NULL)
01818       {
01819          LM_ERR("while adding new attribute\n");
01820          goto error;
01821       }  
01822       
01823       if(xmlNewProp(node, BAD_CAST "event", BAD_CAST "subscribe")== NULL)
01824       {
01825          LM_ERR("while adding new attribute\n");
01826          goto error;
01827       }  
01828       
01829       if(xmlNewProp(node, BAD_CAST "status", 
01830                BAD_CAST get_status_str(w->status))== NULL)
01831       {
01832          LM_ERR("while adding new attribute\n");
01833          goto error;
01834       }
01835       w= w->next;
01836    }
01837     body = (str*)pkg_malloc(sizeof(str));
01838    if(body == NULL)
01839    {
01840       ERR_MEM(PKG_MEM_STR);   
01841    }
01842    memset(body, 0, sizeof(str));
01843 
01844    xmlDocDumpFormatMemory(doc,(xmlChar**)(void*)&body->s, &body->len, 1);
01845 
01846    xmlFreeDoc(doc);
01847 
01848    xmlCleanupParser();
01849 
01850     xmlMemoryDump();
01851 
01852     return body;
01853 
01854 error:
01855     if(doc)
01856       xmlFreeDoc(doc);
01857    return NULL;
01858 }
01859 
01860 int watcher_found_in_list(watcher_t * watchers, str wuri)
01861 {
01862    watcher_t * w;
01863 
01864    w = watchers->next;
01865 
01866    while(w)
01867    {
01868       if(w->uri.len == wuri.len && strncmp(w->uri.s, wuri.s, wuri.len)== 0)
01869          return 1;
01870       w= w->next;
01871    }
01872 
01873    return 0;
01874 }
01875 
01876 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event)
01877 {
01878    watcher_t * w;
01879    db_key_t query_cols[3];
01880    db_val_t query_vals[3];
01881    db_key_t result_cols[2];
01882    db_res_t *result = NULL;
01883    db_row_t *row= NULL ;   
01884    db_val_t *row_vals;
01885    int n_result_cols = 0;
01886    int n_query_cols = 0;
01887    int wuser_col, wdomain_col;
01888    str wuser, wdomain, wuri;
01889    int i;
01890 
01891    /* select from watchers table the users that have subscribed
01892     * to the presentity and have status pending */
01893 
01894    query_cols[n_query_cols] = &str_presentity_uri_col;
01895    query_vals[n_query_cols].type = DB_STR;
01896    query_vals[n_query_cols].nul = 0;
01897    query_vals[n_query_cols].val.str_val = pres_uri;
01898    n_query_cols++;
01899 
01900    query_cols[n_query_cols] = &str_event_col;
01901    query_vals[n_query_cols].type = DB_STR;
01902    query_vals[n_query_cols].nul = 0;
01903    query_vals[n_query_cols].val.str_val = event;
01904    n_query_cols++;
01905 
01906    query_cols[n_query_cols] = &str_status_col;
01907    query_vals[n_query_cols].type = DB_INT;
01908    query_vals[n_query_cols].nul = 0;
01909    query_vals[n_query_cols].val.int_val = PENDING_STATUS;
01910    n_query_cols++;
01911 
01912    result_cols[wuser_col=n_result_cols++] = &str_watcher_username_col;
01913    result_cols[wdomain_col=n_result_cols++] = &str_watcher_domain_col;
01914    
01915    if (pa_dbf.use_table(pa_db, &watchers_table) < 0) 
01916    {
01917       LM_ERR("sql use table 'watchers_table' failed\n");
01918       return -1;
01919    }
01920 
01921    if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
01922        result_cols, n_query_cols, n_result_cols, 0, &result) < 0) 
01923    {
01924       LM_ERR("failed to query %.*s table\n",
01925             watchers_table.len, watchers_table.s);
01926       if(result)
01927          pa_dbf.free_result(pa_db, result);
01928       return -1;
01929    }
01930    
01931    if(result== NULL)
01932    {
01933       LM_ERR("mysql query failed - null result\n");
01934       return -1;
01935    }
01936 
01937    if (result->n<=0 )
01938    {
01939       LM_DBG("The query returned no result\n");
01940       pa_dbf.free_result(pa_db, result);
01941       return 0;
01942    }
01943 
01944    for(i=0; i< result->n; i++)
01945    {
01946       row = &result->rows[i];
01947       row_vals = ROW_VALUES(row);
01948 
01949       wuser.s = (char*)row_vals[wuser_col].val.string_val;
01950       wuser.len = strlen(wuser.s);
01951 
01952       wdomain.s = (char*)row_vals[wdomain_col].val.string_val;
01953       wdomain.len = strlen(wdomain.s);
01954 
01955       if(uandd_to_uri(wuser, wdomain, &wuri)<0)
01956       {
01957          LM_ERR("creating uri from username and domain\n");
01958          goto error;
01959       }
01960 
01961       if(watcher_found_in_list(watchers, wuri))
01962       {
01963          pkg_free(wuri.s);
01964          continue;
01965       }
01966       
01967       w= (watcher_t*)pkg_malloc(sizeof(watcher_t));
01968       if(w== NULL)
01969       {
01970          ERR_MEM(PKG_MEM_STR);
01971       }
01972       memset(w, 0, sizeof(watcher_t));
01973 
01974       w->status= WAITING_STATUS;
01975       w->uri = wuri;
01976       w->id.s = (char*)pkg_malloc(w->uri.len*2 +1);
01977       if(w->id.s== NULL)
01978       {
01979          pkg_free(w->uri.s);
01980          pkg_free(w);
01981          ERR_MEM(PKG_MEM_STR);
01982       }
01983 
01984       to64frombits((unsigned char *)w->id.s,
01985          (const unsigned char*)w->uri.s, w->uri.len);
01986       w->id.len = strlen(w->id.s);
01987       w->event= event;
01988    
01989       w->next= watchers->next;
01990       watchers->next= w;
01991 
01992    }
01993 
01994    pa_dbf.free_result(pa_db, result);
01995    return 0;
01996 
01997 error:
01998    if(result)
01999       pa_dbf.free_result(pa_db, result);
02000    return -1;
02001 
02002 }
02003 

Generated on Thu May 24 08:00:51 2012 for Kamailio - The Open Source SIP Server by  doxygen 1.5.6