您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14663|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;8 I  A! ?* r) W! l9 }
  2. require_once __DIR__ . '/Workerman/Autoloader.php';, G- k7 o0 G/ e7 p' f

  3. 9 j" V4 v& {3 v* l- I! q
  4. $worker = new Worker();5 S+ X& f' \8 j7 q& p, `  t) H
  5. // 4个进程; H$ o" E, z+ {1 i" A0 R  `
  6. $worker->count = 4;+ \2 W. T6 z  @4 Q- V
  7. // 每个进程启动后在当前进程新增一个Worker监听
    1 C7 X' D- K3 t3 ?7 v
  8. $worker->onWorkerStart = function($worker), I( z* V4 w: O
  9. {/ s; b4 i1 w: @& N
  10.     /**
    2 B8 r- `3 {- l* z( y
  11.      * 4个进程启动的时候都创建2016端口的Worker
    ' v2 u2 Y9 S6 w3 }; b
  12.      * 当执行到worker->listen()时会报Address already in use错误2 V, J4 g: _! K+ K+ ~5 _
  13.      * 如果worker->count=1则不会报错
    * ^: k; S. R7 C% ~6 H( ]
  14.      */) i$ t* r4 A4 m! t$ Y
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');' S  ?: P2 {1 u1 a: y( ]# V) z
  16.     $inner_worker->onMessage = 'on_message';) h& q: {6 c% |* F
  17.     // 执行监听。这里会报Address already in use错误% N9 v2 [  @- t" p! I: {# }, e
  18.     $inner_worker->listen();- f' O+ ^3 p; Z" a" R" [
  19. };0 d+ E- M. V+ o* e

  20. 5 [$ r' p* w0 O4 w* f
  21. $worker->onMessage = 'on_message';- R4 L* x0 ^' Q8 C: z/ Z
  22. & K7 c- x% ~" r; K, O) R
  23. function on_message($connection, $data)/ a; Y% _& K, Z, b+ T0 t) F$ I
  24. {/ y8 l1 h$ W! z4 U
  25.     $connection->send("hello\n");
    ' ?5 ?" ]) I3 Z% ^& E1 r" K4 d
  26. }7 A# x' C! [8 o, r; o' Y$ G& m
  27. 4 x0 [: B9 v/ w/ @' |; D
  28. // 运行worker1 L, @: \4 o+ j, L* D, A( S
  29. Worker::runAll();
    9 |* _3 n) q0 E6 D) z
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:# }* t  B$ A: a( |

  31. , E. J8 {: E  x& P
  32. use Workerman\Worker;
    1 N& ~8 u2 ^6 T* r$ _- W
  33. require_once './Workerman/Autoloader.php';; A/ d2 u) E. q
  34. 8 s; p: z* I/ ]
  35. $worker = new Worker('text://0.0.0.0:2015');/ K' e! ]% C' f7 u* F! R3 D# d+ M
  36. // 4个进程$ s0 ~' U+ d' l! e4 t% B' q! G
  37. $worker->count = 4;
    - C8 x' B& O' i4 [- p; F) T4 A
  38. // 每个进程启动后在当前进程新增一个Worker监听2 R' r% }2 e+ }( m( m7 a! C
  39. $worker->onWorkerStart = function($worker): o5 D- j' z1 Z8 m+ i& H0 k: W# N
  40. {1 Y% H, u: a. p, \1 v1 t4 K* h- ^
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    5 Z. Y5 q! U: j- n
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    * R( {0 n" ]+ I" b# R" H
  43.     $inner_worker->reusePort = true;
    5 l8 P8 L5 O3 l. o5 Z
  44.     $inner_worker->onMessage = 'on_message';3 R4 \5 X* c+ T8 Z9 k
  45.     // 执行监听。正常监听不会报错% X0 D6 ~( e- d
  46.     $inner_worker->listen();, C$ n6 ]' q6 ~4 D$ A; n
  47. };( G; h2 F: C& e( e% g
  48. 2 W! ?2 o" o5 [5 O" K+ q4 S2 B$ m
  49. $worker->onMessage = 'on_message';
    ! M$ O8 c7 L1 \, @) K

  50. " k$ N! v9 S4 S9 i% S4 X8 r) T
  51. function on_message($connection, $data)
      }" b& G6 x6 @
  52. {
    $ C+ ?, r9 e2 f# A
  53.     $connection->send("hello\n");5 \' F; H6 I0 t5 ?* w" }
  54. }5 _( S/ S: [" ~! e9 s# l: g" G! [

  55.   w9 g0 R% f& o  ?. X
  56. // 运行worker
    6 Y. t- S0 L' _' j# |. h5 z% S% m
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    0 j% Y* x: I9 r6 m3 c1 R
  2. use Workerman\Worker;: p9 w8 X& {# Z
  3. require_once './Workerman/Autoloader.php';
    ) f" u3 J) ~0 l1 s. w, h( G
  4. // 初始化一个worker容器,监听1234端口
    / q- P/ c  [. J( G2 i# c/ P* ^
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ) t3 `8 h0 }: d4 N& ^

  6. 0 k" s* z/ \3 Y& `$ x
  7. /*
    ) e. e  Y- f+ E+ X8 t
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误; K; t8 C. k& w1 z; j  |7 l
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
      B' a, w' h# T
  10. */+ A! h1 V' {$ w$ r" \# R
  11. $worker->count = 1;5 `9 G2 Q9 I3 [8 i% a- G" U' n5 x* j! o
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口. U9 @6 w% q3 E# E
  13. $worker->onWorkerStart = function($worker)6 d, A; Z* W  {) h
  14. {
    ) V1 w3 x, [5 E4 n. u. I
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ; K3 e- P; Z# |! ^
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');8 y" V' i( Z3 y5 o& ~* v
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    8 M) v( t" F. e$ _$ A6 l/ I. G
  18.     {
    : d* O6 G2 e" H% n$ v+ B1 d
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    : x5 H9 z' S& z, F% t
  20.         $data = json_decode($buffer, true);+ A4 w8 b$ e# ~+ k
  21.         $uid = $data['uid'];* L6 v) q6 V( z
  22.         // 通过workerman,向uid的页面推送数据4 [& `. w2 i; G' e2 B3 b
  23.         $ret = sendMessageByUid($uid, $buffer);
    & ~$ U) z; Q3 @
  24.         // 返回推送结果4 T/ E( L4 J) y5 O
  25.         $connection->send($ret ? 'ok' : 'fail');- K; B# z5 ^) i' ]+ \
  26.     };
    + u! z0 f9 J! [% k$ K
  27.     // ## 执行监听 ##
    / i1 I  R. D& R3 E, l1 W
  28.     $inner_text_worker->listen();
    # r& g! V0 M; \1 o4 o
  29. };  m9 y! f, v- j# M: }. H2 R& q1 `
  30. // 新增加一个属性,用来保存uid到connection的映射3 ?) o& Y( W* c! s
  31. $worker->uidConnections = array();
    " b# n" ?) v1 K9 f7 |" P
  32. // 当有客户端发来消息时执行的回调函数
    5 G; d' [. G# G# J$ f" e
  33. $worker->onMessage = function($connection, $data)
    # o) ^3 a" y( o; U
  34. {) {3 }/ q0 u2 {1 [4 _3 x+ d
  35.     global $worker;
    , L( y! @3 x* L9 ]: `
  36.     // 判断当前客户端是否已经验证,既是否设置了uid* W( h8 V+ l, Y+ _. X
  37.     if(!isset($connection->uid))% Q9 b2 ?% T6 U  I  Y
  38.     {
    $ b3 W9 }; Q4 q7 ~
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)' i7 ]- V" j4 J- L
  40.        $connection->uid = $data;( _0 C- p# x# @' D: S: E
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    ( Z/ U0 w; u0 R" z7 z
  42.         * 实现针对特定uid推送数据/ s0 [0 J/ m$ Q  ^/ D
  43.         */) ^  v, \4 w( H! F
  44.        $worker->uidConnections[$connection->uid] = $connection;
    7 b! K" V! U5 j( W" F
  45.        return;! L8 d# s0 O% ~3 N
  46.     }3 x, ]9 H4 W1 b8 l; F+ u
  47. };
    ! W- _/ ]+ O4 z8 d* C) i
  48. ( r0 R1 W7 h8 G* F2 o, e, O8 Q
  49. // 当有客户端连接断开时6 u" N! @% r8 N; Y" N: b
  50. $worker->onClose = function($connection)/ H' J/ @) k7 J3 N5 d; ]  A
  51. {
    # R# f) \4 P( T, v4 e( r7 e7 v' Q
  52.     global $worker;
    5 a: |" J2 J* D) T2 [
  53.     if(isset($connection->uid))" w( f! y+ @5 \/ z2 E
  54.     {: u: X- z1 I& n; g! i4 O0 J
  55.         // 连接断开时删除映射
    & A3 g; ~: j/ J% o6 g
  56.         unset($worker->uidConnections[$connection->uid]);9 ^5 a6 I, l9 k( p
  57.     }$ C& I# W1 w. _0 [% L! J
  58. };) M' W1 {: g3 f  ]5 a8 f

  59. / T/ H" T1 [& R; u& x  s0 b' f
  60. // 向所有验证的用户推送数据
      d6 q  G- f' t/ t! ^
  61. function broadcast($message)% o, f" k0 S, i0 z" X! q4 W
  62. {& j( s; h3 y9 W! o
  63.    global $worker;  R" g3 j" q+ ?# T  g; e0 f- @
  64.    foreach($worker->uidConnections as $connection)
    " g- a! w4 p$ }6 {1 o
  65.    {# @2 X& ~& U2 M! N; N
  66.         $connection->send($message);4 J5 W! O% ~# B# p  @7 U
  67.    }* {' }9 z1 s4 B' g
  68. }
    3 f* u# T/ s* i) c2 n
  69. 8 N! W) _! l' U! o( X$ D
  70. // 针对uid推送数据
    7 ~3 l) m. N( [* v; j) \7 B+ V( q
  71. function sendMessageByUid($uid, $message)
    ' N7 ~5 c: c; F/ L8 ]  F) t9 {. z
  72. {
    ' a7 K* Z8 T5 {' z0 o. c3 `- N
  73.     global $worker;
    ; W% K) p) l# L9 I
  74.     if(isset($worker->uidConnections[$uid]))
    , M6 ^, t1 N/ L& q! a2 S
  75.     {. @" v$ X+ l/ f8 @3 e; B
  76.         $connection = $worker->uidConnections[$uid];9 O- r8 Q: L9 }/ d8 Z8 d
  77.         $connection->send($message);& M/ C1 h' @8 d" M4 w  [/ W! y, g
  78.         return true;
    ! m" r6 z( \7 K  S1 c; j& \/ E8 A$ g
  79.     }6 y  P2 {" |9 F  F
  80.     return false;
      I) q) m9 ]: F. |$ c" A
  81. }: _  @0 H+ D3 z. l3 j7 r3 S

  82. * l8 y# K6 t$ m* T
  83. // 运行所有的worker
    / P2 _% Y+ V+ T4 }1 ?
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');1 W' v9 i# l, u
  2. ws.onopen = function(){6 @* K7 W8 c# {
  3.     var uid = 'uid1';
    " L1 W$ L1 c1 |6 {) ?
  4.     ws.send(uid);" d! W1 F7 @$ n5 y  ]) X
  5. };
    9 x6 d. ~' s% O" p/ \3 q: g3 Z* p
  6. ws.onmessage = function(e){* t( ]" J2 j# x
  7.     alert(e.data);5 @* H+ @4 q% Q  k8 m
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    ' b( t- k8 P$ k- t( w7 }
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);6 ]+ F' m8 S# Q  I' S# P
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    ! z6 H" B7 W; V4 f' o( L/ L6 H4 i
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');# w" X& r4 r- V4 Z
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符0 E0 a. T% J) {. J4 D
  6. fwrite($client, json_encode($data)."\n");
    - s, u& K6 V3 {7 X% g
  7. // 读取推送结果. y6 _, E7 a  L% p2 [5 p+ K& y
  8. echo fread($client, 8192);
复制代码

; ]' |: d/ m. ?: H8 u; f3 o4 o: O$ [$ s
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 15:04 , Processed in 0.059017 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!