您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10494|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;! M; @4 n4 ~0 m
  2. require_once __DIR__ . '/Workerman/Autoloader.php';. M( W; K4 R4 l7 d. ?

  3. - r( O) F  ]1 X# C0 G0 o
  4. $worker = new Worker();# X$ A2 b/ i# ~2 R8 `( C: L
  5. // 4个进程
    + }0 m: w( O% {5 ~$ K9 |
  6. $worker->count = 4;
    9 d- B) d( V, j2 O& x6 f
  7. // 每个进程启动后在当前进程新增一个Worker监听
    1 z( c, k, p/ O0 U& ?4 g4 X
  8. $worker->onWorkerStart = function($worker)
    , J5 b4 w, g. T
  9. {
    / o6 b$ O  y2 B8 A4 Z8 ~3 Z
  10.     /**; X1 Z4 K1 \! ]% u# I. U
  11.      * 4个进程启动的时候都创建2016端口的Worker
      q% ^. ?" Q0 u" _# {
  12.      * 当执行到worker->listen()时会报Address already in use错误1 r, j" U5 b. r5 i3 {% J5 ^) v* U
  13.      * 如果worker->count=1则不会报错% v8 I$ x- H9 P4 q6 ]- w6 p
  14.      */
    # Y. s( R) u2 K6 g4 u+ |  [
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    9 u7 W! Y$ B  ~/ s& \
  16.     $inner_worker->onMessage = 'on_message';
    ( X1 X5 o0 W  i( _8 X
  17.     // 执行监听。这里会报Address already in use错误2 u2 }8 c$ W& w$ F6 T) d% x
  18.     $inner_worker->listen();
    : ^- D5 Q, Q% h7 q: N
  19. };
      C. X$ L1 s5 E5 ?; n

  20. 8 R- q7 q4 E. l: W: A% ]
  21. $worker->onMessage = 'on_message';* o/ {1 ^: t; |0 w5 P& L  R9 o

  22. 7 i$ e6 C+ R! H1 g9 |# a. k
  23. function on_message($connection, $data)
    - F# h7 O$ e4 O5 I3 n
  24. {$ k- ?, Y8 R0 @3 F
  25.     $connection->send("hello\n");
    1 S3 @! N6 Y' \+ k+ D$ G8 l) {
  26. }+ `0 d# W1 A0 g% f2 J
  27. ; T4 ]1 T2 h" B: G
  28. // 运行worker/ F  i8 e+ G. H$ @
  29. Worker::runAll();
    # t" p, o+ x' L$ X
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( h; }* E3 p$ G2 {! [4 L
  31. " C0 E* s  G, x
  32. use Workerman\Worker;
    - z2 `. L3 k3 x2 d
  33. require_once './Workerman/Autoloader.php';- k7 f. i7 e0 ?: N0 n

  34. : B" q" B. m( I0 B+ q: T
  35. $worker = new Worker('text://0.0.0.0:2015');
    * ~5 a. n4 k9 w. q4 ], E9 J" `. Q
  36. // 4个进程
    " ~' L, L" ^) N4 a7 H* y1 ]
  37. $worker->count = 4;' m7 M" }' f9 Z2 F
  38. // 每个进程启动后在当前进程新增一个Worker监听! n9 i  E7 t. l! Z/ p7 k
  39. $worker->onWorkerStart = function($worker)& @8 D- |4 h# A+ U1 e% ]# C
  40. {
    ) ?% h! \1 I" f! o4 O4 s( V1 Y' L
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');3 x2 Z# T: q, c! Z" {; k
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)7 d" ~" T  B; ^% T3 Z7 {
  43.     $inner_worker->reusePort = true;3 N  z- m  G# u9 }3 M; x9 }
  44.     $inner_worker->onMessage = 'on_message';& o# {- e7 |, a
  45.     // 执行监听。正常监听不会报错& |$ Z( H$ f  \' H  t
  46.     $inner_worker->listen();/ i6 |; t. {* C. e
  47. };; ^; L! Y9 t' R  ], U+ Q9 ~
  48. - Z& p9 C9 b+ ]# }3 I
  49. $worker->onMessage = 'on_message';# w* d- X6 t1 ]6 o' l/ H2 L9 M
  50. & n1 j$ _& f  R* S4 o. s$ d0 u) T/ y
  51. function on_message($connection, $data)
    . T0 N2 \6 |4 F: U. ^* w' i' T+ l  V
  52. {  W, v( |9 A$ Q, Q3 f& v' v
  53.     $connection->send("hello\n");
    $ k1 w9 I2 h" V. P# I7 }
  54. }+ s* R7 i8 l4 N- Q

  55. % K: D8 |4 \$ w" t3 h6 e4 v
  56. // 运行worker
    - i  I7 f# D. X! p" C
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php& x& a9 S: t4 i; Y/ ^0 m* C3 }) I
  2. use Workerman\Worker;/ `4 S7 l1 Q6 ~
  3. require_once './Workerman/Autoloader.php';) Z$ d2 z( f. l
  4. // 初始化一个worker容器,监听1234端口+ d# X6 R- x" ]1 M2 u
  5. $worker = new Worker('websocket://0.0.0.0:1234');# c$ v3 S0 U7 Q9 p2 {
  6. + `/ S% J- f8 L
  7. /*
    * c, b, T7 w/ b/ K5 M
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    : }2 }4 G2 }5 b& }. r# G! [; s) T
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ; G# u0 i/ F2 C5 `3 C- a
  10. */( u" X/ P5 Y- J8 g, h( ?
  11. $worker->count = 1;
    & `, h, Q8 @# t! a% _
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口1 w5 q# v5 M( x! M
  13. $worker->onWorkerStart = function($worker)7 z4 h8 k+ ~/ y0 b* k
  14. {) A& d% k7 X) m- R
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    - h3 n6 ]' B8 I6 ?' q! \
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    . L- Z( K' Y2 P
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    5 h8 K$ {! g. W% P$ S( Z3 O
  18.     {/ w* @7 l4 f# ]
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据; V% e1 J( k4 n$ E- _" `
  20.         $data = json_decode($buffer, true);" N/ M: Q8 N& b0 r3 d, M$ w0 s0 |
  21.         $uid = $data['uid'];0 a% e& [) W- F: y) D- k
  22.         // 通过workerman,向uid的页面推送数据
    & _5 [; g3 `: F. C
  23.         $ret = sendMessageByUid($uid, $buffer);. v1 e' A0 U( ~, F+ v
  24.         // 返回推送结果% M; n3 v- t8 w7 Q# v
  25.         $connection->send($ret ? 'ok' : 'fail');2 ^: z9 p3 ^5 Y- `0 B' a- l  p' ^, i
  26.     };1 g( N# b0 L5 W- ]+ {$ b
  27.     // ## 执行监听 ##+ D' B1 @" a; \. B( u0 T
  28.     $inner_text_worker->listen();8 j( L0 z7 C; B8 x. W) W6 W% j
  29. };
    , K1 F$ o, T) j/ Z. b
  30. // 新增加一个属性,用来保存uid到connection的映射5 y: o( y" k& ^4 z
  31. $worker->uidConnections = array();$ M- S" E, Z: j1 B  [
  32. // 当有客户端发来消息时执行的回调函数
    * P: U1 R2 _$ @7 j# H5 l
  33. $worker->onMessage = function($connection, $data)+ ?$ U! Y; }" P6 Y8 N' u# P8 a
  34. {; \$ h+ t6 F: `% c: x) {
  35.     global $worker;' f* h* o; ^; P5 K6 F
  36.     // 判断当前客户端是否已经验证,既是否设置了uid, |- Z- B) S# n- i8 H8 n
  37.     if(!isset($connection->uid))/ c( l7 S3 ^" t" y
  38.     {! l2 ?; x$ a# T8 b5 m! Z5 g
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& r  ~0 y2 ~$ i1 _
  40.        $connection->uid = $data;
    - R  x- {& |2 c0 D0 i; v5 V
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,' }+ R+ D! K% c+ m- X  \
  42.         * 实现针对特定uid推送数据
    / o% I5 M  p3 F
  43.         */9 A  L) _5 E% l" k# H( g" s7 p
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ' i3 A+ L8 P; }  w) {
  45.        return;# Y2 h" x( b9 k% m5 e- H% e, \6 k+ r
  46.     }5 o' l3 h! e2 j; k- U/ Y
  47. };
    ! c# M* y$ O& @8 E$ K

  48. 3 o4 ]( P" g, n  F
  49. // 当有客户端连接断开时) N, o/ F* K# X0 T% ^7 X
  50. $worker->onClose = function($connection)
    0 p3 ]" o6 @% A4 b
  51. {
    2 x$ d6 R' O( \; y
  52.     global $worker;
    0 A& o9 ^" t  O% M
  53.     if(isset($connection->uid))4 p& ^) O$ i/ j( [! R/ o- K0 x
  54.     {
    5 I! h7 Q, z8 B; V, F$ S/ c4 U9 u, G! O
  55.         // 连接断开时删除映射# F- \. e) m, n" j. a8 n8 H
  56.         unset($worker->uidConnections[$connection->uid]);6 s- U3 b% I/ b* _2 ?( B& ?
  57.     }& [# G4 L+ p( s9 h; i1 E/ c1 v9 I
  58. };
    4 }$ |# k5 F, P6 F4 t, Q8 ]

  59. ) R9 i$ ?+ }  U( f
  60. // 向所有验证的用户推送数据! |$ [9 s' E$ `: u) x9 X) @8 n
  61. function broadcast($message)
    - n) f, A* M/ b
  62. {6 r& N. w+ p1 X/ D4 E# C4 P
  63.    global $worker;
    " @2 L+ y8 L6 \/ M
  64.    foreach($worker->uidConnections as $connection)+ S& X: p4 G, m# `7 @; h4 M& o3 B
  65.    {: a  F0 @% c1 j+ d9 }) p1 z4 {
  66.         $connection->send($message);
    ) A" S) g: ]3 z( N6 F7 r) ?
  67.    }
    , j0 `7 R5 e8 f5 s7 j8 U' {
  68. }( k7 C: r4 h+ \2 _" D

  69. 6 t/ N3 u% c2 _( d& m
  70. // 针对uid推送数据1 Y1 r- I( _; [' `* @
  71. function sendMessageByUid($uid, $message)
    " S- c" X) {7 d, _* s
  72. {
    1 B  G0 T& u& Y( t
  73.     global $worker;. Y  S# C+ e1 n2 x: L. _: V
  74.     if(isset($worker->uidConnections[$uid]))2 A) n2 M! g4 t# ~" r: z1 f
  75.     {! Y7 U' P! ~8 M) b7 }* F! W
  76.         $connection = $worker->uidConnections[$uid];9 s; ~8 ?2 B; }3 P9 r  U, o  u% p
  77.         $connection->send($message);
    / Y0 q$ C1 g( P' }1 }
  78.         return true;% \8 z/ h, b( f5 M. o) O8 Z
  79.     }, s: ~7 W/ U# n+ b1 {
  80.     return false;
    9 y8 P3 O6 A9 f
  81. }6 v4 j/ F( {3 A. Q5 n
  82. , ~# z: \3 J0 X5 N: a
  83. // 运行所有的worker0 W) ?( J# T, ^2 _( l$ n
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');" o+ J7 q7 v: X
  2. ws.onopen = function(){
    * ]& r# Z1 g1 W$ W/ a, W# z- G
  3.     var uid = 'uid1';
    . [1 f4 S! Z1 ~1 j0 n+ d
  4.     ws.send(uid);
    6 v( \& A4 k& j7 Y4 q
  5. };
    ) _1 r: ?0 D1 w7 B0 O  T1 t
  6. ws.onmessage = function(e){
    7 d6 r" k& B& U
  7.     alert(e.data);
    * c4 T. x, j$ Q: f2 n  H; S" z3 Y
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    0 W" t" }6 @' [
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);5 S9 T/ n) |+ z
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    8 e$ C9 g! d( \" G2 x) ~; i; a+ \
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    % c: b! X8 U5 X, Q$ H& K4 T" j
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    : U5 L) d$ Z- A6 E
  6. fwrite($client, json_encode($data)."\n");
    ( I2 B9 ^/ P9 y( n7 b
  7. // 读取推送结果
    ; Y, V4 {/ U5 L6 r' \" i
  8. echo fread($client, 8192);
复制代码
  H+ f8 A- ^2 Z2 y

  L! y) S) ^! l) {% ?  K( O
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-17 19:21 , Processed in 0.164367 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!