You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

564 lines
17 KiB
Go

3 years ago
package icssessionmanager
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"strings"
"sync"
"time"
"gitlab.com/ics_cinnamon/voicegateway/icscbtimer"
"gitlab.com/ics_cinnamon/voicegateway/icsconf"
"gitlab.com/ics_cinnamon/voicegateway/icserror"
"gitlab.com/ics_cinnamon/voicegateway/icsevent"
"gitlab.com/ics_cinnamon/voicegateway/icslog"
"gitlab.com/ics_cinnamon/voicegateway/icsnet"
"gitlab.com/ics_cinnamon/voicegateway/icspacketparser"
"gitlab.com/ics_cinnamon/voicegateway/icssvc"
"gitlab.com/ics_cinnamon/voicegateway/recorddata"
)
// SessionManager owns the SIP listener and the pool of call sessions, and
// hands incoming SIP messages to the session that should process them.
type SessionManager struct {
	// evtSystem holds the event systems returned by icsevent.GetEvtSystemInstance.
	evtSystem []*icsevent.EventSystem
	// sessions is the session pool returned by getSessionInstance.
	sessions []*IcsSession
	// event is the event handler created with icsevent.NewEventH.
	event *icsevent.EventH

	// SIPNeter is the SIP listener (UDP or TCP, selected from configuration).
	SIPNeter icsnet.IcsNeter

	// CommNet is the bot-command TCP listener; RunBotCommandMNG assigns it.
	CommNet *icsnet.IcsTCPNet
}
// Package-level state shared by the session manager.
var (
	// bigSession is the process-wide session pool; getSessionInstance builds it once.
	bigSession []*IcsSession
	// onceSession guards the one-time construction of bigSession.
	onceSession sync.Once
	// channelNum is the number of session channels; NewSessionManager sets it
	// from the service configuration.
	channelNum int
)
// getSessionInstance returns the process-wide session pool, building it on the
// first call (guarded by onceSession). The pool holds channelNum sessions;
// session i takes its agent name and its REGISTER/OPTIONS callback timers from
// conf.AgentConfig[i] when that entry exists.
//
// SIPNeter is stored on every session so that all sessions share the manager's
// SIP connection. Only the argument of the first call is used; later calls
// return the existing pool unchanged.
func getSessionInstance(SIPNeter *icsnet.IcsNeter) []*IcsSession {
	onceSession.Do(func() {
		bigSession = make([]*IcsSession, channelNum)
		svc := icssvc.GetServiceStatus()
		conf := svc.GetIcsConfig()
		agentNum := len(conf.AgentConfig)

		for iter := 0; iter < channelNum; iter++ {
			session := new(IcsSession)
			bigSession[iter] = session

			session.m = &sync.Mutex{}
			session.ID = iter
			session.isStart = false
			session.Cseq = 0
			session.registerStatus = STATUS_REGISTER_NOT_READY
			session.agentStatus = STATUS_AGENT_NOT_READY
			session.simLoopCount = 1

			// Channels beyond the configured agents have no AgentConfig entry.
			// Every access to conf.AgentConfig[iter] must stay inside this
			// guard, otherwise it panics with an index out of range; such
			// channels keep empty agent names and no callback timers.
			if iter < agentNum {
				session.AgentName = conf.AgentConfig[iter].Name
				session.AgentName2 = fmt.Sprintf("agent%s", conf.AgentConfig[iter].Name)

				if !strings.Contains(strings.ToUpper(conf.AgentConfig[iter].Value), "TRUE") {
					// Agent value is not TRUE: mark the session as having no RTP port.
					session.RTPPort = -1
				}
				/*
					Disabled: RTP sender setup for agents whose value is TRUE.

					//set rtp send callback
					session.RTPPort = conf.AgentConfig[iter].MediaConfig.Port
					session.RTPSenderCallBackTimer =
						icscbtimer.NewCBTimer(
							time.Millisecond*20,
							session.SendRTPCB)
					//set session's agentInfo conf
					agentInfo := session.FindAgentInfo(session.AgentName)
					if agentInfo != nil {
						session.SetAgentInfo(agentInfo)
					}
				*/

				// REGISTER callback timer, fired every RegisterExpire seconds.
				if strings.Contains(strings.ToUpper(conf.AgentConfig[iter].RegisterConfig.RegisterValue), "TRUE") {
					regiExpire := conf.AgentConfig[iter].RegisterConfig.RegisterExpire
					session.RegisterCallBackTimer =
						icscbtimer.NewCBTimer(
							time.Millisecond*1000*time.Duration(regiExpire),
							session.RequestRegisterCB)
				} else {
					// Registration is switched off: start the session as
					// already registered and ready.
					session.registerStatus = STATUS_REGISTER_REGISTERED
					session.agentStatus = STATUS_AGENT_READY
					session.MethodAutomata = 32767
				}

				// OPTIONS callback timer, fired every OptionsInterval seconds.
				if strings.Contains(strings.ToUpper(conf.AgentConfig[iter].OptionsConfig.OptionsValue), "TRUE") {
					interval := conf.AgentConfig[iter].OptionsConfig.OptionsInterval
					session.OPTIONSCallBackTimer =
						icscbtimer.NewCBTimer(
							time.Millisecond*1000*time.Duration(interval),
							session.RequestOptionsCB)
				}
				/*
					Disabled: INVITE callback timer and session Init.

					//set invite callback timer
					session.INVITECallBackTimer =
						icscbtimer.NewCBTimer(
							time.Millisecond*1000*time.Duration(1),
							session.RequestInviteCB)
					if err := session.Init(); err != nil {
						l.Printf(icslog.LOG_LEVEL_FATAL, -1, "Session Init error. Voice Agent EXIT!! - %s", err.GetMessage())
						os.Exit(0)
					}
				*/
			}

			session.eventSystem = icsevent.GetMyEventSystem(iter)
			// Share the SIP connection with every session.
			session.sipNeter = SIPNeter
		}
	})
	return bigSession
}
// NewSessionManager creates the session manager: it publishes the configured
// channel count, opens the SIP listener on 0.0.0.0:<SIP port> using the
// configured transport (UDP or TCP), and attaches the shared session pool and
// an event handler.
//
// It returns nil when the transport is not supported or listening fails; the
// reason is logged at FATAL level.
//
// TODO: make this function a singleton.
func NewSessionManager() *SessionManager {
	logger := icslog.GetIcsLog()
	cfg := icsconf.GetIcsConfig()
	svc := icssvc.NewService()
	channelNum = svc.GetIcsConfig().GetChannelNum()
	//icsevent.SetConfig(svc.GetIcsConfig())

	icsevent.SetChannelNum(channelNum)
	manager := new(SessionManager)
	manager.evtSystem = icsevent.GetEvtSystemInstance()

	// Start the SIP listener.
	port := cfg.SIPConfig.Port
	proto := cfg.SIPConfig.Transport
	proxyAddr := icsnet.NewNetAddrWithIPAddr(cfg.SIPConfig.SIPProxy)
	bindAddr := icsnet.NewNetAddrWithIPAddr(fmt.Sprintf("0.0.0.0:%d", port))

	if upper := strings.ToUpper(proto); upper == "UDP" {
		manager.SIPNeter = icsnet.NewUDP(&bindAddr, &proxyAddr)
	} else if upper == "TCP" {
		manager.SIPNeter = icsnet.NewTCP(&bindAddr, &proxyAddr)
	} else {
		logger.Print(icslog.LOG_LEVEL_FATAL, -1, "No compatible transport. Check the configuration")
		return nil
	}
	/*
		cerr := sm.SIPNeter.Connect()
		if cerr != nil {
			l.Print(icslog.LOG_LEVEL_FATAL, -1, cerr.GetMessage())
			return nil
		}
	*/

	if lerr := manager.SIPNeter.Listen(); lerr != nil {
		logger.Printf(icslog.LOG_LEVEL_FATAL, -1, "Listening failed: %s", lerr.GetError())
		return nil
	}
	logger.Printf(icslog.LOG_LEVEL_INFO, -1, "Listening SIP %s port[%d]", proto, port)

	/*
		Disabled: bot-command listening (see RunBotCommandMNG).

		//start bot-command listen
		commandValue := strings.ToUpper(conf.CommandConfig.Value)
		commandPort := conf.CommandConfig.Port
		commandTransport := conf.CommandConfig.Transport
		if strings.Compare("TRUE", commandValue) == 0 {
			commandLocalAddrStr := fmt.Sprintf("0.0.0.0:%d", commandPort)
			commandLocalAddr := icsnet.NewNetAddrWithIPAddr(commandLocalAddrStr)
			switch strings.ToUpper(commandTransport) {
			case "UDP":
				sm.CommNeter = icsnet.NewUDP(&commandLocalAddr, nil)
			case "TCP":
				sm.CommNeter = icsnet.NewTCP(&commandLocalAddr, nil)
			default:
				l.Print(icslog.LOG_LEVEL_FATAL, -1, "Bot-Command listening error. No compatible transport. Check the configuration")
				return nil
			}
			lerr = sm.CommNeter.Listen()
			if lerr != nil {
				l.Printf(icslog.LOG_LEVEL_FATAL, -1, "%s", lerr.GetError())
				return nil
			}
			l.Printf(icslog.LOG_LEVEL_INFO, -1, "Listening Bot-Command %s port[%d]", commandTransport, commandPort)
		}
	*/

	// Attach the shared session pool; each session keeps a pointer to the
	// manager's SIP listener.
	manager.sessions = getSessionInstance(&manager.SIPNeter)

	// EventH. Create Event Array
	manager.event = icsevent.NewEventH()
	return manager
}
// Load initialises the event handler and then starts one goroutine per pooled
// session, each executing that session's Run method.
func (sm *SessionManager) Load() {
	sm.event.Init()

	for idx := range sm.sessions {
		go sm.sessions[idx].Run()
	}
}
// Close removes every session held by the manager and then closes the SIP
// listener.
//
// The loop ranges over sm.sessions rather than counting up to the global
// channelNum: channelNum is reassigned by every NewSessionManager call while
// the pool is only built once, so the two can disagree and an index-based loop
// could run past the end of the slice.
func (sm *SessionManager) Close() {
	for _, session := range sm.sessions {
		if session == nil {
			continue
		}
		// Best effort: the result of RemoveSession is not inspected here.
		session.RemoveSession()
	}
	sm.SIPNeter.Close()
}
// Run is the SIP receive loop. For every packet read from the SIP listener it
// parses the message and then:
//
//   - drops it when no SIP method is recognised;
//   - only logs it when SessionAvailableSipMethod reports that the method is
//     not session-bound (responses to OPTIONS are not logged);
//   - otherwise finds the matching session, or allocates one for the agent
//     named in the To header, and posts the message to it as an event.
//
// Run returns the error of a failed read or a failed event allocation, and nil
// once the loop condition no longer holds.
func (sm *SessionManager) Run() (icserr *icserror.IcsError) {
	l := icslog.GetIcsLog()
	conf := icsconf.GetIcsConfig()
	svc := icssvc.GetServiceStatus()

	// NOTE(review): with `||` the loop only ends when BOTH GetExit and GetStop
	// report true — confirm that `&&` was not intended.
	for !svc.GetExit() || !svc.GetStop() {
		data, addr, n, rerr := sm.SIPNeter.ReadSIP()
		if rerr != nil {
			return rerr
		}
		l.Printf(icslog.LOG_LEVEL_INFO, -1, "Recved Data[%d] [%s]->[%s]>\n%s",
			n,
			addr.String(),
			sm.SIPNeter.LocalAddr().String(),
			string(data[:n]))

		sip := icspacketparser.NewSIP()
		sip.SipParser(data)
		if icspacketparser.ICS_SIP_METHOD_NOT_FOUND == sip.Method {
			// Not a SIP message we understand; possibly abnormal traffic.
			l.Printf(icslog.LOG_LEVEL_INFO, -1, "icspacketparser.ICS_SIP_METHOD_NOT_FOUND-%s", string(data))
			continue
		}

		if !SessionAvailableSipMethod(&sip) {
			// No session involved: log the message, except responses to
			// OPTIONS, which are left out of the log.
			isOptionsResponse := sip.Method == icspacketparser.ICS_SIP_METHOD_SIP20 &&
				strings.Contains(strings.ToUpper(sip.Cseq), "OPTIONS")
			if !isOptionsResponse {
				l.Printf(icslog.LOG_LEVEL_INFO, -1, "%v", sip)
			}
			continue
		}

		l.Printf(icslog.LOG_LEVEL_DEBUG, -1, "Session Processing Method-[%s, %s, %s]",
			sip.Method, sip.ResType, sip.Cseq)

		s, serr := FindSession(sip)
		if serr != nil {
			// No session for this call yet: allocate one for the agent named
			// in the To header, i.e. the text between the first ':' and the
			// first '@'. The header comes from the network, so a value without
			// ':' must be rejected instead of indexed blindly.
			beforeAt := strings.SplitN(sip.To, "@", 2)[0]
			parts := strings.SplitN(beforeAt, ":", 2)
			if len(parts) < 2 {
				l.Printf(icslog.LOG_LEVEL_ERROR, -1, "Invalid To header[%s]. Call ID[%s]", sip.To, sip.GetCallID())
				continue
			}
			agentname := parts[1]

			s, serr = AllocSession(agentname, sip.GetCallID())
			if serr != nil {
				l.Printf(icslog.LOG_LEVEL_ERROR, -1, "Licensed Session number[%d:%s] is EXCEEDED-%s",
					conf.GetChannelNum(), agentname, serr.GetMessage())
				continue
			}
			l.Printf(icslog.LOG_LEVEL_INFO, -1, "Session Allocated. Session ID[%d] Call ID[%s]", s.ID, sip.GetCallID())
			s.SetSessionMethod(sip)
			//session start
			s.Start()
		} else {
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Session found [%s][%s]", sip.GetCallID(), s.callID)
			if sip.Method == icspacketparser.ICS_SIP_METHOD_INVITE {
				s.SetCallID(sip.GetCallID())
			}

			if !s.CheckAutomata(&sip) {
				l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Not Allowed Method(%d)", s.MethodAutomata)
				//TODO : response 400 error
				continue
			}
			s.SetSessionMethod(sip)

			// An ACK while the session is in the CANCEL state ends the call:
			// stop and remove the session instead of posting an event.
			if s.MethodAutomata == ICS_SIP_AUTOMATA_CANCEL && sip.Method == icspacketparser.ICS_SIP_METHOD_ACK {
				s.Stop()
				if rmErr := s.RemoveSession(); rmErr != nil {
					l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "RemoveSession Error %s", rmErr.GetMessage())
				} else {
					l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Session Removed")
				}
				continue
			}
		}

		// Post the SIP message to the session as an event.
		h := icsevent.NewEventH()
		evt, evtErr := h.AllocEvent(sip)
		if evtErr != nil {
			return evtErr
		}
		if perr := h.PostEvent(s.ID, evt); perr == nil {
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Post SIP event[%d] to Session[%03d]", evt.ID, s.ID)
		} else {
			l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "Failed to post SIP event[%d] to Session[%03d] - %s",
				evt.ID, s.ID, perr.GetError())
		}
	}
	return nil
}
// RunBotCommandMNG starts the bot-command manager: a TCP listener on
// 0.0.0.0:<CommandConfig.Port> whose connections are served by BotCommand,
// with "\r\n\r\n" passed as the read delimiter. The listener is stored in
// sm.CommNet.
//
// It returns the listen error when the listener cannot be started (the error
// is also logged at FATAL level), so the caller can tell that bot commands are
// unavailable; it returns nil on success.
func (sm *SessionManager) RunBotCommandMNG() (icserr *icserror.IcsError) {
	l := icslog.GetIcsLog()
	conf := icsconf.GetIcsConfig()

	//start bot-command listen
	commandPort := conf.CommandConfig.Port
	commandTransport := conf.CommandConfig.Transport
	commandLocalAddr := icsnet.NewNetAddrWithIPAddr(fmt.Sprintf("0.0.0.0:%d", commandPort))

	var cmdErr *icserror.IcsError
	sm.CommNet, cmdErr = icsnet.ListenAndServeTCP(&commandLocalAddr, nil, "\r\n\r\n", BotCommand)
	if cmdErr != nil {
		l.Printf(icslog.LOG_LEVEL_FATAL, -1, "%s", cmdErr.GetError())
		return cmdErr
	}
	l.Printf(icslog.LOG_LEVEL_INFO, -1, "Listening Bot-Command %s port[%d]", commandTransport, commandPort)
	return nil
}
// BotCommand serves one bot-command TCP connection until a read fails, the
// peer closes it, or a command cannot be handled; the connection is closed on
// return.
//
// Each command starts with a header obtained from t.ReadS(86, bufend) and
// decoded as:
//
//	bytes  0-3   command code (little-endian uint32)
//	bytes  4-7   second uint32 field (only logged)
//	bytes  8-15  payload length in bytes (little-endian uint64)
//	bytes 16-    NUL-terminated agent name
//	bytes 36-    NUL-terminated telephone number
//
// The agent name selects the session via findSessionWithAgentName2. TTS, BYE,
// DTMF and REFER commands read the payload, install it as the session's TTS
// buffer and restart the RTP sender timer; BYE and REFER additionally send the
// corresponding SIP request.
//
// The header arrives from the network, so a short header, a missing NUL
// terminator or a payload length that does not fit in an int is handled
// explicitly instead of panicking.
func BotCommand(t *icsnet.IcsTCPNet, bufend string) {
	const (
		agentNameOffset = 16
		telNoOffset     = 36
	)

	l := icslog.GetIcsLog()
	defer t.Close()

	// cString returns field up to (not including) its first NUL byte, or the
	// whole field when no terminator is present.
	cString := func(field []byte) string {
		if end := bytes.IndexByte(field, 0); end >= 0 {
			return string(field[:end])
		}
		return string(field)
	}

	for {
		ttsHeader, rlen, rerr := t.ReadS(86, bufend)
		if rerr != nil {
			if rerr.GetError() != io.EOF {
				l.Printf(icslog.LOG_LEVEL_ERROR, -1, "[Bot Command] ReadS Error! - %s[%d:%d]",
					rerr.GetError(), rlen, len(ttsHeader))
			}
			break
		}
		l.Printf(icslog.LOG_LEVEL_INFO, -1, "Recved Bot Command(%s)", ttsHeader)

		// Every fixed field must be present before any of them is sliced.
		if len(ttsHeader) < telNoOffset {
			l.Printf(icslog.LOG_LEVEL_ERROR, -1, "[Bot Command] Header too short(%d)", len(ttsHeader))
			return
		}
		ttscmd := binary.LittleEndian.Uint32(ttsHeader[0:])
		ttsrc := binary.LittleEndian.Uint32(ttsHeader[4:])
		ttspl := binary.LittleEndian.Uint64(ttsHeader[8:])
		agentName := cString(ttsHeader[agentNameOffset:])
		telNo := cString(ttsHeader[telNoOffset:])

		s := findSessionWithAgentName2(agentName)
		if s == nil {
			l.Printf(icslog.LOG_LEVEL_ERROR, -1, "Not found session - %s", agentName)
			return
		}
		l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Bot Command(%d) - [%s:%d]", rlen, agentName, ttscmd)

		// A length that turns negative as an int would make the buffer
		// allocation below panic.
		payloadLen := int(ttspl)
		if payloadLen < 0 {
			l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "Invalid TTS payload length(%d)", ttspl)
			return
		}

		switch ttscmd {
		case recorddata.TTS_COMMAND:
			l.Print(icslog.LOG_LEVEL_INFO, s.ID, "Recved TTS command")
			//recv tts data
			tts, n, err := t.Read(payloadLen)
			if err != nil {
				l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "Failed to recv TTS(%d,%s)", n, err.GetError())
				return
			}
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Recved TTS data. Length: %d", len(tts))

			// Swap the TTS buffer while the RTP sender timer is stopped.
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Started RTP Callback timer %+v", s.RTPSenderCallBackTimer)
			s.RTPSenderCallBackTimer.Stop()
			s.m.Lock()
			s.tts = make([]byte, payloadLen)
			copy(s.tts, tts)
			s.m.Unlock()
			s.RTPSenderCallBackTimer.Start()

		case recorddata.BYE_COMMAND:
			l.Print(icslog.LOG_LEVEL_INFO, s.ID, "Recved BYE command")
			if payloadLen > 0 {
				//recv tts data
				tts, n, err := t.Read(payloadLen)
				if err != nil {
					l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "Failed to recv TTS(%d,%s)", n, err.GetError())
					return
				}
				l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Read T: %v", t)
				l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Recved TTS data. Length: %d", len(tts))

				// Swap the TTS buffer while the RTP sender timer is stopped.
				l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Started RTP Callback timer %+v", s.RTPSenderCallBackTimer)
				s.RTPSenderCallBackTimer.Stop()
				s.m.Lock()
				s.tts = make([]byte, payloadLen)
				copy(s.tts, tts)
				s.m.Unlock()
				s.RTPSenderCallBackTimer.Start()

				// Wait before requesting BYE: payloadLen/16 ms plus a 200 ms
				// margin (presumably the playback time of the payload — TODO
				// confirm the 16 bytes-per-millisecond audio rate).
				sleeptime := payloadLen / 16
				time.Sleep(time.Millisecond * time.Duration(sleeptime+200))

				if s.GetAgentStatus() == STATUS_AGENT_BUSY {
					s.RequestBYE(s.InviteSIP)
					s.SetAgentStatus(STATUS_AGENT_BYEING)
					fmt.Println("AGENT_STATUS : ", s.GetAgentStatus())
				}
				s.BotStatus = ttscmd
			} else {
				if s.InviteSIP == nil {
					// No INVITE recorded: a busy agent is flagged as being in error.
					if s.GetAgentStatus() == STATUS_AGENT_BUSY {
						s.SetAgentStatus(STATUS_AGENT_ERROR)
					}
				} else {
					s.RequestBYE(s.InviteSIP)
				}
			}

		case recorddata.DTMF_COMMAND:
			l.Print(icslog.LOG_LEVEL_INFO, s.ID, "Recved DTMF command")
			//recv tts data
			tts, n, err := t.Read(payloadLen)
			if err != nil {
				l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "Failed to recv TTS(%d,%s)", n, err.GetError())
				fmt.Println("error : ", err.GetError())
				return
			}
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Recved TTS data. Length: %d", len(tts))

			// Swap the TTS buffer while the RTP sender timer is stopped; the
			// bot status is recorded before the timer restarts.
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Started RTP Callback timer %+v", s.RTPSenderCallBackTimer)
			s.RTPSenderCallBackTimer.Stop()
			s.m.Lock()
			s.tts = make([]byte, payloadLen)
			copy(s.tts, tts)
			s.m.Unlock()
			s.BotStatus = ttscmd
			s.RTPSenderCallBackTimer.Start()

		case recorddata.REFER_COMMAND:
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Recved REFER command %+v", ttscmd)
			if payloadLen > 0 {
				//recv tts data
				tts, n, err := t.Read(payloadLen)
				if err != nil {
					l.Printf(icslog.LOG_LEVEL_ERROR, s.ID, "Failed to recv TTS(%d,%s)", n, err.GetError())
					return
				}
				l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Recved TTS data. Length: %d", len(tts))

				// Swap the TTS buffer while the RTP sender timer is stopped.
				l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "Started RTP Callback timer %+v", s.RTPSenderCallBackTimer)
				s.RTPSenderCallBackTimer.Stop()
				s.m.Lock()
				s.tts = make([]byte, payloadLen)
				copy(s.tts, tts)
				s.m.Unlock()
				s.RTPSenderCallBackTimer.Start()

				// Wait before requesting REFER: payloadLen/16 ms plus a 200 ms
				// margin (presumably the playback time of the payload — TODO
				// confirm the 16 bytes-per-millisecond audio rate).
				sleeptime := payloadLen / 16
				time.Sleep(time.Millisecond * time.Duration(sleeptime+200))

				if s.GetAgentStatus() == STATUS_AGENT_BUSY {
					s.RequestRefer(telNo, s.InviteSIP)
					s.SetAgentStatus(STATUS_AGENT_BYEING)
					fmt.Println("AGENT_STATUS : ", s.GetAgentStatus())
				}
				s.BotStatus = ttscmd
			} else {
				if s.InviteSIP == nil {
					// No INVITE recorded: a busy agent is flagged as being in error.
					if s.GetAgentStatus() == STATUS_AGENT_BUSY {
						s.SetAgentStatus(STATUS_AGENT_ERROR)
					}
				} else {
					s.RequestRefer(telNo, s.InviteSIP)
				}
			}

		default:
			l.Printf(icslog.LOG_LEVEL_INFO, s.ID, "485LINE %+v, %+v, %+v, %s", ttscmd, ttsrc, ttspl, agentName)
		}

		// Debug trace on stdout after every handled command.
		fmt.Println("418LINE rlen", rlen, rerr)
	}
}